
Spark always performs broadcasts irrespective of spark.sql.autoBroadcastJoinThreshold during a streaming merge operation with DeltaTable

gauthamchettiar
New Contributor II

I am trying to do a streaming merge between Delta tables using this guide: https://docs.delta.io/latest/delta-update.html#upsert-from-streaming-queries-using-foreachbatch

Our Code Sample (Java):

    import io.delta.tables.DeltaTable;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    // Stream changes from the source Delta table.
    Dataset<Row> sourceDf = sparkSession
        .readStream()
        .format("delta")
        .option("inferSchema", "true")
        .load(sourcePath);

    // Target Delta table that each micro-batch is merged into.
    DeltaTable deltaTable = DeltaTable.forPath(sparkSession, targetPath);

    sourceDf.createOrReplaceTempView("vTempView");

    StreamingQuery sq = sparkSession.sql("select * from vTempView").writeStream()
        .format("delta")
        .foreachBatch((microDf, id) -> {
            // Upsert the micro-batch into the target on SALE_ID.
            deltaTable.alias("e").merge(microDf.alias("d"), "e.SALE_ID = d.SALE_ID")
                .whenMatched().updateAll()
                .whenNotMatched().insertAll()
                .execute();
        })
        .outputMode("update")
        .option("checkpointLocation", checkpointPath)
        .trigger(Trigger.Once())
        .start();
    sq.awaitTermination();

Here the source path and target path are already in sync via the checkpoint folder. The table has around 8 million rows of data, amounting to around 450 MB of Parquet files.

When new data arrives at the source path (let's say 987 rows), Spark tries to perform a BroadcastHashJoin and broadcasts the target table, which has 8M rows.

Here's a DAG snippet for the merge operation (with a table of 1M rows):

[Screenshot: query DAG showing a BroadcastHashJoin on the 1M-row table]

Expectation:

I am expecting the smaller dataset (i.e., the 987 rows) to be broadcast. Failing that, Spark should at least not broadcast the target table, since it is larger than the configured spark.sql.autoBroadcastJoinThreshold and we are not providing a broadcast hint anywhere.
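
For reference, here is how that setting can be inspected and changed; a minimal sketch (assuming the same sparkSession variable as above), including disabling automatic broadcasts entirely by setting the threshold to -1:

    // Default is 10485760 bytes (10 MB); tables estimated above this
    // size should not be picked for an automatic broadcast.
    String threshold = sparkSession.conf().get("spark.sql.autoBroadcastJoinThreshold");

    // Setting it to -1 disables automatic broadcast joins altogether,
    // which forces a sort-merge join as a blunt workaround.
    sparkSession.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");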

Things I have tried:

I searched around and found this article: https://learn.microsoft.com/en-us/azure/databricks/kb/sql/bchashjoin-exceeds-bcjointhreshold-oom. It suggests two solutions:

  1. Run "ANALYZE TABLE ..." (but since we are reading the target table from a path and not from a registered table, this is not possible; see the sketch after this list).
  2. Cache the table being broadcast; DeltaTable has no provision for caching a table, so this is not possible either.
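
For context, this is roughly what the article's first suggestion amounts to; the table name sales_target below is hypothetical, and the statement only works for a table registered in the metastore, which is exactly what we don't have when loading by path:

    // ANALYZE TABLE computes the size and row-count statistics that
    // feed the broadcast decision, but it needs a catalog table name.
    sparkSession.sql("ANALYZE TABLE sales_target COMPUTE STATISTICS");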

I suspected this was because we are using the DeltaTable.forPath() method to read the target table, so Spark is unable to calculate metrics for it. So I also tried a different approach:

    Dataset<Row> sourceDf = sparkSession
        .readStream()
        .format("delta")
        .option("inferSchema", "true")
        .load(sourcePath);

    // Read the target as a plain DataFrame this time, so Spark has a
    // chance to compute statistics for it.
    Dataset<Row> targetDf = sparkSession
        .read()
        .format("delta")
        .option("inferSchema", "true")
        .load(targetPath);

    sourceDf.createOrReplaceTempView("vtempview");
    targetDf.createOrReplaceTempView("vtemptarget");
    targetDf.cache();

    StreamingQuery sq = sparkSession.sql("select * from vtempview").writeStream()
        .format("delta")
        .foreachBatch((microDf, id) -> {
            // Run the merge as SQL against the temp views instead of
            // going through the DeltaTable API.
            microDf.createOrReplaceTempView("vtempmicrodf");
            microDf.sparkSession().sql(
                "MERGE INTO vtemptarget as t USING vtempmicrodf as s ON t.SALE_ID = s.SALE_ID WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *"
            );
        })
        .outputMode("update")
        .option("checkpointLocation", checkpointPath)
        .trigger(Trigger.Once())
        .start();
    sq.awaitTermination();

In the snippet above I am also caching targetDf so that Spark can calculate its metrics and not broadcast the target table. But it didn't help: Spark still broadcasts it.
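
One detail I should note (my own observation, not something the article mentions): Dataset.cache() is lazy, so the cache and its statistics only exist once an action materializes it. A minimal sketch of forcing that before the stream starts:

    targetDf.cache();
    // count() is an action, so it materializes the cache (and its
    // size statistics) before the merge ever runs.
    targetDf.count();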

Now I am out of options. Can anyone give me some guidance on this?
