Hello @RevanthV, I did some digging and here are some helpful tips.
Below are fast, reproducible ways to stand up a local Kafka source and run a Spark Structured Streaming job that writes to a Delta table, plus fixes for the common connectivity errors you're likely seeing.
Quick setup options
- Use a simple local Kafka via Docker Compose (KRaft mode, no ZooKeeper) and point Spark to localhost:9092. This is usually the quickest way to get a working Kafka bootstrap server for development.
- If you want to skip local networking headaches, spin up a Databricks workspace and install the dbdemos "Spark Streaming - Advanced" demo; it includes Kafka ingestion and Delta writes with one command: %pip install dbdemos, then dbdemos.install('streaming-sessionization').
- For production-like behavior or more guardrails, you can ingest Kafka directly with Databricks Delta Live Tables (DLT) and Structured Streaming; it's the same Kafka reader and Delta sink you'd use locally, just with managed orchestration and built-in reliability features. A minimal DLT sketch follows this list.
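Here is a minimal sketch of that DLT variant, assuming it runs inside a Databricks DLT pipeline notebook where dlt and spark are provided; the broker address and topic name are placeholders:
import dlt
from pyspark.sql.functions import col

@dlt.table(comment="Raw events ingested from Kafka")
def kafka_events_raw():
    # Same format("kafka") reader as the local example further below.
    return (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<broker>:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "latest")
        .load()
        .select(col("value").cast("string").alias("json")))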
Minimal local Docker Compose for Kafka
Save as docker-compose.yml and run docker compose up -d:
version: "3.8"
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: local-kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports:
      - "9092:9092"
      - "9094:9094"
Then create a test topic and produce a couple of events (use the Confluent CLI, kcat, or the Kafka console tools). If kcat -b localhost:9092 -L lists your broker and topic, your bootstrap servers are reachable.
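If you'd rather smoke-test from Python, here is a hypothetical producer, assuming the confluent-kafka package is installed (pip install confluent-kafka); it sends a few JSON events to the "events" topic:
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
for i in range(3):
    event = {"id": str(i), "type": "click", "count": i}
    producer.produce("events", value=json.dumps(event).encode("utf-8"))
producer.flush()  # block until all messages are delivered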
Spark 4.x streaming read from Kafka and write to Delta
The code is the same across Spark 3.5.x and 4.x; the key differences are the connector JAR versions. Reading uses format("kafka") with kafka.bootstrap.servers, subscribe, and startingOffsets; writing uses format("delta") plus a checkpointLocation to ensure reliable progress tracking.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# The Delta extension/catalog configs are needed on open-source Spark;
# Databricks runtimes already bundle them.
spark = (SparkSession.builder
    .appName("kafka-to-delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Read raw Kafka records; key and value arrive as binary columns.
df_raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load())

# Expected JSON payload carried in the Kafka value.
event_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("count", IntegerType(), True)
])

# Cast the binary value to string and parse it with the schema above.
df_parsed = (df_raw
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), event_schema).alias("event"))
    .select("event.*"))

# Write to a Delta path; the checkpoint tracks offsets for reliable restarts.
query = (df_parsed.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/chk/kafka_events")
    .outputMode("append")
    .start("/tmp/delta/kafka_events"))

query.awaitTermination()
Dependencies for Spark 3.5.7 vs 4.x
- Use the Kafka connector that matches your Spark major/minor and Scala build: org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<your-spark-version> (Scala 2.12 for Spark 3.5.x, Scala 2.13 for Spark 4.x). Add a matching org.apache.kafka:kafka-clients compatible with your broker if your build doesn't pull it in transitively. This ensures KafkaSourceProvider loads correctly at runtime; see the session-config sketch after this list.
- Add Delta Lake libraries compatible with your Spark/Scala build and enable the Delta SQL extension/catalog (if not already bundled with your distribution). Delta Lake integrates with Structured Streaming for both sources and sinks and requires checkpointing when used as a sink.
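For a local PySpark session, the dependency wiring from these two bullets can be expressed directly on the builder. The version coordinates below are assumptions for a Spark 4.0.x / Scala 2.13 build; align them with your actual Spark release and the Delta Lake compatibility matrix:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("kafka-to-delta")
    # Example coordinates for Spark 4.0.x / Scala 2.13; for Spark 3.5.x use the
    # _2.12 artifacts with a Delta 3.x release instead.
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,"
            "io.delta:delta-spark_2.13:4.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())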
Common connectivity errors and quick fixes
- "No data and no error" after starting the stream often means your kafka.bootstrap.servers isn't resolvable or reachable; the Kafka client will retry indefinitely without throwing. Validate with kcat -b localhost:9092 -L and fix advertised listeners or hostnames first.
- Wrong listener/advertised listener: your Docker broker must advertise a hostname/port accessible from the host (e.g., PLAINTEXT://localhost:9092). If it advertises an internal container name or IP, the Spark driver can't connect. Update KAFKA_CFG_ADVERTISED_LISTENERS and restart.
- SASL/SSL mismatch: if your broker requires TLS or SASL, set the corresponding Spark options (for example, kafka.security.protocol, kafka.ssl.*, kafka.sasl.*); see the reader sketch after this list. Start plaintext first to validate networking, then add security configs incrementally.
- Missing connector JARs: "NoClassDefFoundError: …KafkaSourceProvider" means the spark-sql-kafka-0-10 package is absent or the wrong version. Use --packages org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<spark-version> (or add it to your build) and ensure Scala version alignment.
- Throughput/backlog issues: control ingestion using maxOffsetsPerTrigger and read parallelism via minPartitions, as shown in the sketch below. Monitor backlog and tune these settings to keep up with incoming volume.
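A hedged sketch of the same reader with security and throughput options added; the broker endpoint and SASL credentials are placeholders, and the security block is only needed if your broker actually requires SASL over TLS:
df_secure = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker.example.com:9093")  # placeholder TLS/SASL endpoint
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    # Security options -- drop these for a plaintext local broker.
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="<user>" password="<password>";')
    # Throughput controls.
    .option("maxOffsetsPerTrigger", "10000")  # cap records per micro-batch
    .option("minPartitions", "8")             # request more read parallelism
    .load())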
A "quick place" to do this on Spark 4.x
- Local: use the Docker Compose above, then spark-submit or spark-shell with matching Kafka and Delta packages. It's the fastest path for a local POC.
- Managed (fastest to verify end-to-end): launch a small Databricks cluster, use the Kafka reader options shown above, and write to a Delta table (see the table-name variant below). The same format("kafka")/format("delta") APIs apply, and the docs walk through the exact options and behaviors you'll see in the stream.
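On Databricks (or any metastore-backed Spark), the same stream can target a Delta table by name instead of a path; the three-level table name and checkpoint path below are only illustrations:
(df_parsed.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/chk/kafka_events_table")  # use a durable path in real use
    .outputMode("append")
    .toTable("main.default.kafka_events"))  # hypothetical catalog.schema.table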
Troubleshooting checklist
- Verify broker reachability: kcat -b localhost:9092 -L should list your topic/partitions (a Python equivalent is sketched after this checklist). If it fails, fix Docker networking/advertised listeners before testing Spark.
- Check topic subscription settings: use subscribe="events" and confirm it's the right topic name; set startingOffsets="latest" to avoid scanning old history during the POC.
- Always set a checkpointLocation on your Delta sink to allow reliable restarts and exactly-once semantics in the pipeline.
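If kcat isn't handy, a rough Python equivalent of the reachability check (again assuming the confluent-kafka package) looks like this:
from confluent_kafka.admin import AdminClient

# Fetch cluster metadata; a timeout or exception here usually points at
# networking or advertised-listener problems rather than Spark.
metadata = AdminClient({"bootstrap.servers": "localhost:9092"}).list_topics(timeout=5)
print("brokers:", metadata.brokers)
print("topics:", list(metadata.topics))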
Hope this helps, Louis.