Catch-up Structured Stream hangs on last step of write job to Delta sink using toTable
09-30-2022 10:41 AM
I'm running Databricks Runtime 10.4 on GCP. I'm running a structured stream to process historical files from a Delta table on GCP cloud storage. This source Delta table is big but maintained with OPTIMIZE.
The stream repartitions, which seems to be the big issue.
I am using .trigger(availableNow=True). The shuffle write is huge, but the query seems to proceed well until the last stage of the write, i.e. 199/200 tasks completed.
Changing spark.sql.shuffle.partitions or maxFilesPerTrigger seems to have NO effect on the processing (read side reconstructed below for reference).
This is the stage:
org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:361)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
and the stream config:
.repartition('date', 'id')\
.writeStream\
.trigger(availableNow=True)\
.option('checkpointLocation', f'{checkpoint_basepath}{dest_database}-{model}-{full_source_table.replace(".","-")}')\
.format('delta')\
.queryName(model)\
.outputMode('append')\
.option('mergeSchema', 'true')\
.partitionBy('date', 'id')\
.toTable(table_name)
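For reference, the read side looks roughly like this (a reconstructed sketch, not verbatim from the job; the option values are examples only):

# Reconstructed read side -- option values are illustrative, not the actual job's.
spark.conf.set('spark.sql.shuffle.partitions', 200)  # the setting that appears to have no effect

df = (spark.readStream
      .format('delta')
      .option('maxFilesPerTrigger', 1000)            # Delta source option; also appears to have no effect
      .table(full_source_table))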
- Labels: Databricks version, Delta table, Job, Stream, Version
09-30-2022 11:26 AM
Can you try:
- remove mergeSchema
- remove repartition
- analyze the details of task 199
- try maxBytesPerTrigger in readStream (see the sketch after this list)
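Roughly like this (a sketch based on the snippet above; maxBytesPerTrigger is a Delta source option, and checkpoint_path stands in for your full checkpoint location):

# Sketch of the suggested changes -- source read reconstructed, checkpoint_path is a placeholder.
df = (spark.readStream
      .format('delta')
      .option('maxBytesPerTrigger', '10g')  # cap input volume per micro-batch
      .table(full_source_table))

(df.writeStream                             # no repartition() before the write
   .trigger(availableNow=True)
   .option('checkpointLocation', checkpoint_path)
   .format('delta')
   .queryName(model)
   .outputMode('append')                    # mergeSchema option removed
   .partitionBy('date', 'id')
   .toTable(table_name))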
09-30-2022 11:35 AM
Thanks for the response. Have you seen maxBytesPerTrigger work with the latest versions?
10-03-2022 10:55 AM
You might have data skew because you are doing repartition('date', 'id'). Do you have more data for some dates or IDs? Have you tried removing the repartition() step? (A quick way to check the distribution is sketched below.)
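One quick way to check (a batch-mode sketch; full_source_table as in your snippet):

# Count rows per ('date', 'id') to spot skewed partition keys.
(spark.read.table(full_source_table)
 .groupBy('date', 'id')
 .count()
 .orderBy('count', ascending=False)
 .show(20))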
10-19-2022 03:21 AM
Hi @Dwight Branscombe
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!