01-02-2023 01:05 AM
Dear Databricks community,
I am using Spark Structured Streaming to move data from silver to gold in an ETL fashion. The source stream is the change data feed of a Delta table in silver. The streaming dataframe is transformed and joined with a couple of (non-streamed) Delta tables.
Because of the multiple joins, the tasks are significantly skewed, so we believe the operation would benefit greatly from Adaptive Query Execution (AQE) and the Cost-Based Optimizer (CBO). These two options are disabled by default on streaming datasets. My understanding of the comments at https://github.com/apache/spark/blob/87a235c2143449bd8da0acee4ec3cd99993155bb/sql/core/src/main/scal... is that these options are disabled because of incompatibilities with stateful streaming queries (stream-stream joins or streaming aggregations). Since my streaming queries are stateless, I've tested enabling both options.
The code looks like this:
val streamingDF = spark.readStream.format("delta").option("readChangeFeed", value = true).load("silver.sourceTable")
val joinedTableDF = spark.read.format("delta").load("silver.joinedTable")
streamingDF.as("T")
  .join(joinedTableDF.as("TJoin"), $"TJoin.JoinKey" === $"T.JoinKey")
  .select(...)
  .writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {
    // enabling AQE and CBO for performance since streaming query is stateless
    outputDf.sparkSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    outputDf.sparkSession.conf.set(SQLConf.CBO_ENABLED.key, "true")
    val goldTable = DeltaTable.forName("gold.targetTable")
    val mergeCondition = matchingColumns.map(x => s"gold.$x = change_data.$x").mkString(" AND ")
    goldTable.as("gold")
      .merge(
        latestChangeData.as("change_data"),
        "gold.Key = change_data.Key"
      )
      .whenMatched("change_data._change_type = 'delete'")
      .delete()
      .whenMatched("change_data._change_type = 'update_postimage'")
      .updateAll()
      .whenNotMatched("change_data._change_type != 'delete'")
      .insertAll()
      .execute()
  }).start()
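The snippet above builds `mergeCondition` from `matchingColumns` but then passes a hard-coded string to `merge`. As a minimal sketch of what that expression produces, the following plain-Scala helper reproduces the same string construction. The object name, the default aliases, and the column names in the usage note are hypothetical, since `matchingColumns` is not defined in the post.

```scala
// Hypothetical helper: mirrors the string construction used for `mergeCondition` in the post.
object MergeConditionSketch {
  // Builds "target.col = source.col" for every matching column, joined with AND.
  def mergeCondition(
      matchingColumns: Seq[String],
      target: String = "gold",
      source: String = "change_data"): String = {
    require(matchingColumns.nonEmpty, "at least one matching column is required")
    matchingColumns.map(c => s"$target.$c = $source.$c").mkString(" AND ")
  }
}
```

With a single column `Key`, this yields `gold.Key = change_data.Key`, which is the condition the post hard-codes. With the hypothetical pair `Key` and `Region`, it yields `gold.Key = change_data.Key AND gold.Region = change_data.Region`.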
When testing with vanilla Spark 3.3 on a local standalone cluster, or in a CI pipeline, the queries work as expected. However, when running the same code in a Databricks cluster (11.3 LTS runtime), I get the following error:
org.apache.spark.SparkException: Unexpected build plan for Executor Side Broadcast Join: ReusedStaticSubplan Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1687939], Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1619980]
at org.apache.spark.sql.execution.joins.ExecutorBroadcast$.getShuffleIdFromPlan(ExecutorBroadcast.scala:160)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast$lzycompute(BroadcastHashJoinExec.scala:73)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast(BroadcastHashJoinExec.scala:71)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:301)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:321)
at org.apache.spark.sql.execution.joins.HashJoin.codegenInner(HashJoin.scala:391)
...
Is it incorrect to enable AQE and CBO this way?
Is there something specific to Databricks Spark that needs adjusting to avoid the "Unexpected build plan for Executor Side Broadcast Join" error?
Thank you very much for your help!
03-30-2023 01:08 PM
It's not recommended to enable AQE on a streaming query, for the same reason you shared in the description. It has been documented here
08-01-2023 12:38 AM - edited 08-01-2023 12:39 AM
This blog post is the answer to my question: Adaptive Query Execution in Structured Streaming | Databricks Blog
In summary: the need is confirmed, and Databricks Runtime 13.1 enables AQE by default in foreachBatch sinks in non-Photon clusters. Looking forward to seeing it enabled in Photon clusters too.
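To see which settings a given runtime applies inside `foreachBatch`, one option is to log the session configuration from within the batch function. This is only a sketch for a notebook or spark-shell: the helper name `logOptimizerSettings` is hypothetical, and it assumes the configuration values reported by the batch's session reflect what the engine uses for queries run in that batch.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: prints the AQE and CBO settings seen by each micro-batch.
// Assumption: the batch DataFrame's session carries the settings that apply
// to queries executed inside foreachBatch.
val logOptimizerSettings: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  val conf = batchDf.sparkSession.conf
  val aqe = conf.get("spark.sql.adaptive.enabled")
  val cbo = conf.get("spark.sql.cbo.enabled")
  println(s"batch $batchId: spark.sql.adaptive.enabled=$aqe, spark.sql.cbo.enabled=$cbo")
}

// Usage, assuming `streamingDF` is the streaming DataFrame from the question:
// streamingDF.writeStream.foreachBatch(logOptimizerSettings).start()
```

Declaring the function with an explicit `(DataFrame, Long) => Unit` type avoids the overload ambiguity that `foreachBatch` can raise with untyped lambdas in Scala 2.12.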
01-02-2023 09:27 AM
Hi, searching on the error, I found a change request that is still in progress. Could you please confirm whether this helps: https://issues.apache.org/jira/browse/SPARK-17556
01-02-2023 11:50 AM
Thanks debayan. SPARK-17556 looks a bit stale. There are two linked pull requests, both closed without being merged, and I could not find the files from the stack trace above in either of them. So it looks like the code that runs in Databricks Spark is actually different.
01-03-2023 03:39 AM
How big is the batch? I'm not sure AQE will provide any advantage, as there are thresholds to meet (skew join handling only activates for partitions above 256 MB). Broadcast joins, by contrast, only work when the data is small, and since the data is not partitioned across the workers, the torrent mechanism will not be used. In my opinion, it doesn't make sense.
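The 256 MB limit mentioned above corresponds to `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`, which works together with `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (documented default 5 in Spark 3.x): a partition counts as skewed only if it is larger than the factor times the median partition size and also larger than the threshold. A rough plain-Scala sketch of that rule follows. The object and function names are hypothetical, and the median is a simplification, not Spark's internal implementation.

```scala
// Hypothetical sketch of the documented AQE skew rule; not Spark's actual code.
object SkewCheck {
  // Defaults mirror the documented Spark 3.x settings:
  //   spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
  //   spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB
  val DefaultFactor: Long = 5L
  val DefaultThresholdBytes: Long = 256L * 1024 * 1024

  // Simplified median: the middle element of the sorted sizes
  // (the upper middle when the count is even).
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "need at least one partition size")
    sizes.sorted.apply(sizes.length / 2)
  }

  // A partition is skewed only when BOTH conditions hold.
  def isSkewed(
      size: Long,
      medianSize: Long,
      factor: Long = DefaultFactor,
      thresholdBytes: Long = DefaultThresholdBytes): Boolean =
    size > medianSize * factor && size > thresholdBytes

  // Returns the indices of the partitions that the rule would flag as skewed.
  def skewedPartitions(
      sizes: Seq[Long],
      factor: Long = DefaultFactor,
      thresholdBytes: Long = DefaultThresholdBytes): Seq[Int] = {
    val med = median(sizes)
    sizes.zipWithIndex.collect {
      case (size, index) if isSkewed(size, med, factor, thresholdBytes) => index
    }
  }
}
```

For example, with partition sizes of 10, 12, 11, 300, and 9 MB, only the 300 MB partition (index 3) is flagged. With sizes of 1, 1, 50, and 1 MB, nothing is flagged, because 50 MB exceeds five times the median but stays below the 256 MB threshold. That second case is the situation small change-data batches are likely to be in.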
01-03-2023 06:22 AM
Good question. In the nominal scenario the streams come from change data capture records from the data sources. These batches are usually small, with spikes to ~10k records when bulk operations occur at the data source. Nevertheless, even when the batch itself is small, it is joined with much larger tables.
The same streams are also used for (re)initializing the table contents by sending a snapshot of the full table content from the data source. In that scenario, batches can go up to 500M records, amounting to gigabytes of data.