
Streaming Reads Full Table with Liquid Clustering

bampo
New Contributor II

Each merge/update on a table with liquid clustering forces the stream to read the whole table.
Databricks Runtime: 14.3 LTS

Below is a simple script to reproduce the issue:

Create schema.

 

%sql
CREATE SCHEMA IF NOT EXISTS test;

 


Create a table with a simple structure: timestamp of the value, id, value, and year/month/day (for clustering).
The table is clustered by year + month + day and pointId.
Deletion vectors are enabled.

 

%sql
CREATE TABLE IF NOT EXISTS test.test_values (
  tsUtc TIMESTAMP NOT NULL,
  pointId STRING NOT NULL,
  value DOUBLE NOT NULL,
  pYear INT NOT NULL,
  pMonth INT NOT NULL,
  pDay INT NOT NULL,
  tsInserted TIMESTAMP NOT NULL
)
USING delta
CLUSTER BY (pYear, pMonth, pDay, pointId)
TBLPROPERTIES ('delta.enableDeletionVectors' = true)
;

 

Start a stream that reads the test.test_values table and prints the number of retrieved records.

 

%python
from pyspark.sql import DataFrame

_table_name = "test.test_values"

_source_df = spark.readStream.format("delta").option("ignoreChanges", "true").table(_table_name)

def upsert_batch(df: DataFrame, batch_id: int):
    # Only report how many records arrived in this micro-batch
    print(f"Number of records: {df.count()}")

_query_name = "test_query_1"
_processing_time = "30 seconds"
sink = (_source_df
    .writeStream
    .foreachBatch(upsert_batch)
    .trigger(processingTime=_processing_time)
    .queryName(_query_name)
    .start())

sink.awaitTermination()

 


Insert test data for 10 different points, 1000 records each, with a randomly generated value (the timestamp is simply the current time).

 

%sql
WITH seq AS (
  SELECT explode(sequence(1, 1000)) AS id
),
points AS (
  SELECT explode(array(
    'point1', 'point2', 'point3', 'point4', 'point5',
    'point6', 'point7', 'point8', 'point9', 'point10')
  ) AS pointId
),
cross_joined AS (
  SELECT p.pointId, s.id
  FROM points p
  CROSS JOIN seq s
),
randomized AS (
  SELECT
    pointId,
    current_timestamp() AS tsUtc,
    rand() AS value,
    -- integer literals, matching the INT column types
    2024 AS pYear,
    5 AS pMonth,
    24 AS pDay,
    current_timestamp() AS tsInserted
  FROM cross_joined
)

INSERT INTO test.test_values(tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted)
SELECT tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted
FROM randomized;

 

After that, the stream reads 10k records, which is correct.
Number of records: 10000

After "identifying" incorrect data, an update should be performed for all values with pointId=point1 for the whole day of May 24th.

 

%sql
update test.test_values
set tsInserted=now(), value = value + 1
where pointId="point1" and pyear=2024 and pmonth=5 and pday=24;

 

The expected number of records retrieved by the stream is 1000, the exact number of updated records (each point has 1k records per day).
But in fact the number of retrieved records is 10000:
Number of records: 10000

Performing the same update for the same point produces the correct result:
Number of records: 1000

But updating data for a different pointId again produces an unexpected result.

 

%sql
update test.test_values
set tsInserted=now(), value = value + 1
where pointId="point5" and pyear=2024 and pmonth=5 and pday=24;

 

Number of records: 9000

Recreating the table with the same structure, but partitioned (PARTITIONED BY (pYear, pMonth, pDay, pointId)) instead of clustered, produces the expected results.

What could be the reason for such behavior?
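
One way to narrow this down is to check which data files each UPDATE commit re-adds to the table. Below is a minimal sketch, assuming you can read the JSON commit files in the table's `_delta_log` directory (newline-delimited JSON, one action per line). The helper name `summarize_commit` and the sample commit contents are made up for illustration and are not taken from the table in this thread.

```python
import json


def summarize_commit(commit_text):
    """Summarize the add/remove actions of one Delta commit file."""
    summary = {"added": [], "removed": []}
    for line in commit_text.splitlines():
        if not line.strip():
            continue
        action = json.loads(line)
        if "add" in action:
            add = action["add"]
            # "stats" is itself a JSON-encoded string and may be absent
            stats = json.loads(add["stats"]) if add.get("stats") else {}
            dv = add.get("deletionVector") or {}
            num_records = stats.get("numRecords")
            masked = dv.get("cardinality", 0)
            summary["added"].append({
                "path": add["path"],
                "numRecords": num_records,
                "dvCardinality": masked,
                "liveRecords": None if num_records is None else num_records - masked,
            })
        elif "remove" in action:
            summary["removed"].append(action["remove"]["path"])
    return summary


# Hypothetical commit: one file re-added with a deletion vector, one new file.
SAMPLE_COMMIT = "\n".join([
    json.dumps({"commitInfo": {"operation": "UPDATE"}}),
    json.dumps({"remove": {"path": "part-0000.parquet", "dataChange": True}}),
    json.dumps({"add": {"path": "part-0000.parquet", "dataChange": True,
                        "stats": json.dumps({"numRecords": 10000}),
                        "deletionVector": {"cardinality": 1000}}}),
    json.dumps({"add": {"path": "part-0001.parquet", "dataChange": True,
                        "stats": json.dumps({"numRecords": 1000})}}),
])

summary = summarize_commit(SAMPLE_COMMIT)
for f in summary["added"]:
    print(f["path"], "live records:", f["liveRecords"])
```

If the commit written by the update re-adds a large file, comparing its live record count with the stream's batch size should show whether the stream is replaying whole files rather than only the changed rows.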

4 REPLIES

daniel_sahal
Esteemed Contributor

@bampo 
Partitioning splits the data into multiple Parquet files, which is why the stream reads only part of the table.
Liquid clustering behaves differently. With such a small amount of data, it keeps all of the data in a single file, because that is the most efficient way to handle a small data volume.

This video explains what liquid clustering is about - https://www.youtube.com/watch?v=5t6wX28JC_M
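
To make the file-level argument concrete, here is a toy model in plain Python (no Spark involved). It assumes that a stream reading with `ignoreChanges` re-emits every row of any data file that an update rewrites, and it compares one file per pointId (the partitioned layout) with a single file for the whole table (what clustering may choose for 10k rows). The function names and file names are hypothetical.

```python
POINTS = [f"point{i}" for i in range(1, 11)]
ROWS_PER_POINT = 1000


def layout_partitioned():
    # Partitioned by pointId: each point lives in its own file
    return {f"part-{p}": [p] * ROWS_PER_POINT for p in POINTS}


def layout_single_file():
    # Small clustered table: all rows end up in one file
    return {"part-0": [p for p in POINTS for _ in range(ROWS_PER_POINT)]}


def rows_reemitted(files, updated_point):
    # Assumption: a file containing at least one updated row is rewritten
    # as a whole, and the stream receives all rows of the rewritten file.
    return sum(len(rows) for rows in files.values() if updated_point in rows)


print(rows_reemitted(layout_partitioned(), "point1"))  # 1000
print(rows_reemitted(layout_single_file(), "point1"))  # 10000
```

Under this assumption the partitioned table yields exactly the updated 1000 rows, while the single-file table yields all 10000.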

bampo
New Contributor II

Thanks for the reply!

But it doesn't explain why the whole table is returned after the first update (10k records), while after the second update (of the same data) only the updated records are returned (1k records).

It's also worth mentioning that my example uses a small amount of data only for demonstration purposes. In our test environment the table contains more than 30 million records, and after updating 200 of them, the stream reads the whole table (30M+ records), which is unacceptable.
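
One hypothesis that would reproduce the 10000 / 1000 / 9000 sequence from the original post is sketched below. It is not confirmed anywhere in this thread. The assumptions are: the table starts as a single data file; with deletion vectors enabled, an update masks the old row versions in the existing file instead of rewriting it and writes the new row versions to a fresh file; the commit re-adds the masked file; and the stream emits all live rows of every file added by the commit. `DataFile` and `update` are hypothetical names for this simulation, not Delta APIs.

```python
from dataclasses import dataclass, field

POINTS = [f"point{i}" for i in range(1, 11)]


@dataclass
class DataFile:
    rows: list                                  # pointId of each physical row
    deleted: set = field(default_factory=set)   # row indexes masked by the deletion vector

    def live(self):
        return [(i, p) for i, p in enumerate(self.rows) if i not in self.deleted]


def update(files, point):
    """Simulate UPDATE ... WHERE pointId = point; return rows the stream would receive."""
    emitted = 0
    new_rows = []
    for f in files:
        hits = [i for i, p in f.live() if p == point]
        if not hits:
            continue                 # untouched file: not part of the commit
        f.deleted.update(hits)       # mask the old row versions
        new_rows += [point] * len(hits)
        emitted += len(f.live())     # assumption: re-added file is emitted with its live rows
    if new_rows:
        files.append(DataFile(new_rows))  # new file holding the updated rows
        emitted += len(new_rows)
    return emitted


files = [DataFile([p for p in POINTS for _ in range(1000)])]
counts = [update(files, "point1"), update(files, "point1"), update(files, "point5")]
print(counts)  # [10000, 1000, 9000]
```

In this model the second update of point1 only touches the small file created by the first update, which is why it yields 1000, while updating point5 touches the original large file again and replays its 8000 remaining live rows plus the 1000 new ones.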

radothede
Contributor II

It seems you are not using a checkpoint location. Is that intended?

https://docs.databricks.com/en/structured-streaming/query-recovery.html

That might be the reason your streaming query reads the whole table every time you trigger the process.
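
For reference, a minimal sketch of the same counting stream with an explicit checkpoint location. The function name `start_counting_stream` and the checkpoint path are placeholders; `source_df` is expected to be the streaming DataFrame from the original post.

```python
def start_counting_stream(source_df, checkpoint_path, processing_time="30 seconds"):
    """Start the counting stream, persisting its progress under checkpoint_path."""

    def count_batch(df, batch_id):
        # Same diagnostic as in the original post: report the batch size only
        print(f"Batch {batch_id}: {df.count()} records")

    return (
        source_df.writeStream
        .foreachBatch(count_batch)
        .option("checkpointLocation", checkpoint_path)  # placeholder path supplied by the caller
        .trigger(processingTime=processing_time)
        .queryName("test_query_1")
        .start()
    )
```

With a checkpoint in place, a restarted query resumes from the last processed table version instead of starting over from the initial snapshot.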

bampo
New Contributor II

Thanks for the reply,

In the example the checkpoint is omitted for simplicity, but in the real scenario it is definitely used.

The described scenario reproduces with a single trigger of the stream (without re-triggering the process), so the checkpoint is not the issue in this case.
