
Broadcast Join Failure in Streaming: Failed to store executor broadcast in BlockManager

pooja_bhumandla
New Contributor III

Hi Databricks Community,

I'm running a Structured Streaming job in Databricks with foreachBatch writing to a Delta table. The job is failing with the following error:

Failed to store executor broadcast spark_join_relation_1622863
(size = Some(67141632)) in BlockManager with storageLevel=StorageLevel(memory, deserialized, 1 replicas)


I understand that Spark may choose a broadcast join when one side of the join is small enough to be sent to all executors.

How exactly does Spark decide when to perform a broadcast join?
What are the recommended ways to handle or avoid broadcast join memory errors in streaming operations?

Any suggestions, configuration tips, or best practices would be greatly appreciated.

Thanks in advance!


Louis_Frolio
Databricks Employee

Greetings @pooja_bhumandla , here are some helpful hints and tips.

Diagnosis

Your error indicates that a broadcast join operation is attempting to send ~64MB of data to executors, but the BlockManager cannot store it due to memory constraints. This commonly occurs in Structured Streaming with `foreachBatch` when Spark automatically decides to broadcast a DataFrame that exceeds available executor memory.

How Spark Decides on Broadcast Joins

Spark automatically performs a broadcast join when one side of the join meets these criteria:

- The estimated table size is below `spark.sql.autoBroadcastJoinThreshold` (default: **10MB**)
- The join type is compatible (INNER, CROSS, LEFT OUTER, RIGHT OUTER, LEFT SEMI, LEFT ANTI)
- Spark's planner estimates that broadcasting the smaller side is cheaper than a shuffle-based join

The problem is that Spark's size estimator can underestimate actual DataFrame sizes, especially after transformations, which leads to broadcast attempts that exceed the memory actually available.
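One way to see which strategy the planner actually picked is to check the current threshold and print the physical plan for the joined micro-batch (a minimal sketch; `batch_df` and `other_df` are placeholder names for the streaming micro-batch and the static side of the join):

```python
# Minimal sketch: inspect the broadcast threshold and the planned join strategy.
# batch_df and other_df are placeholders, not from the original question.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

joined = batch_df.join(other_df, "key")
joined.explain()  # look for BroadcastHashJoin vs SortMergeJoin in the physical plan
```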

Recommended Solutions

1. Disable Auto-Broadcast for Streaming Jobs

Set the threshold to -1 to prevent automatic broadcasting:

```python
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```

2. Use Join Hints to Force SortMergeJoin

Within your `foreachBatch` function, explicitly prevent broadcasting:

```python
def process_batch(batch_df, batch_id):
    # The "merge" hint forces a sort-merge join instead of a broadcast hash join
    result = batch_df.join(other_df.hint("merge"), "key")
    result.write.format("delta").mode("append").save(path)
```

3. Collect Table Statistics

Before joins, gather accurate statistics to improve Spark's size estimation:

```sql
ANALYZE TABLE your_table COMPUTE STATISTICS
```

4. Increase Executor Memory

If broadcasts really are necessary, make sure the executors have enough memory. Note that `spark.executor.memory` and `spark.executor.memoryOverhead` cannot be changed at runtime with `spark.conf.set`; set them in the cluster's Spark configuration (or use a larger worker type) before the cluster starts:

```
spark.executor.memory 8g
spark.executor.memoryOverhead 2g
```

5. Cache Tables Strategically

If repeatedly joining with the same table in `foreachBatch`, cache it beforehand to stabilize memory estimates.
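For example (a sketch only; `lookup_dim`, `key`, and `path` are placeholder names), materialize the static side once before the stream starts so every micro-batch reuses the same in-memory copy:

```python
# Sketch: cache the static lookup table once, outside foreachBatch, so each
# micro-batch reuses the in-memory copy. Table, column, and path names are placeholders.
dim_df = spark.read.table("lookup_dim")
dim_df.cache()
dim_df.count()  # force materialization of the cache

def process_batch(batch_df, batch_id):
    result = batch_df.join(dim_df, "key")
    result.write.format("delta").mode("append").save(path)
```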

Best Practice for Streaming

For Structured Streaming jobs, disabling auto-broadcast (`-1`) is generally recommended since streaming micro-batches can have unpredictable sizes, and SortMergeJoin handles dynamic data volumes more reliably.
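Putting the pieces together, a minimal end-to-end sketch might look like this (the source table, target path, checkpoint location, and join key are illustrative placeholders, not taken from the original question):

```python
# End-to-end sketch combining the suggestions above; all names and paths are illustrative.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # disable auto-broadcast

dim_df = spark.read.table("lookup_dim").cache()  # static side, cached once
dim_df.count()

def process_batch(batch_df, batch_id):
    (batch_df.join(dim_df.hint("merge"), "key")   # force sort-merge join
             .write.format("delta")
             .mode("append")
             .save("/mnt/delta/output"))

(spark.readStream.table("events")
      .writeStream
      .foreachBatch(process_batch)
      .option("checkpointLocation", "/mnt/delta/checkpoints/output")
      .start())
```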

Hope this helps, Louis.

