Enabling Adaptive Query Execution and Cost-Based Optimizer in Structured Streaming foreachBatch

tlecomte
New Contributor III

Dear Databricks community,

I am using Spark Structured Streaming to move data from silver to gold in an ETL fashion. The source stream is the change data feed of a Delta table in silver. The streaming dataframe is transformed and joined with a couple of (non-streamed) Delta tables.

Because of the multiple joins, the tasks are significantly skewed, so we believe the operation would greatly benefit from Adaptive Query Execution (AQE) and the Cost-Based Optimizer (CBO). These two options are disabled by default on streaming datasets. My understanding of the comments at https://github.com/apache/spark/blob/87a235c2143449bd8da0acee4ec3cd99993155bb/sql/core/src/main/scal... is that they are disabled because of incompatibilities with stateful streaming queries (stream-stream joins or streaming aggregations). Since my streaming queries are stateless, I've tested enabling both options.

The code looks like this:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.internal.SQLConf
import spark.implicits._

// Stream the change data feed of the silver source table
val streamingDF = spark.readStream.format("delta").option("readChangeFeed", value = true).load("silver.sourceTable")

// Static (non-streamed) Delta table joined with every micro-batch
val joinedTableDF = spark.read.format("delta").load("silver.joinedTable")

streamingDF.as("T")
  .join(joinedTableDF.as("TJoin"), $"TJoin.JoinKey" === $"T.JoinKey")
  .select(...)
  .writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {

    // enabling AQE and CBO for performance since streaming query is stateless
    outputDf.sparkSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    outputDf.sparkSession.conf.set(SQLConf.CBO_ENABLED.key, "true")

    val goldTable = DeltaTable.forName("gold.targetTable")

    // The post does not show how latestChangeData is built;
    // it is assumed here to be the micro-batch itself.
    val latestChangeData = outputDf

    // Apply deletes, updates and inserts from the change feed to the gold table
    goldTable.as("gold")
      .merge(
        latestChangeData.as("change_data"),
        "gold.Key = change_data.Key"
      )
      .whenMatched("change_data._change_type = 'delete'")
      .delete()
      .whenMatched("change_data._change_type = 'update_postimage'")
      .updateAll()
      .whenNotMatched("change_data._change_type != 'delete'")
      .insertAll()
      .execute()
  }).start()
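
As an aside, one way to see which settings a micro-batch actually runs with is to read them from the batch's own session before overriding anything. This is only a sketch: the helper name `logOptimizerSettings` is hypothetical, and it assumes the string keys `spark.sql.adaptive.enabled` and `spark.sql.cbo.enabled`, which are the keys behind `SQLConf.ADAPTIVE_EXECUTION_ENABLED` and `SQLConf.CBO_ENABLED` in open-source Spark.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not part of the original post): prints the optimizer
// settings visible to the session that executes a given micro-batch.
def logOptimizerSettings(batchDf: DataFrame, batchId: Long): Unit = {
  val conf = batchDf.sparkSession.conf
  val aqe = conf.get("spark.sql.adaptive.enabled") // AQE flag for this session
  val cbo = conf.get("spark.sql.cbo.enabled")      // CBO flag for this session
  println(s"batch $batchId: AQE enabled = $aqe, CBO enabled = $cbo")
}
```

Calling `logOptimizerSettings(outputDf, bid)` as the first statement inside `foreachBatch` would show the values in effect before the two `conf.set` calls run.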

When testing with vanilla Spark 3.3 on a local standalone cluster, or in a CI pipeline, the queries work as expected. However, when running the same code in a Databricks cluster (11.3 LTS runtime), I get the following error:

org.apache.spark.SparkException: Unexpected build plan for Executor Side Broadcast Join: ReusedStaticSubplan Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1687939], Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1619980]
        at org.apache.spark.sql.execution.joins.ExecutorBroadcast$.getShuffleIdFromPlan(ExecutorBroadcast.scala:160)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast$lzycompute(BroadcastHashJoinExec.scala:73)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast(BroadcastHashJoinExec.scala:71)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:301)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:321)
        at org.apache.spark.sql.execution.joins.HashJoin.codegenInner(HashJoin.scala:391)
        ...

Is it not correct to enable AQE and CBO this way?

Is there something specific to Databricks Spark that needs adjusting to avoid the "Unexpected build plan for Executor Side Broadcast Join" error?

Thank you very much for your help!

2 ACCEPTED SOLUTIONS

Lingesh
New Contributor III

It's not recommended to have AQE on a Streaming query for the same reason you shared in the description. It has been documented here

tlecomte
New Contributor III

This blog post is the answer to my question: Adaptive Query Execution in Structured Streaming | Databricks Blog

In summary: the need is confirmed, and Databricks Runtime 13.1 enables AQE by default in foreachBatch sinks in non-Photon clusters. Looking forward to seeing it enabled in Photon clusters too.

6 REPLIES

Debayan
Esteemed Contributor III

Hi, searching for the error, I found a change request that is still in progress. Could you please confirm whether this helps: https://issues.apache.org/jira/browse/SPARK-17556

tlecomte
New Contributor III

Thanks, Debayan. SPARK-17556 looks a bit stale. There are two linked pull requests, both closed but not merged, and I could not find the files from the stack trace above in either of them. So it looks like the code that runs in Databricks Spark is actually different.

Hubert-Dudek
Esteemed Contributor III

How big is the batch? I'm not sure AQE will provide any advantage, as there are some limits (skew-join handling only activates for partitions above 256 MB). Broadcast, on the other hand, only works when the data is small, and since the data is not in partitions on the workers, the torrent mechanism will not be used. In my opinion, it doesn't make sense.
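
For context on the 256 MB figure: in open-source Spark 3.x, AQE's skew-join handling treats a shuffle partition as skewed only if it is larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (default 256 MB) and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (default 5) times the median partition size. The sketch below restates that rule in plain Scala so it can be tried in a notebook cell or REPL. `median` and `isSkewed` are illustrative helpers, not Spark APIs, the partition sizes are made up, and Databricks Runtime may use different defaults.

```scala
// Illustrative helpers only; these are not Spark APIs.
val mb: Long = 1024L * 1024
val thresholdBytes: Long = 256 * mb // assumed default of skewedPartitionThresholdInBytes
val skewFactor: Double = 5.0        // assumed default of skewedPartitionFactor

// Median of the shuffle partition sizes (mean of the two middle values for even counts).
def median(sizes: Seq[Long]): Long = {
  require(sizes.nonEmpty, "at least one partition size is required")
  val sorted = sizes.sorted
  val mid = sorted.length / 2
  if (sorted.length % 2 == 0) (sorted(mid - 1) + sorted(mid)) / 2 else sorted(mid)
}

// A partition counts as skewed only when BOTH conditions hold.
def isSkewed(sizeBytes: Long, medianBytes: Long): Boolean =
  sizeBytes > thresholdBytes && sizeBytes > medianBytes * skewFactor

// Made-up partition sizes: three ordinary partitions and one large one.
val partitionSizes: Seq[Long] = Seq(40 * mb, 50 * mb, 60 * mb, 900 * mb)
val med: Long = median(partitionSizes)
partitionSizes.foreach { s =>
  println(s"${s / mb} MB skewed: ${isSkewed(s, med)}")
}
```

Under these assumed defaults, a 200 MB partition is never split, however far it is above the median, because it stays below the absolute threshold.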

Good question. In the nominal scenario the streams come from change data capture records from the data sources. These batches are usually small, with spikes to ~10k records when bulk operations occur at the data source. Nevertheless, even when the batch itself is small, it is joined with much larger tables.

The same streams are also used for (re)initializing the table contents by sending a snapshot of the full table content from the data source. In that scenario, batches can reach 500M records, amounting to gigabytes of data.
