Each merge/update on a table with liquid clustering forces the streaming query to read the whole table.
Databricks Runtime: 14.3 LTS
Below I prepared a simple script to reproduce the issue:
Create schema.
%sql
CREATE SCHEMA IF NOT EXISTS test;
Create a table with a simple structure: timestamp of a value, point id, value, year/month/day (for clustering), and an insertion timestamp.
The table is clustered by pYear, pMonth, pDay and pointId, and has deletion vectors enabled.
%sql
CREATE TABLE IF NOT EXISTS test.test_values (
  tsUtc TIMESTAMP NOT NULL,
  pointId STRING NOT NULL,
  value DOUBLE NOT NULL,
  pYear INT NOT NULL,
  pMonth INT NOT NULL,
  pDay INT NOT NULL,
  tsInserted TIMESTAMP NOT NULL
)
USING delta
CLUSTER BY (pYear, pMonth, pDay, pointId)
TBLPROPERTIES ('delta.enableDeletionVectors' = true);
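To confirm that the clustering columns were applied, the table metadata can be inspected with DESCRIBE DETAIL (on recent runtimes its output includes a clusteringColumns field; this check is optional):
%sql
-- Optional: inspect table metadata, including the clustering columns
DESCRIBE DETAIL test.test_values;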
Start a stream that reads the test.test_values table and prints the count of retrieved records.
%python
from pyspark.sql import DataFrame
_table_name = "test.test_values"
# ignoreChanges is needed so the stream does not fail on the UPDATE commits performed below
_source_df = spark.readStream.format("delta").option("ignoreChanges", "true").table(_table_name)
def upsert_batch(df: DataFrame, batch_id: int):
    print(f"Number of records: {df.count()}")
_query_name = "test_query_1"
_processing_time = "30 seconds"
sink = (_source_df
    .writeStream
    .foreachBatch(upsert_batch)
    .trigger(processingTime=_processing_time)
    .queryName(_query_name)
    .start())
sink.awaitTermination()
Insert test data for 10 different points, 1,000 records per point, with generated timestamps and random values.
%sql
WITH seq AS (
  SELECT explode(sequence(1, 1000)) AS id
),
points AS (
  SELECT explode(array(
    'point1', 'point2', 'point3', 'point4', 'point5',
    'point6', 'point7', 'point8', 'point9', 'point10')
  ) AS pointId
),
cross_joined AS (
  SELECT p.pointId, s.id
  FROM points p
  CROSS JOIN seq s
),
randomized AS (
  SELECT
    pointId,
    current_timestamp() AS tsUtc,
    rand() AS value,
    2024 AS pYear,
    5 AS pMonth,
    24 AS pDay,
    current_timestamp() AS tsInserted
  FROM cross_joined
)
INSERT INTO test.test_values(tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted)
SELECT tsUtc, pointId, value, pYear, pMonth, pDay, tsInserted
FROM randomized;
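As an optional sanity check, the number of inserted rows per point can be verified (each point should have 1,000 rows):
%sql
-- Optional: verify that 1,000 rows were inserted for each point
SELECT pointId, count(*) AS cnt
FROM test.test_values
GROUP BY pointId
ORDER BY pointId;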
After the insert, the stream reads 10,000 records, which is correct:
Number of records: 10000
After "identified" an incorrect data, and update should be performed for all values for pointId=point1 and whole day of the 24th of May.
%sql
UPDATE test.test_values
SET tsInserted = now(), value = value + 1
WHERE pointId = 'point1' AND pYear = 2024 AND pMonth = 5 AND pDay = 24;
The expected number of records retrieved by the stream is 1,000, the exact number of updated records (each point has 1,000 records per day). But in fact the number of retrieved records is 10,000:
Number of records: 10000
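One way to see what the UPDATE commit actually did is to inspect the table history and compare its operationMetrics (e.g. the number of updated rows and of added/removed files or deletion vectors; exact metric names depend on the runtime) with what the stream reported:
%sql
-- Inspect the most recent commits and their operation metrics
DESCRIBE HISTORY test.test_values LIMIT 5;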
Performing the same update for the same point produces the correct result:
Number of records: 1000
But updating data for a different pointId again produces an unexpected result:
%sql
UPDATE test.test_values
SET tsInserted = now(), value = value + 1
WHERE pointId = 'point5' AND pYear = 2024 AND pMonth = 5 AND pDay = 24;
Number of records: 9000
Recreating the table with the same structure, but partitioned (PARTITIONED BY (pYear, pMonth, pDay, pointId)) instead of clustered, produces the expected results; a sketch of that variant is shown below.
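For reference, a minimal sketch of the partitioned variant used for that comparison (the original test dropped and recreated test.test_values itself; the distinct table name here is only illustrative):
%sql
-- Same structure as above, but Hive-style partitioned instead of liquid clustered
CREATE TABLE IF NOT EXISTS test.test_values_partitioned (
  tsUtc TIMESTAMP NOT NULL,
  pointId STRING NOT NULL,
  value DOUBLE NOT NULL,
  pYear INT NOT NULL,
  pMonth INT NOT NULL,
  pDay INT NOT NULL,
  tsInserted TIMESTAMP NOT NULL
)
USING delta
PARTITIONED BY (pYear, pMonth, pDay, pointId)
TBLPROPERTIES ('delta.enableDeletionVectors' = true);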
What could be the reason for this behavior?