05-22-2024 03:19 AM - edited 05-22-2024 03:26 AM
We have a Structured Streaming query that reads from an external Delta table, defined in the following way:
try:
    df_silver = (
        spark.readStream
        .format("delta")
        .option("skipChangeCommits", True)
        .table(src_location)
    )
    (
        df_silver.writeStream
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .foreachBatch(merge_silver_to_gold)
        .start()
        .awaitTermination()
    )
except Exception as e:
    ....
Only the initial load writes data. None of the subsequent runs load anything. Once the checkpoint location is removed, data is loaded correctly. Here are the stats from the streaming query progress in the driver logs:
{
  "id" : "xxx",
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2024-05-22T14:32:41.183Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 6599,
    "commitOffsets" : 90,
    "getBatch" : 41,
    "latestOffset" : 129,
    "queryPlanning" : 27,
    "triggerExecution" : 7015,
    "walCommit" : 103
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[xxx]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 274,
      "index" : 4,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 275,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}
No rows are read, even though some rows were added to the source table. The value of endOffset.reservoirVersion looks odd: it is 275, which equals the latest version of the source Delta table that the query reads from. All other similar queries that we checked (a different Delta table as source, foreachBatch as sink) have this value set one higher than the latest Delta version, that is, a version that does not exist yet.
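For anyone who wants to repeat this comparison on their own query, here is a minimal sketch in plain Python. It only does the check described above: it takes a progress JSON string like the one shown and compares endOffset.reservoirVersion with the latest table version. The function name `end_offset_status` and the three labels it returns are made up for illustration, and the latest version has to be supplied by you (for example from DESCRIBE HISTORY). It does not explain why the offset ends up where it does.

```python
import json


def end_offset_status(progress_json: str, latest_table_version: int) -> str:
    """Compare the first source's endOffset.reservoirVersion with the
    latest version of the source Delta table (hypothetical helper)."""
    progress = json.loads(progress_json)
    end_offset = progress["sources"][0]["endOffset"]
    version = end_offset["reservoirVersion"]
    if version > latest_table_version:
        return "past-latest"    # what we see in our other, healthy queries
    if version == latest_table_version:
        return "at-latest"      # what we see in the failing query
    return "behind-latest"


# Values taken from the progress output above; 275 is the latest source version.
progress = json.dumps(
    {"sources": [{"endOffset": {"reservoirVersion": 275, "index": -1}}]}
)
print(end_offset_status(progress, latest_table_version=275))  # at-latest
```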
Line setting the endOffset from the logs:
INFO DeltaSource: lastOffset for Trigger.AvailableNow has set to {"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}
What we tried so far:
05-24-2024 11:39 AM
Hi,
I see you are using `Trigger.AvailableNow`. Is this intended to be a continuous stream or an incremental batch trigger at an interval with Databricks Workflows?
From the docs (https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-tri...
> Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.
> The available now trigger option consumes all available records as an incremental batch with the ability to configure batch size with options such as maxBytesPerTrigger (sizing options vary by data source).
If intending to run in continuous mode, can you please try with a different trigger interval?
Thanks.
05-27-2024 12:07 AM
Hi. It runs as an incremental batch, triggered by a Databricks Workflow.
05-27-2024 12:54 AM
Hi,
Have you tried changing the checkpoint location path?
I used the syntax below previously and it worked fine for me:
stream = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(...)
    .start()
)
05-27-2024 12:55 AM
Of course. I even tried removing the checkpoint location completely.
โ05-27-2024 01:07 AM
What's the logic of the merge function?
merge_silver_to_gold
What's the output of DESCRIBE HISTORY against the destination Delta table after running the streaming query?
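In case it helps, here is a rough sketch of how that history could be pulled from a notebook. It assumes an active `spark` session and a table registered in the metastore. The helper name `recent_history` and the table name `gold.target` are placeholders, not names from your job.

```python
def recent_history(spark, table_name: str, limit: int = 5):
    """Return (version, operation) pairs for the most recent commits of a
    Delta table, newest first (hypothetical helper)."""
    rows = spark.sql(
        f"DESCRIBE HISTORY {table_name} LIMIT {int(limit)}"
    ).collect()
    return [(row["version"], row["operation"]) for row in rows]


# Usage in a notebook, with a placeholder table name:
# for version, operation in recent_history(spark, "gold.target"):
#     print(version, operation)
```

If the stream actually wrote something, a new MERGE commit should show up at the top of the list after the run. If no new commit appears, the batch most likely arrived empty.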