Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Size of output data increased 4 times average size.

adhi_databricks
Contributor

Hey guys,

We have a Databricks job that dumps data to S3 daily. The average output size is 60 GB and the file format is ORC. One inner join was taking more than 3 hours. When we debugged it, we found that the join was not auto-broadcast; it ran as a sort-merge join by default, and data skew made the job run for more than 3 hours.
Cluster config: 14.3 LTS, r-fleet.4xlarge (driver) and 2-50 workers of r-fleet.2xlarge.

After we found that it was not broadcasting the smaller table automatically, we explicitly told Spark to broadcast it in the join, and wrote the result in ORC format as before. But this time the daily output was 200 GB instead of the usual 60 GB.

We are wondering why the data size has increased so much. When we compared the older data (60 GB) with the newly written 200 GB, they appear to be the same: the row count matches and the data values match as well.
By default, the job uses the Snappy compression codec to write the data.

If anyone has an idea about this, please let me know the reason behind it!

Thanks in advance!!

1 REPLY

Louis_Frolio
Databricks Employee

Hey @adhi_databricks, I did some digging and have come up with some helpful tips.

The significant increase in file size from 60GB to 200GB after implementing broadcast join, despite having identical data, is most likely caused by poor compression efficiency due to increased output partitioning and small ORC stripes.

Root Cause

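When you switched to a broadcast join, the shuffle that the sort-merge join performed went away, and with it the partitioning that shuffle produced. A sort-merge join shuffles both sides into `spark.sql.shuffle.partitions` partitions (200 by default, possibly fewer if adaptive query execution coalesces them) and sorts each partition by the join key. A broadcast join does neither: the output keeps the partitioning and row order of the large table's scan, so the number of write tasks follows the input file splits rather than `spark.sql.shuffle.partitions`.

With sort-merge join, your data was therefore likely consolidated into fewer, larger partitions, with rows that share a join key stored next to each other. After switching to broadcast join, the data is likely spread across many more partitions, causing Spark to write many small ORC files or files with numerous small stripes. The rows are also no longer clustered by key, which can make ORC's dictionary and run-length encodings less effective.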
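Row order can matter on its own, independent of partition count: a sort-merge join leaves each partition sorted by the join key, while a broadcast join keeps the order of the large table's scan. The sketch below illustrates the effect in plain Python. It uses `zlib` from the standard library as a stand-in for ORC's encodings and Snappy, and the `customer_` keys are hypothetical, so the numbers it prints say nothing about your actual data; only the direction of the difference is the point.

```python
import random
import zlib


def compressed_size(lines):
    """Return the zlib-compressed size in bytes of the newline-joined lines."""
    return len(zlib.compress("\n".join(lines).encode("utf-8")))


random.seed(0)  # fixed seed so the comparison is repeatable

# Hypothetical low-cardinality join key: 100,000 rows over 1,000 distinct keys
rows = [f"customer_{random.randrange(1000):04d}" for _ in range(100_000)]

unsorted_size = compressed_size(rows)        # arbitrary order, as after a broadcast join
sorted_size = compressed_size(sorted(rows))  # clustered by key, as after a sort-merge join

print(f"unsorted: {unsorted_size} bytes, sorted: {sorted_size} bytes")
```

If this turns out to be a factor for your data, sorting within partitions by the join key before the write (for example with `sortWithinPartitions`) is one thing to try.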

Why This Causes Size Bloat

ORC compression efficiency depends heavily on stripe size. The default ORC stripe size is 64 MB (67,108,864 bytes), and Snappy compression works best with larger, contiguous data blocks. When data is fragmented across many small partitions:

- Each partition writes small amounts of data
- Small stripes (under 2-8MB) compress poorly compared to larger ones
- Compression overhead and metadata increase proportionally
- Dictionary encoding and other ORC optimizations become less effective
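
The effect of block size on compression can be seen with a small experiment. This is a rough sketch that again uses `zlib` as a stand-in for Snappy inside ORC stripes (an assumption made so the example needs only the standard library); the row layout is hypothetical. It compresses the same bytes once in many small independent blocks and once in a few large ones.

```python
import zlib


def total_compressed_size(data: bytes, block_size: int) -> int:
    """Compress data in independent blocks of block_size bytes and sum the sizes."""
    return sum(
        len(zlib.compress(data[i:i + block_size]))
        for i in range(0, len(data), block_size)
    )


# Hypothetical rows with repeated values, similar in spirit to columnar data
data = "".join(
    f"user_{i % 500},country_{i % 40},status_{i % 3}\n" for i in range(200_000)
).encode("utf-8")

small_blocks = total_compressed_size(data, 4 * 1024)          # 4 KiB blocks
large_blocks = total_compressed_size(data, 4 * 1024 * 1024)   # 4 MiB blocks

print(f"small blocks: {small_blocks} bytes, large blocks: {large_blocks} bytes")
```

Each small block pays its own header cost and starts with no history to refer back to, which is the same reason tiny stripes and tiny files add up to a larger total.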

Solutions

Coalesce before writing:
```python
df.coalesce(10).write.format("orc").save(output_path)
```
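
The `10` in the snippet above is only illustrative. A rough way to pick a partition count is to divide the expected output size by a target file size. The helper below is a sketch of that arithmetic; `target_partition_count` is a hypothetical name, and the 512 MiB target is an assumption rather than a recommendation.

```python
import math


def target_partition_count(total_bytes: int, target_file_bytes: int) -> int:
    """Number of output partitions so that each file is roughly target_file_bytes."""
    if total_bytes <= 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))


GIB = 1024 ** 3
MIB = 1024 ** 2

# 60 GiB of expected output, aiming for files of about 512 MiB (an assumed target)
n = target_partition_count(60 * GIB, 512 * MIB)
print(n)  # 120
```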

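Adjust shuffle partitions (this only affects the output if a shuffle stage precedes the write, which is not the case after a broadcast join alone):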
```python
spark.conf.set("spark.sql.shuffle.partitions", "10")
```

Configure ORC stripe size:
```python
# Pass the ORC writer property as a write option; the value is in bytes (128 MB)
df.write.format("orc").option("orc.stripe.size", "134217728").save(output_path)
```

The key is to consolidate your data into fewer, larger partitions before writing to ORC format. This allows ORC to create larger stripes that compress more efficiently with Snappy, which should bring your output size back toward the expected 60GB range.
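
The suggestions above can be combined into one helper, sketched here under some assumptions: `write_orc` is a hypothetical name, `df` is expected to be a PySpark DataFrame, and passing `orc.stripe.size` as a writer option assumes the option is forwarded to the ORC writer in your runtime. Nothing is imported because only DataFrame methods are called.

```python
def write_orc(df, output_path, num_files, stripe_bytes=128 * 1024 * 1024):
    """Write df as Snappy-compressed ORC in roughly num_files files."""
    current = df.rdd.getNumPartitions()
    if num_files < current:
        # coalesce() avoids a shuffle but can only reduce the partition count
        out = df.coalesce(num_files)
    elif num_files > current:
        # repartition() shuffles, so it can also increase the partition count
        out = df.repartition(num_files)
    else:
        out = df  # already at the requested count
    (
        out.write.format("orc")
        .option("compression", "snappy")
        # Assumption: this property reaches the ORC writer as a write option
        .option("orc.stripe.size", str(stripe_bytes))
        .save(output_path)
    )
    return out
```

Because `coalesce()` adds no shuffle boundary, it can also lower the parallelism of the stage that feeds it, including the join. If the job slows down noticeably, calling `repartition(num_files)` instead trades a shuffle for full parallelism upstream.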

Hope this helps, Louis.
