Spark structured streaming - not working with checkpoint location set
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-22-2024 03:19 AM - edited 05-22-2024 03:26 AM
We have structured streaming that reads from external delta table defined in following way:
try:
df_silver = (
spark.readStream
.format("delta")
.option("skipChangeCommits", True)
.table(src_location)
)
(
df_silver.writeStream
.trigger(availableNow = True)
.option("checkpointLocation", checkpoint_location)
.option("mergeSchema", "true")
.foreachBatch(merge_silver_to_gold)
.start()
.awaitTermination()
)
except Exception as e:
....
Only the initial load write data. All other subsequent runs doesn't load anything. Once checkpoint location is removed data is loaded correctly. Here are stats from Streaming query progress from driver logs:
{
"id" : "xxx",
"runId" : "xxx",
"name" : null,
"timestamp" : "2024-05-22T14:32:41.183Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 6599,
"commitOffsets" : 90,
"getBatch" : 41,
"latestOffset" : 129,
"queryPlanning" : 27,
"triggerExecution" : 7015,
"walCommit" : 103
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[xxx]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
"reservoirVersion" : 274,
"index" : 4,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
"reservoirVersion" : 275,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "ForeachBatchSink",
"numOutputRows" : -1
}
}
No rows are read, even though some rows were added to source table. Value of endOffset.reservoirVersion seems to be odd. It is 275 what is equal to maximal version of source Delta Table from which query is reading. All other similar queries that we checked (other Delta Table as source, foreachBatch as sink), have this value set to one unit higher than latest Delta version, so the version that not yet exists.
Line setting the endOffset from the logs:
INFO DeltaSource: lastOffset for Trigger.AvailableNow has set to {"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}
What we tried so far:
- Remove checkpoint location directory - initial load works fine, then all subsequent loads have similar issue as described
- Create different query, with new sink and checkpoint location that reads from the same source - issue with endOffset occurs.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-24-2024 11:39 AM
Hi,
I see you are using `Trigger.AvailableNow`. Is this intended to be a continuous stream or an incremental batch trigger at an interval with Databricks Workflows?
From the docs (https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-tri...
> Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.
> The available now trigger option consumes all available records as an incremental batch with the ability to configure batch size with options such as maxBytesPerTrigger (sizing options vary by data source).
If intending to run in continuous mode, can you please try with a different trigger interval?
Thanks.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-27-2024 12:07 AM
Hi. It runs as incremental batch trigger with Workflow.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-27-2024 12:54 AM
Hi,
have You tried changing the checkpoint location path?
I used that below syntax previously and it worked fine for me:
stream = (
df.writeStream
.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_location)
.foreachBatch(...)
.start()
)
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-27-2024 12:55 AM
Of course. I even tried completely removing checkpoint location itself.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-27-2024 01:07 AM
Whats the logic of merge function?
merge_silver_to_gold
Whats the output of describe history against that destination delta table after running the streaming query?

