Wednesday - last edited Wednesday
I need to do a POC with Spark 3.5.7 and 4.x and need a local setup with a sample Kafka source.
The POC would read data from Kafka via a streaming job and write to a Delta table, and I would like to do this on Spark 4.x. Do you know of any quick place where I can do this? I found a way two days ago via a community post, but I still get some connectivity errors.
Wednesday
Hello @RevanthV, I did some digging and here are some helpful tips.
Got it: here are fast, reproducible ways to stand up a local Kafka source and run a Spark Structured Streaming job that writes to a Delta table, plus the common fixes for the connectivity errors you're likely seeing.
Use a simple local Kafka via Docker Compose (KRaft mode, no ZooKeeper) and point Spark to localhost:9092. This is usually the quickest way to get a working Kafka bootstrap server for development.
If you want to skip local networking headaches, spin up a Databricks workspace and install the dbdemos "Spark Streaming - Advanced" demo; it includes Kafka ingestion and Delta writes with one command: %pip install dbdemos then dbdemos.install('streaming-sessionization').
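For reference, the whole install is just two notebook cells; this is a sketch of the Databricks notebook commands quoted above:

# Databricks notebook cell 1: install the dbdemos package
%pip install dbdemos

# Databricks notebook cell 2: install the streaming demo (demo name taken from the tip above)
import dbdemos
dbdemos.install('streaming-sessionization')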
For production-like behavior or more guardrails, you can ingest Kafka directly with Databricks Delta Live Tables (DLT) and Structured Streaming; itโs the same Kafka reader and Delta sink youโd use locally, just with managed orchestration and built-in reliability features.
Save as docker-compose.yml and run docker compose up -d:
version: "3.8"
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: local-kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      # KRaft needs to know which listener the controller uses and its protocol
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports:
      - "9092:9092"
      - "9094:9094"
Then create a test topic and produce a couple events (use Confluent CLI, kcat, or the Kafka console tools). If kcat -b localhost:9092 -L lists your broker and topic, your bootstrap servers are reachable.
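If you prefer to produce the test events from Python, here is a minimal producer sketch; it assumes the kafka-python package (pip install kafka-python), the "events" topic, and the same field names parsed by the Spark job below. With default broker settings the topic is auto-created on first write:

import json
from kafka import KafkaProducer

# Assumes the local broker from the compose file above and the "events" topic
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send a few JSON events matching the schema used in the Spark job below
for i in range(5):
    producer.send("events", {"id": str(i), "type": "click", "count": i})

producer.flush()
producer.close()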
The code is the same across Spark 3.5.x and 4.x; the key differences are the connector JAR versions. Reading uses format("kafka") with kafka.bootstrap.servers, subscribe, and startingOffsets; writing uses format("delta") plus a checkpointLocation to ensure reliable progress tracking.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = (SparkSession.builder
    .appName("kafka-to-delta")
    .getOrCreate())

# 1) Stream from Kafka
df_raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load())

# 2) Parse the Kafka value payload (adjust schema to your data)
event_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("count", IntegerType(), True)
])

df_parsed = (df_raw
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), event_schema).alias("event"))
    .select("event.*"))

# 3) Write to Delta with checkpointing
query = (df_parsed.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/chk/kafka_events")
    .outputMode("append")
    .start("/tmp/delta/kafka_events"))

query.awaitTermination()
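Once a few micro-batches have committed, you can sanity-check the sink from a second Delta-enabled session (awaitTermination blocks the one above); this simply reads back the same illustrative /tmp/delta/kafka_events path:

from pyspark.sql import SparkSession

# Run in a separate Delta-enabled PySpark session while the stream is running
spark = SparkSession.builder.getOrCreate()
df = spark.read.format("delta").load("/tmp/delta/kafka_events")
df.show(truncate=False)
print(df.count(), "rows written so far")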
Use the Kafka connector that matches your Spark build: org.apache.spark:spark-sql-kafka-0-10_2.12:<your-spark-version> for Spark 3.5.x (Scala 2.12 builds), or spark-sql-kafka-0-10_2.13:<your-spark-version> for Spark 4.x, which is Scala 2.13 only. Resolving it via --packages also pulls in a compatible org.apache.kafka:kafka-clients transitively. This ensures KafkaSourceProvider loads correctly at runtime.
Add Delta Lake libraries compatible with your Spark/Scala build and enable the Delta SQL extension/catalog (if not already bundled with your distribution). Delta Lake integrates with Structured Streaming for both sources and sinks and requires checkpointing when used as a sink.
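Outside Databricks, you can wire both pieces up directly on the SparkSession. A minimal sketch, assuming Spark 4.0.x (Scala 2.13) with Delta Lake 4.0.0; for Spark 3.5.x swap in the _2.12 artifacts and a compatible Delta 3.x release:

from pyspark.sql import SparkSession

# Assumed versions for illustration: Spark 4.0.x (Scala 2.13) + Delta Lake 4.0.0.
# For Spark 3.5.x, use the *_2.12 artifacts and a Delta 3.x release instead.
packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0",
    "io.delta:delta-spark_2.13:4.0.0",
])

spark = (SparkSession.builder
    .appName("kafka-to-delta-local")
    .config("spark.jars.packages", packages)
    # Delta SQL extension/catalog; already bundled on Databricks
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())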
"No data and no error" after starting the stream often means your kafka.bootstrap.servers isn't resolvable or reachable; the Kafka client will retry indefinitely without throwing. Validate with kcat -b localhost:9092 -L and fix advertised listeners or hostnames first.
Wrong listener/advertised listener: your Docker broker must advertise a hostname/port accessible from the host (e.g., PLAINTEXT://localhost:9092). If it advertises an internal container name or IP, the Spark driver can't connect. Update KAFKA_CFG_ADVERTISED_LISTENERS and restart.
SASL/SSL mismatch: if your broker requires TLS or SASL, set the corresponding Spark options (for example, kafka.security.protocol, kafka.ssl.*, kafka.sasl.*). Start plaintext first to validate networking, then add security configs incrementally.
Missing connector JARs: "NoClassDefFoundError: ...KafkaSourceProvider" means the spark-sql-kafka-0-10 package is absent or the wrong version. Use --packages org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<spark-version> (or add it to your build) and ensure Scala version alignment.
Throughput/backlog issues: control ingestion using maxOffsetsPerTrigger and parallelism via minPartitions. Monitor backlog and tune these settings to keep up with incoming volume.
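To make the security and throughput options above concrete, here is an illustrative variant of the Kafka reader. The SASL_SSL values are placeholders (your broker's endpoint, mechanism, and credentials will differ), and the tuning numbers are arbitrary starting points:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuse the active session

# Illustrative only: placeholder SASL_SSL settings plus throughput tuning
df_raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker.example.com:9093")  # hypothetical secured endpoint
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    # Security: only needed if the broker requires it; validate plaintext first
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="<user>" password="<password>";')
    # Throughput: cap offsets per micro-batch and raise read parallelism
    .option("maxOffsetsPerTrigger", "10000")
    .option("minPartitions", "8")
    .load())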
Local: Use the Docker Compose above, then spark-submit or spark-shell with matching Kafka and Delta packages. It's the fastest path for a local POC.
Managed (fastest to verify end-to-end): Launch a small Databricks cluster, use the Kafka reader options shown above, and write to a Delta table. The same format("kafka")/format("delta") APIs apply, and the docs walk through the exact options and behaviors you'll see in the stream.
Verify broker reachability: kcat -b localhost:9092 -L should list your topic/partitions. If it fails, fix Docker networking/advertised listeners before testing Spark.
Check topic subscription settings: use subscribe="events" and confirm it's the right topic name; set startingOffsets="latest" to avoid scanning old history during POC.
Always set a checkpointLocation on your Delta sink to allow reliable restart and exactly-once semantics in the pipeline.
Hope this helps, Louis.
Wednesday
Hey Louis,
Good day. I had already found all of the above info on ChatGPT a while ago, but any of those setups takes a lot of time, and I needed something quick and developer friendly.
Last Friday I came across a post from @K_Anudeep on setting up a complete env locally in a few minutes on Docker, and it was quite good. But I am facing some connection issues when extending the containers.
Wednesday
Hey @RevanthV,
Thanks for tagging me here. Firstly, I am happy that you have tried out the setup using the project posted in the community: ⭐ Setup Spark with Hadoop Anywhere : A DBR aligned local Spark+HDFS+Hive stack on Docker ⭐
Could you please post your docker-compose file after you have made the modifications?