Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

Catch-up Structured Stream hangs on last step of write job to delta sync using toTable

elementalM
New Contributor III

I'm running Databricks Runtime 10.4 on GCP. I have a Structured Streaming query that is trying to process historical files from a Delta table in Google Cloud Storage. The source Delta table is big but is maintained with OPTIMIZE.

The stream repartitions the data, which seems to be the main issue.

I am using .trigger(availableNow=True). The shuffle write is huge, but the query seems to proceed well until the last stage of the write, where it hangs at 199/200 tasks completed.

Changing spark.sql.shuffle.partitions or maxFilesPerTrigger seems to have no effect on the processing.

This is the stage:

org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:361)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
 

And the stream config:

        .repartition('date', 'id')\
        .writeStream\
        .trigger(availableNow=True)\
        .option('checkpointLocation', f'{checkpoint_basepath}{dest_database}-{model}-{full_source_table.replace(".","-")}')\
        .format('delta')\
        .queryName(model)\
        .outputMode('append')\
        .option('mergeSchema', 'true')\
        .partitionBy('date', 'id')\
        .toTable(table_name)

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

Can you try:

  • remove mergeSchema
  • remove repartition
  • analyze details about task 199
  • try maxBytesPerTrigger in readStream
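
The suggestions above could be combined roughly as follows. This is only a sketch: the function name start_catch_up, its parameters, and the "1g" limit are hypothetical and not taken from the original post, and it assumes the source is a Delta table read with readStream. maxBytesPerTrigger is a soft limit on the Delta source, so the value is a starting point to tune rather than a recommendation.

```python
def start_catch_up(spark, source_table, table_name, checkpoint_location,
                   query_name="catch_up", max_bytes="1g"):
    """Start an availableNow catch-up stream without repartition() or mergeSchema.

    All parameter names and the default max_bytes are illustrative assumptions.
    """
    source = (
        spark.readStream
        .format("delta")
        # Soft cap on how much data each micro-batch reads from the source.
        .option("maxBytesPerTrigger", max_bytes)
        .table(source_table)
    )
    return (
        source.writeStream
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_location)
        .format("delta")
        .queryName(query_name)
        .outputMode("append")
        # No explicit repartition('date', 'id') and no mergeSchema option here.
        .partitionBy("date", "id")
        .toTable(table_name)
    )
```

It would be called from a notebook as start_catch_up(spark, full_source_table, table_name, checkpoint_path), where checkpoint_path stands in for the checkpoint location built in the original config.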

Thanks for the response. Have you seen maxBytesPerTrigger work with the latest versions?

You might have data skew because you are doing repartition("date","id"). Do you have more data for some days or IDs? Have you tried removing this repartition() step?
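
One way to check for this kind of skew is to count rows per (date, id) in a batch read of the source and compare the largest group with the average. A rough sketch follows; the helper names are hypothetical, df is assumed to be a batch DataFrame of the source (for example spark.read.table(full_source_table)), and collecting the counts to the driver is only reasonable when the number of distinct keys is modest.

```python
def partition_key_counts(df, cols=("date", "id")):
    """Return [(key_tuple, row_count), ...] for each distinct combination of cols.

    df is assumed to be a batch (non-streaming) Spark DataFrame.
    """
    rows = df.groupBy(*cols).count().collect()
    # Use row["count"]: attribute access would hit the tuple method Row.count.
    return [(tuple(row[c] for c in cols), row["count"]) for row in rows]


def skew_report(key_counts, top_n=5):
    """Summarize how uneven the per-key row counts are."""
    ordered = sorted(key_counts, key=lambda kv: kv[1], reverse=True)
    if not ordered:
        raise ValueError("no keys to summarize")
    total = sum(count for _, count in ordered)
    mean = total / len(ordered)
    return {
        "keys": len(ordered),
        # A ratio far above 1 means one key holds much more data than average.
        "max_over_mean": ordered[0][1] / mean,
        "top": ordered[:top_n],
    }
```

If skew_report(partition_key_counts(df)) shows one key far above the mean, the single slow task in the last stage is likely the one handling that key.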

Anonymous
Not applicable

Hi @Dwight Branscombe​ 

Hope all is well! Just wanted to check in if you were able to resolve your issue and would you be happy to share the solution or mark an answer as best? Else please let us know if you need more help. 

We'd love to hear from you.

Thanks!
