
Catch-up Structured Stream hangs on last step of write job to Delta sink using toTable

elementalM
New Contributor III

I'm running Databricks version 10.4 on GCP. I have a structured stream that is trying to process historical files from a Delta table on GCP cloud storage. The source Delta table is big but maintained with OPTIMIZE.

The stream repartitions the data, which seems to be the big issue.

I am using .trigger(availableNow=True). Shuffle write is huge, but the query seems to proceed well until the last stage of the write, where it hangs at 199/200 completed.

Changing spark.sql.shuffle.partitions or maxFilesPerTrigger seems to have NO effect on the processing.

This is the stage:

org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:361)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
 

And the stream config:

        .repartition('date', 'id')\
        .writeStream\
        .trigger(availableNow=True)\
        .option('checkpointLocation', f'{checkpoint_basepath}{dest_database}-{model}-{full_source_table.replace(".","-")}')\
        .format('delta')\
        .queryName(model)\
        .outputMode('append')\
        .option('mergeSchema', 'true')\
        .partitionBy('date', 'id')\
        .toTable(table_name)

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

Can you try:

  • remove mergeSchema
  • remove repartition
  • analyze details about task 199
  • try maxBytesPerTrigger in readStream
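For the last suggestion, here is a minimal sketch of what setting maxBytesPerTrigger on the read side could look like. The helper name `read_source_stream`, its parameters, and the "1g" default are assumptions for illustration only; the original post does not show the readStream part of the query, so adapt this to however the source is actually read.

```python
def read_source_stream(spark, source_table, max_bytes="1g", max_files=None):
    """Build a rate-limited streaming read of a Delta table.

    spark        -- an active SparkSession
    source_table -- name of the source Delta table (hypothetical parameter)
    max_bytes    -- soft cap on data per micro-batch (assumed example value)
    max_files    -- optional cap on the number of files per micro-batch
    """
    # Rate limits are options on the stream reader, not on the writer.
    reader = spark.readStream.option("maxBytesPerTrigger", max_bytes)
    if max_files is not None:
        reader = reader.option("maxFilesPerTrigger", max_files)
    return reader.table(source_table)


# Usage sketch (full_source_table is the variable from the original post):
# df = read_source_stream(spark, full_source_table, max_bytes="1g")
```

Smaller micro-batches should mean a smaller shuffle per batch, but they will not by themselves spread out a single very large (date, id) key.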

Thanks for the response. Have you seen maxBytesPerTrigger work with the latest versions?

You might have data skew because you are doing repartition("date","id"). Do you have more data for some days or IDs? Have you tried removing this repartition() step?
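One way to check for skew is to count rows per (date, id) in the source with a plain batch read. repartition on columns hashes each distinct key to a single partition, so one oversized key ends up in one long-running task. The sketch below is only an illustration: the helper names `top_partition_keys` and `skew_ratio` are made up here, and it assumes the source table has the `date` and `id` columns used in the posted config.

```python
def top_partition_keys(df, keys=("date", "id"), n=20):
    """Return the n largest key groups as Rows of (key columns..., count).

    df is a batch DataFrame, e.g. spark.read.table(full_source_table).
    """
    return (
        df.groupBy(*keys)
        .count()
        .orderBy("count", ascending=False)
        .limit(n)
        .collect()
    )


def skew_ratio(counts):
    """Largest group size divided by the mean group size (0.0 if empty)."""
    counts = list(counts)
    if not counts:
        return 0.0
    return max(counts) / (sum(counts) / len(counts))


# Usage sketch (needs a live SparkSession named spark):
# rows = top_partition_keys(spark.read.table(full_source_table))
# print(skew_ratio(r["count"] for r in rows))
```

A ratio close to 1 among the largest keys suggests they are evenly sized. A much larger value points to one or a few keys dominating, which would fit a write that stalls on its final task.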

Anonymous
Not applicable

Hi @Dwight Branscombe​ 

Hope all is well! Just wanted to check in on whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
