<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Streaming Reads Full Table with Liquid Clustering in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70693#M34127</link>
    <description>&lt;P&gt;It seems you are not using a checkpoint location; is that intended?&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/en/structured-streaming/query-recovery.html" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/query-recovery.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;That might be the reason your streaming query is reading the whole table every time you trigger the process.&lt;/P&gt;</description>
    <pubDate>Mon, 27 May 2024 08:46:22 GMT</pubDate>
    <dc:creator>radothede</dc:creator>
    <dc:date>2024-05-27T08:46:22Z</dc:date>
    <item>
      <title>Streaming Reads Full Table with Liquid Clustering</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70622#M34108</link>
      <description>&lt;P&gt;Each merge/update on a table with liquid clustering forces the stream to read the whole table.&lt;BR /&gt;Databricks Runtime: 14.3 LTS&lt;/P&gt;&lt;P&gt;Below I have prepared simple scripts to reproduce the issue:&lt;/P&gt;&lt;P&gt;Create schema.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sql
CREATE SCHEMA IF NOT EXISTS test;&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;Create table with simple structure.&lt;BR /&gt;Timestamp of a value, id, value, year/month/day (for clustering).&lt;BR /&gt;Table is clustered by year+month+day and pointId.&lt;BR /&gt;deletion vectors enabled.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sql
CREATE TABLE IF NOT EXISTS test.test_values (
tsUtc TIMESTAMP NOT NULL,
pointId STRING NOT NULL,
value DOUBLE NOT NULL,
pYear INT NOT NULL,
pMonth INT NOT NULL,
pDay INT NOT NULL,
tsInserted TIMESTAMP NOT NULL
)
USING delta
CLUSTER BY (pYear, pMonth, pDay, pointId)
TBLPROPERTIES ('delta.enableDeletionVectors' = true)
;&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Start a stream to read test.test_values table and print the count of retrieved records.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%python
from pyspark.sql import DataFrame

_table_name = "test.test_values"

_source_df = spark.readStream.format("delta").option("ignoreChanges", "true").table(_table_name)

def upsert_batch(df: DataFrame, batch_id: int):
print(f"Number of records: {df.count()}")

_query_name = "test_query_1"
_processing_time = "30 seconds"
sink = (_source_df
.writeStream
.foreachBatch(upsert_batch)
.trigger(processingTime=_processing_time)
.queryName(_query_name)
.start())

sink.awaitTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;Insert test data for 10 different points, each with 1000 records of randomly generated timestamps and values.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;WITH seq AS (
SELECT explode(sequence(1, 1000)) AS id
),
points AS (
SELECT explode(array(
'point1', 'point2', 'point3', 'point4', 'point5',
'point6', 'point7', 'point8', 'point9', 'point10')
) AS pointId
),
cross_joined AS (
SELECT p.pointId, s.id
FROM points p
CROSS JOIN seq s
),
randomized AS (
SELECT
pointId,
current_timestamp() AS tsUtc,
rand() as value,
'2024' AS pYear,
'5' AS pMonth,
'24' AS pDay,
current_timestamp() AS tsInserted
FROM cross_joined
)

INSERT INTO test.test_values(tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted)
SELECT tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted
FROM randomized;&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;After that, the stream reads 10k records, which is correct.&lt;BR /&gt;Number of records: 10000&lt;/P&gt;&lt;P&gt;After incorrect data has been "identified", an update should be performed for all values for pointId=point1 and the whole day of the 24th of May.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sql
update test.test_values
set tsInserted=now(), value = value + 1
where pointId="point1" and pyear=2024 and pmonth=5 and pday=24;&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;The expected number of records retrieved by the stream is 1000, the exact number of updated records (each point has 1k records per day).&lt;BR /&gt;But in fact the retrieved number of records is 10000;&lt;BR /&gt;Number of records: 10000&lt;/P&gt;&lt;P&gt;Performing the same update for the same point produces the correct result:&lt;BR /&gt;Number of records: 1000&lt;/P&gt;&lt;P&gt;But updating data for a different pointId again produces an unexpected result.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;%sql
update test.test_values
set tsInserted=now(), value = value + 1
where pointId="point5" and pyear=2024 and pmonth=5 and pday=24;&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Number of records: 9000&lt;/P&gt;&lt;P&gt;Recreating the table with the same structure, but partitioned (PARTITIONED BY (pYear, pMonth, pDay, pointId)) instead of clustered, produces the correct, expected results.&lt;/P&gt;&lt;P&gt;What could be the reason for such behavior?&lt;/P&gt;</description>
      <pubDate>Sat, 25 May 2024 08:25:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70622#M34108</guid>
      <dc:creator>bampo</dc:creator>
      <dc:date>2024-05-25T08:25:09Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Reads Full Table with Liquid Clustering</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70675#M34119</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105716"&gt;@bampo&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;Partitioning is splitting the data in multiple parquet files, that's why it's reading the table partially.&lt;BR /&gt;Liquid clustering behaves differently. With that small amount of data, it's keeping all of the data in the single file, because it's the most efficient way of handling a small data volume.&lt;BR /&gt;&lt;BR /&gt;This video explains what liquid clustering is about - &lt;A href="https://www.youtube.com/watch?v=5t6wX28JC_M" target="_blank"&gt;https://www.youtube.com/watch?v=5t6wX28JC_M&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 07:37:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70675#M34119</guid>
      <dc:creator>daniel_sahal</dc:creator>
      <dc:date>2024-05-27T07:37:54Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Reads Full Table with Liquid Clustering</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70688#M34126</link>
      <description>&lt;P&gt;Thanks for the reply!&lt;/P&gt;&lt;P&gt;But it doesn't explain why, after the first update, the whole table is returned (10k records), and after the second update (of the same data) only the updated records are returned (1k records).&lt;/P&gt;&lt;P&gt;It is also worth mentioning that in my example I use a small amount of data only for demonstration purposes. In our test environment the table consists of more than 30 million records, and after updating 200 of them, the stream reads the whole table (30m+ records), which is unacceptable.&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 08:18:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70688#M34126</guid>
      <dc:creator>bampo</dc:creator>
      <dc:date>2024-05-27T08:18:42Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Reads Full Table with Liquid Clustering</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70693#M34127</link>
      <description>&lt;P&gt;It seems you are not using a checkpoint location; is that intended?&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/en/structured-streaming/query-recovery.html" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/query-recovery.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;That might be the reason your streaming query is reading the whole table every time you trigger the process.&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 08:46:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70693#M34127</guid>
      <dc:creator>radothede</dc:creator>
      <dc:date>2024-05-27T08:46:22Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Reads Full Table with Liquid Clustering</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70695#M34128</link>
      <description>&lt;P&gt;Thanks for the reply,&lt;/P&gt;&lt;P&gt;In the example, the checkpoint is omitted for simplicity, but in the real scenario it is definitely used.&lt;/P&gt;&lt;P&gt;The described scenario reproduces with a single trigger of the stream (without re-triggering the process), so the checkpoint is not the issue in this case.&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 08:58:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-reads-full-table-with-liquid-clustering/m-p/70695#M34128</guid>
      <dc:creator>bampo</dc:creator>
      <dc:date>2024-05-27T08:58:16Z</dc:date>
    </item>
  </channel>
</rss>

