cancel
Showing results for 
Search instead for 
Did you mean: 
Community Articles
Dive into a collaborative space where members like YOU can exchange knowledge, tips, and best practices. Join the conversation today and unlock a wealth of collective wisdom to enhance your experience and drive success.
cancel
Showing results for 
Search instead for 
Did you mean: 

Databricks Kafka Multi-Stream Ingestion Architecture: Scaling Beyond Single-Stream Bottlenecks

RamuPilla
New Contributor

The Real Problem: Kafka Source Parallelism in Spark

Before discussing foreachBatch, multi-table writes, or any specific use case, it helps to understand the underlying issue. This is a problem with how Spark Structured Streaming consumes from Kafka, and it affects every Kafka-sourced streaming workload.

How Spark Reads From Kafka (The Default Behavior)

When you write a Spark Structured Streaming query against a Kafka topic, you typically write something like this:

df = (spark.readStream

    .format("kafka")

    .option("kafka.bootstrap.servers", brokers)

    .option("subscribe", "events_topic")    # 50 partitions

    .load())

# ... do anything with df ...

query = df.writeStream.format("delta").start("/path")

What happens under the hood:

  • A single StreamingQuery is launched
  • Each microbatch trigger reads from ALL 50 partitions
  • All messages are pulled into ONE DataFrame (one microbatch)
  • The microbatch is processed as a unit by Spark's task scheduler
  • Even though Spark distributes tasks across executors, the BATCH itself is serial
  • The next microbatch can only start AFTER the current one completes

The Hidden Bottleneck

Kafka was designed for partition-level parallelism. With 50 partitions, you could have 50 consumers reading concurrently. But Spark's default pattern collapses this into a single consumer-coordinator-style read. Kafka's partition-level parallelism is not utilized.

Even with minimal downstream processing, you are throttled at the source. This is why a 50-partition topic with a single-stream Spark query performs similarly to a 5-partition topic — the partition count is rarely the limiting factor.

Severity Depends on the Downstream Workload

  • Simple writeStream to one Delta table: throughput tops out around ~25K msg/sec. Workable for many use cases.
  • Stateful aggregations / windowed joins: throughput drops to ~18K msg/sec because each batch carries state computation overhead.
  • foreachBatch with multi-table routing: throughput drops to ~12K msg/sec because the bottleneck stacks — single batch consumption AND serial multi-table writes inside the batch.

This is why foreachBatch use cases are the showcase for the multi-stream pattern: the impact is most visible and immediately measurable. The same fix applies to every Kafka-sourced workload.

 

Solution: The General Multi-Stream Pattern

The Core Idea

Instead of one Spark stream reading all 50 partitions, launch N independent Spark streams, each owning an exclusive subset of partitions. The 50 partitions are split via round-robin assignment so each stream consumes about 12-13 partitions worth of data.

Critical properties of this pattern

  • Each stream has its own checkpoint location (per-stream offset state)
  • Each stream is a complete Spark Structured Streaming query (its own readStream + writeStream)
  • Each stream uses Kafka's assign option to claim specific partitions (NOT subscribe)
  • Streams run in parallel inside a single Databricks job (via ThreadPoolExecutor)
  • The downstream logic stays unchanged — it just runs in parallel

Why This Works for Any Workload

The pattern is workload-agnostic. Whatever the original single Spark stream was doing, you now do it in 4 (or 8, or N) streams in parallel:

  • If you were doing simple writeStream to a Delta table, you now have 4 parallel writeStreams to the same Delta table.
  • If you were doing windowed aggregations, you now have 4 parallel aggregations (each over its partition subset).
  • If you were doing foreachBatch with multi-table routing, you now have 4 parallel foreachBatch routers — each handling fewer partitions and fewer messages per batch.

In a foreachBatch use case, two bottlenecks stack:

  • Bottleneck 1 (source): single microbatch consumption — the same as any Kafka workload
  • Bottleneck 2 (sink): inside foreachBatch, you filter and write to 10+ Delta tables sequentially. Each write blocks the next.

Multi-stream addresses BOTH bottlenecks at once. Each stream consumes fewer partitions (smaller microbatch) AND runs its own foreachBatch in parallel (more sink writes happening simultaneously). This is why foreachBatch users see the largest improvements.

 

Architecture Diagram

The diagram below shows the side-by-side comparison: single-stream consuming all partitions on the left, multi-stream with round-robin partition assignment on the right.

RamuPilla_0-1778329737668.jpeg

Figure 1: General Multi-Stream Pattern - Same workload, parallel execution

The Showcase Use Case: foreachBatch with Multi-Table Writes

A common pattern: a single Kafka topic carries heterogeneous events that must be routed into multiple Delta tables. Each event has a discriminator field (e.g., event_type) that determines its destination table.

Concrete scenario

  • Messages have an event_type field: orders, users, payments, inventory, products, etc.
  • Single Kafka topic: events_topic with 50 partitions
  • Each event_type must be routed to its corresponding Delta table
  • Total target tables: 10+ Delta tables in Databricks
  • Each table has a different schema

foreachBatch stacks two bottlenecks

  • Sink bottleneck: inside the batch, 10+ table writes happen sequentially
  • Source bottleneck: 50 partitions → 1 microbatch → 1 thread (the general Kafka issue)
  • Combined effect: throughput drops to ~12K msg/sec
  • p99 latency: 500-800ms (the entire batch waits for all 10 writes)
  • Kafka lag grows whenever producers exceed 12K msg/sec

Multi-Stream Solution Architecture

The diagram below shows the multi-stream architecture for the foreachBatch use case. The 50 Kafka partitions are distributed via round-robin across 4 independent streams. Each stream runs its own foreachBatch with the same multi-table routing logic, executing in parallel.

RamuPilla_2-1778329737752.jpeg

PySpark Implementation

Step 1: Round-Robin Partition Assignment

def assign_partitions_round_robin(topic, brokers, num_streams):

    """

    Distribute Kafka topic partitions across streams using round-robin.

    Returns: dict mapping stream_id -> Kafka 'assign' option string

    """

    admin = KafkaAdminClient(bootstrap_servers=brokers)

    metadata = admin.describe_topics([topic])

    total_partitions = len(metadata[0]["partitions"])

   

    # Round-robin assignment

    assignments = {i: [] for i in range(num_streams)}

    for p in range(total_partitions):

        stream_id = p % num_streams

        assignments[stream_id].append(p)

   

    # Convert to Spark "assign" option format: '{"topic_name": [0, 4, 8, ...]}'

    spark_assignments = {}

    for stream_id, partitions in assignments.items():

        spark_assignments[stream_id] = json.dumps({topic: partitions})

        print(f"Stream {stream_id}: {len(partitions)} partitions -> {partitions}")

   

    return spark_assignments

Step 2: Per-Stream foreachBatch Function

from pyspark.sql.functions import col



# Define table routing config (event_type -> delta path)

TABLE_ROUTING = {

    "orders":    "/mnt/delta/orders",

    "users":     "/mnt/delta/users",

    "payments":  "/mnt/delta/payments",

    "inventory": "/mnt/delta/inventory",

    "products":  "/mnt/delta/products",

    "shipments": "/mnt/delta/shipments",

    "refunds":   "/mnt/delta/refunds",

    "reviews":   "/mnt/delta/reviews",

    "analytics": "/mnt/delta/analytics",

    "events":    "/mnt/delta/events"

}



def make_process_batch_fn(stream_id):

    """Factory that creates a foreachBatch function with stream context"""

   

    def process_batch(batch_df, batch_id):

        print(f"[Stream {stream_id}] Processing batch {batch_id}")

        batch_df.cache()

       

        # Iterate over the routing config and write to each Delta table

        for event_type, delta_path in TABLE_ROUTING.items():

            filtered = batch_df.filter(col("event_type") == event_type)

            if not filtered.isEmpty():

                (filtered.write

                    .format("delta")

                    .mode("append")

                    .save(delta_path))

       

        batch_df.unpersist()

   

    return process_batch

Step 3: Launch Streams in Parallel

assignments = assign_partitions_round_robin(

    topic=KAFKA_TOPIC,

    brokers=KAFKA_BROKERS,

    num_streams=NUM_STREAMS

)



queries = []

with ThreadPoolExecutor(max_workers=NUM_STREAMS) as executor:

    futures = []

    for stream_id, partition_assignment in assignments.items():

        future = executor.submit(

            start_stream,

            stream_id, partition_assignment,

            KAFKA_BROKERS, KAFKA_TOPIC, CHECKPOINT_BASE

        )

        futures.append(future)

   

    for future in futures:

        queries.append(future.result())



print(f"All {NUM_STREAMS} streams started.")



# C: Wait for all streams (or handle in monitoring loop)

for q in queries:

    q.awaitTermination()

 

Why Round-Robin Handles Partition Changes Gracefully

Round-robin distributes partitions evenly because every Nth partition lands on the same stream. When partitions increase, new ones slot naturally into existing streams. When they decrease, the remaining partitions stay balanced. Each stream's load only shifts by ±1 partition regardless of how the count changes, so no stream gets overwhelmed or starved.

The formula partition_id % num_streams is stateless and deterministic, meaning rebalancing is just recomputation — no coordination, no migration overhead.

Key Takeaways from Benchmarks

  • Multi-stream helps EVERY workload — at minimum 3x throughput improvement
  • foreachBatch use cases see the largest gains (5-7x) due to addressing both stacked bottlenecks
  • Sweet spot for 50 partitions: 5-10 streams
  • CPU efficiency: 80% across 4 cores is preferable to 100% on 1 core (better headroom for spikes)
  • Recovery time: per-stream checkpoint provides isolated failure recovery

 

 

Other Applications (Beyond foreachBatch)

The multi-stream pattern is not limited to foreachBatch — it boosts parallelism for any Kafka-sourced Spark workload:

  1. Simple Kafka → Single Delta Table: Even basic ingestion gains 3x throughput when 50+ partitions are split across parallel streams writing to the same table.
  2. Windowed Aggregations & Joins: Each stream maintains its own state store on its partition subset, enabling parallel stateful processing.
  3. Multi-Sink Writes (Delta + Elasticsearch + S3): Each stream writes to all sinks in parallel — multiplying throughput across every downstream system.
  4. Real-Time ML Scoring: Each stream applies the same model to its partition subset, scaling inference horizontally without a global bottleneck.
  5. CDC Pipelines: Distribute INSERT/UPDATE/DELETE events across streams; each handles its share of changes with isolated checkpoints.

When NOT to Use This Pattern

Multi-stream is useful but not always the right tool. Skip it when:

  1. Low Partition Count: Fewer than 10 partitions — overhead exceeds the gains.
  2. Throughput SLA Already Met: Single-stream comfortably meets your throughput target — keep the simpler design.
  3. Global Ordering Required: Strict global ordering across all partitions is needed (multi-stream loses ordering guarantees across streams).
  4. Cross-Partition Stateful Operations: Stateful operations need GLOBAL state across all partitions (workaround: aggregate per-stream, merge downstream).
  5. Operational Capacity: The team cannot handle monitoring N streams, or the cluster is undersized to run N parallel queries.

 

 

Key Insight

Multi-stream is a GENERAL pattern for Kafka source parallelism in Spark. foreachBatch use cases are the showcase — where benefits are most visible — but the same pattern accelerates simple writes, aggregations, and joins.

When to Apply

Apply multi-stream when ALL of these hold:

  • Single-stream throughput is below your SLA
  • Kafka topic has 15+ partitions (and growing)
  • p99 latency exceeds your target
  • Cluster CPU shows single-core saturation while others sit idle
  • Operational maturity: the team can monitor N streams

Final Thoughts

The multi-stream pattern is a general-purpose tool. It does not change WHAT you do (your foreachBatch logic, aggregations, joins, simple writes — all unchanged) — it changes HOW MUCH parallelism you achieve. By splitting one stream into multiple streams, each owning fewer partitions, you achieve the partition-level parallelism Kafka supports.

Start with foreachBatch use cases if you have them — that is where the gains are largest. The same pattern works equally well for simple Kafka-to-Delta writes, stateful aggregations, multi-sink topologies, ML pipelines, and CDC ingestion.

Note on numbers: The throughput and latency numbers in this article are illustrative, based on typical patterns observed in production-scale Databricks deployments. Actual results vary by cluster size, message size, partition count, and downstream sink characteristics. Treat them as directional guidance, and benchmark against your own workload.

0 REPLIES 0