I am trying to do a streaming merge between delta tables using this guide - https://docs.delta.io/latest/delta-update.html#upsert-from-streaming-queries-using-foreachbatch
Our Code Sample (Java):
Dataset<Row> sourceDf = sparkSession
.readStream()
.format("delta")
.option("inferSchema", "true")
.load(sourcePath);
DeltaTable deltaTable = DeltaTable.forPath(sparkSession, targetPath);
sourceDf.createOrReplaceTempView("vTempView");
StreamingQuery sq = spark().sql("select * from vTempView").writeStream()
.format("delta")
.foreachBatch((microDf, id) -> {
deltaTable.alias("e").merge(microDf.alias("d"), "e.SALE_ID = d.SALE_ID")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute();
})
.outputMode("update")
.option("checkpointLocation", checkpointPath)
.trigger(Trigger.Once())
.start().awaitTermination();
Here Source path and Target path is already in sync using the checkpoint folder. Which has around 8 million rows of data amounting to around 450mb of parquet files.
When new data comes in Source Path (let's say 987 rows), then spark is trying to perform a BroadCastHashJoin, and broad casts the target table which has 8M rows.
Here's a DAG snippet for merge operation (with table with 1M rows),
Expectation:
I am expecting smaller dataset (i.e: 987 rows) to be broadcasted. If not then at least spark should not broadcast target table, as it is not larger than provided spark.sql.autoBroadcastJoinThreshold setting and neither are we providing any broadcast hint anywhere.
Things I have tried:
I searched around and got this article - https://learn.microsoft.com/en-us/azure/databricks/kb/sql/bchashjoin-exceeds-bcjointhreshold-oom. It provides 2 solutions,
- Run "ANALYZE TABLE ..." (but since we are reading target table from path and not from a table this is not possible)
- Cache the table you are broadcasting, DeltaTable does not have any provision to cache table, so can't do this.
I thought this was because we are using DeltaTable.forPath() method for reading target table and spark is unable to calculate target table metrics. So I also tried a different approach,
Dataset<Row> sourceDf = sparkSession
.readStream()
.format("delta")
.option("inferSchema", "true")
.load(sourcePath);
Dataset<Row> targetDf = sparkSession
.read()
.format("delta")
.option("inferSchema", "true")
.load(targetPath);
sourceDf.createOrReplaceTempView("vtempview");
targetDf.createOrReplaceTempView("vtemptarget");
targetDf.cache();
StreamingQuery sq = sparkSession.sql("select * from vtempview").writeStream()
.format("delta")
.foreachBatch((microDf, id) -> {
microDf.createOrReplaceTempView("vtempmicrodf");
microDf.sparkSession().sql(
"MERGE INTO vtemptarget as t USING vtempmicrodf as s ON t.SALE_ID = s.SALE_ID WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * "
);
})
.outputMode("update")
.option("checkpointLocation", checkpointPath)
.trigger(Trigger.Once())
.start().awaitTermination();
In above snippet I am also caching the targetDf so that Spark can calculate metrics and not broad cast target table. But it didn't help and spark still broad casts it.
Now I am out of options. Can anyone give me some guidance on this?