I'm running Databricks Runtime 10.4 on GCP. I have a Structured Streaming query that processes historical files from a Delta table in Google Cloud Storage. This source Delta table is big, but it is maintained with OPTIMIZE.
The stream repartitions, which seems to be the main issue.
I am using .trigger(availableNow=True). The shuffle write is huge, but the query seems to progress well until the last stage of the write, where it stalls at 199/200 tasks completed.
Changing spark.sql.shuffle.partitions or maxFilesPerTrigger seems to have NO effect on the processing.
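For reference, this is roughly how I've been changing those two (the values are just examples, and df stands in for the streaming DataFrame read from the source table):

# Session level: number of partitions used by the column repartition / shuffle
spark.conf.set('spark.sql.shuffle.partitions', 400)

# Source side: cap on how many files each micro-batch reads from the Delta source
df = (spark.readStream
      .format('delta')
      .option('maxFilesPerTrigger', 1000)
      .table(full_source_table))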
This is the stack trace for the stage:
org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:361)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
And this is the stream config:
.repartition('date', 'id')\
.writeStream\
.trigger(availableNow=True)\
.option('checkpointLocation', f'{checkpoint_basepath}{dest_database}-{model}-{full_source_table.replace(".","-")}')\
.format('delta')\
.queryName(model)\
.outputMode('append')\
.option('mergeSchema', 'true')\
.partitionBy('date', 'id')\
.toTable(table_name)
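From what I understand, repartition('date', 'id') without an explicit partition count falls back to spark.sql.shuffle.partitions (200 by default), which would line up with the 200 tasks in the final stage, although changing that setting hasn't helped so far. A minimal sketch of the variant I'm considering, where df again stands in for the streaming DataFrame and 64 is just an example count:

# Pass an explicit partition count to repartition instead of relying on
# spark.sql.shuffle.partitions; the rest of the writeStream chain is unchanged.
repartitioned = df.repartition(64, 'date', 'id')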