cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Enabling Adaptive Query Execution and Cost-Based Optimizer in Structured Streaming foreachBatch

tlecomte
New Contributor III

Dear Databricks community,

I am using Spark Structured Streaming to move data from silver to gold in an ETL fashion. The source stream is the change data feed of a Delta table in silver. The streaming dataframe is transformed and joined with a couple of (non-streamed) Delta tables.

Because of the multiple joins, the tasks are significantly skewed. So we believe the operation would greatly benefit from Adaptive Query Execution and Cost Based Optimizer. These 2 options are disabled by default on streaming datasets. My understanding of the comments at https://github.com/apache/spark/blob/87a235c2143449bd8da0acee4ec3cd99993155bb/sql/core/src/main/scal... is that these options are disabled because of incompatibilities with stateful streaming queries (stream-stream joins or stream aggregations). Since my streaming queries are stateless, I've tested enabling the 2 options.

The code looks like this:

val streamingDF = spark.readStream.format("delta").option("readChangeFeed", value = true).load("silver.sourceTable")      
 
val joinedTableDF = spark.read.format("delta").load("silver.joinedTable")
 
streamingDF.as("T")
.join(joinedTableDF.as("TJoin"), $"TJoin.JoinKey" === $"T.JoinKey")
.select(...)
.writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {
 
    // enabling AQE and CBO for performance since streaming query is stateless
    outputDf.sparkSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    outputDf.sparkSession.conf.set(SQLConf.CBO_ENABLED.key, "true")
 
    val goldTable = DeltaTable.forName("gold.targetTable")
 
    val mergeCondition = matchingColumns.map(x => s"gold.$x = change_data.$x").mkString(" AND ")
    goldTable.as("gold")
      .merge(
        latestChangeData.as("change_data"),
        "gold.Key = change_data.Key"
      )
      .whenMatched("change_data._change_type = 'delete'")
      .delete()
      .whenMatched("change_data._change_type = 'update_postimage'")
      .updateAll()
      .whenNotMatched("change_data._change_type != 'delete'")
      .insertAll()
      .execute()
}).start()

When testing with vanilla Spark 3.3 on a local standalone cluster, or in a CI pipeline, the queries work as expected. However, when running the same code in a Databricks cluster (11.3 LTS runtime), I get the following error:

org.apache.spark.SparkException: Unexpected build plan for Executor Side Broadcast Join: ReusedStaticSubplan Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1687939], Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1619980]
        at org.apache.spark.sql.execution.joins.ExecutorBroadcast$.getShuffleIdFromPlan(ExecutorBroadcast.scala:160)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast$lzycompute(BroadcastHashJoinExec.scala:73)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.executorBroadcast(BroadcastHashJoinExec.scala:71)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:301)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:321)
        at org.apache.spark.sql.execution.joins.HashJoin.codegenInner(HashJoin.scala:391)
        ...

Is it not correct to enable AQE and CBO this way ?

Is there something specific to Databricks Spark that needs adjusting to avoid the "Unexpected build plan for Executor Side Broadcast Join" error ?

Thank you very much for your help!

2 ACCEPTED SOLUTIONS

Accepted Solutions

Lingesh
Databricks Employee
Databricks Employee

It's not recommended to have AQE on a Streaming query for the same reason you shared in the description. It has been documented here

View solution in original post

tlecomte
New Contributor III

This blog post is the answer to my question: Adaptive Query Execution in Structured Streaming | Databricks Blog

In summary: the need is confirmed, and Databricks Runtime 13.1 enables AQE by default in foreachBatch sinks in non-Photon clusters. Looking forward to seeing it enabled in Photon clusters too.

View solution in original post

6 REPLIES 6

Debayan
Databricks Employee
Databricks Employee

Hi, Searching on the error I got a change request which is in progress, could you please confirm if this helps: https://issues.apache.org/jira/browse/SPARK-17556

tlecomte
New Contributor III

Thanks debayan. SPARK-17556 looks a bit stale. There are 2 linked pull requests that are both closed but not merged, and I could not find the files from the stacktrace above in any of these 2. So it looks like the code that runs in Databricks Spark is actually different.

Hubert-Dudek
Esteemed Contributor III

How big is the batch? Not sure if AQE will provide any advantage as there are some limits (for skew joins above 256 MB, even to activate skew joins). Broadcast in opposite only work when data is small and as data is not on partitions on workers torrent mechanism will be not used. In my opinion, it doesn't make sense.

Good question. In the nominal scenario the streams come from change data capture records from the data sources. These batches are usually small, with spikes to ~10k records when bulk operations occur at the data source. Nevertheless, even when the batch itself is small, it is joined with much larger tables.

The same streams are also used for (re)initializing the table contents by sending a snapshot of the full table content from the data source. In that scenario, batches can go up to 500M records, for GB of data.

Lingesh
Databricks Employee
Databricks Employee

It's not recommended to have AQE on a Streaming query for the same reason you shared in the description. It has been documented here

tlecomte
New Contributor III

This blog post is the answer to my question: Adaptive Query Execution in Structured Streaming | Databricks Blog

In summary: the need is confirmed, and Databricks Runtime 13.1 enables AQE by default in foreachBatch sinks in non-Photon clusters. Looking forward to seeing it enabled in Photon clusters too.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group