Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
craig_lukasik
Databricks Employee

Since 2022, Databricks Engineering has been on a mission to simplify streaming workloads through Project Lightspeed. We’ve democratized stateful processing with features like transformWithState and the State Reader API.

Now, with Databricks Runtime 18, we are addressing one of the most persistent headaches in streaming optimization: shuffle tuning.

We are excited to announce that Adaptive Query Execution (AQE) and Auto Optimized Shuffle (AOS) are now fully supported for stateless Structured Streaming queries. This update brings the "set it and forget it" simplicity of batch processing to your streaming pipelines.

Shuffle and When It Occurs

Shuffle is the redistribution (repartitioning) of data across executors and partitions, usually inserted as an Exchange between stages; it moves rows over the network and is one of the costliest operations in a plan. Not all streaming workloads require a shuffle. If you are unsure whether yours does, the Spark UI shows whether an Exchange appears in the query plan. A Spark query plan may include a shuffle stage for various reasons, including:

  • Joins, aggregations, and window operations: These introduce exchanges and therefore shuffles. 
  • Global sorting: orderBy typically requires a shuffle for global ordering. 
  • Explicit repartitioning: repartition(...) or output partitionBy(...) forces data redistribution before the write. 
  • Non-broadcast joins: If the smaller side isn’t broadcast, the plan uses shuffle-based joins; AQE may switch to broadcast at runtime when thresholds allow.
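A quick way to check for a shuffle without opening the Spark UI is to call explain() on the streaming DataFrame and look for Exchange operators in the printed plan. A minimal sketch, using the built-in rate source purely for illustration:

```python
# Sketch: spotting shuffle (Exchange) operators in a streaming query's plan.
# The rate source is used here only as a convenient illustrative input.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

stream = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# An aggregation introduces an Exchange (shuffle) between stages.
counts = stream.groupBy(F.window("timestamp", "1 minute")).count()

# Look for "Exchange" in the printed plan to confirm a shuffle is present.
counts.explain()
```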

The Problem: The "200 Partition" Trap

If you’ve optimized streaming pipelines before DBR 18, you know the struggle.

  • The "Auto" Illusion: Setting spark.sql.shuffle.partitions="auto" in a streaming query often did nothing, silently falling back to the default 200 partitions. This left clusters underutilized on large workloads or created too many small files on tiny ones.
  • Manual Tuning: You had to hard-code partition counts (e.g., 4000) based on peak traffic, leading to inefficiency during quiet periods.
  • The foreachBatch Hack: To get around this, many developers resorted to complex foreachBatch workarounds to force a static batch plan, thereby triggering AQE. While effective, this added unnecessary complexity and maintenance overhead.
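For reference, that workaround typically looked something like the sketch below. The table name and checkpoint path are placeholders, and stream_df stands in for any streaming DataFrame:

```python
# Sketch of the pre-DBR 18 workaround: routing each micro-batch through
# foreachBatch so the write executes as a static batch plan, where AQE
# (partition coalescing, skew handling) already applies.
def write_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is an ordinary static DataFrame,
    # so batch AQE optimizes the shuffle before the write.
    batch_df.write.mode("append").saveAsTable("target_table")

(stream_df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/checkpoints/demo")
    .start())
```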

The Solution: Adaptive Streaming in DBR 18

In DBR 18, the engine is finally smart enough to handle this for you. For stateless queries (like filters, projections, and stream-static joins), Spark now leverages the same adaptive intelligence used in batch jobs.

Auto Optimized Shuffle (AOS) "Just Works"

Setting spark.sql.shuffle.partitions to "auto" now does exactly what the name promises.

  • How it works: AOS automatically determines the initial pre-shuffle partition count based on the estimated size of the micro-batch.
  • The Impact: No more hard-coded magic numbers. Whether your micro-batch is 100MB or 100GB, AOS sizes the shuffle correctly.

AQE Enters the Chat

Once the shuffle happens, Adaptive Query Execution (AQE) steps in to optimize the plan on the fly.

  • Coalescing: AQE combines small or empty partitions into properly sized ones, preventing the "small file problem" downstream.
  • Skew Handling: It detects skewed data partitions and splits them dynamically to prevent straggler tasks from holding up the stream.
  • Join Optimization: It can switch join strategies (e.g., SortMerge to Broadcast) at runtime based on actual data volume.
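These three behaviors correspond to standard open-source Spark AQE settings, shown here for reference (each defaults to true in recent Spark releases, so no action is normally required):

```python
# The AQE behaviors above map to standard Spark configuration knobs.
spark.conf.set("spark.sql.adaptive.enabled", "true")                     # AQE master switch
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # coalescing
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # skew handling
```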

Why This Matters

  • Simpler Code: You can delete those foreachBatch wrappers and manual config overrides. Your streaming code focuses on business logic, not infrastructure tuning.
  • Better Performance: Your streams instantly adapt to changes in data volume. A spike in traffic results in more partitions; a lull leads to fewer.
  • Cost Efficiency: No more over-provisioned clusters waiting on a few skewed tasks. AQE ensures resources are used efficiently.

Getting Started

Upgrading is simple. Move your workspace to DBR 18+, and these optimizations are enabled by default for stateless queries.

  • AQE: Enabled by default (spark.sql.adaptive.streaming.stateless.enabled = true).
  • AOS: Just set spark.sql.shuffle.partitions = "auto".
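Putting it together, a stateless streaming pipeline on DBR 18+ needs no partition math at all. The sketch below uses placeholder paths, columns, and table names:

```python
# Sketch: a stateless streaming write on DBR 18+ with AOS and AQE.
# Source path, columns, checkpoint, and table name are placeholders.
spark.conf.set("spark.sql.shuffle.partitions", "auto")  # enable AOS sizing

events = (spark.readStream
    .format("delta")
    .load("/data/events"))

# Stateless transforms (filter + projection): any shuffle introduced here
# is sized per micro-batch by AOS and coalesced/split at runtime by AQE.
cleaned = events.filter("status = 'ok'").select("id", "ts", "payload")

(cleaned.writeStream
    .format("delta")
    .option("checkpointLocation", "/chk/events_clean")
    .toTable("events_clean"))
```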

Note: These improvements currently apply to stateless operators. Complex stateful repartitioning requires different heuristics and remains a focus for future Project Lightspeed efforts.

Conclusion

With DBR 18, we are one step closer to a world where you simply write business logic, and the engine handles the rest. By bringing AQE and AOS to stateless streaming, we’re removing the "tuning tax" from data engineers, letting you focus on building value rather than managing partitions.

Stay tuned for more updates from Project Lightspeed!

FAQ: AQE and AOS in Streaming

Q: Does this update apply to stateful streaming operations like aggregations or mapGroupsWithState? A: No. Currently, these AQE and AOS improvements apply specifically to stateless queries (e.g., filters, projections, stream-static joins). Stateful repartitioning requires strict state store compatibility and is not yet supported by these specific features.

Q: Do I need to change my code to enable AQE for streaming? A: In DBR 18+, AQE for stateless streaming is enabled by default via spark.sql.adaptive.streaming.stateless.enabled. You do not need to change code, though you should ensure spark.sql.shuffle.partitions is set to "auto" to leverage AOS.

Q: How does AOS determine the partition count for a micro-batch? A: AOS estimates the size of the incoming micro-batch data and calculates an appropriate number of pre-shuffle partitions, ensuring the cluster isn't over-provisioned for small batches or choked by large ones.

Q: Will this fix "small file" problems in my downstream Delta tables? A: Yes, indirectly. By using AQE to coalesce small partitions during the write phase, the engine writes fewer, larger files to storage, reducing the need for aggressive downstream OPTIMIZE or auto-compaction jobs.

Q: Can I remove my existing foreachBatch logic? A: If you were using foreachBatch solely to trigger batch-style AQE optimizations for a stateless write, you can likely revert to a standard streaming write in DBR 18. If the foreachBatch contains other custom logic (such as merging to external systems), you must keep it.
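If your foreachBatch wrapper existed only as an AQE shim, the revert is a plain streaming write. A sketch, with stream_df, the checkpoint path, and the table name as placeholders:

```python
# The DBR 18 replacement for an AQE-only foreachBatch wrapper:
# a standard streaming write, with AOS/AQE handling shuffle sizing.
(stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/chk/target")
    .toTable("target_table"))
```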