Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

POC on Spark 4.x

RevanthV
New Contributor III

I need to do a POC with Spark 3.5.7 and 4.x and need a local setup with a sample Kafka source.

The POC would read data from Kafka via a streaming job and write to a Delta table, and I would like to do this on Spark 4.x. Do you know of any quick place where I can do this? I found a way two days ago via a community post, but I still get some connectivity errors.

3 REPLIES

Louis_Frolio
Databricks Employee

Hello @RevanthV, I did some digging and here are some helpful tips.

Got it: here are fast, reproducible ways to stand up a local Kafka source and run a Spark Structured Streaming job that writes to a Delta table, plus the common fixes for the connectivity errors you're likely seeing.

Quick setup options

  • Use a simple local Kafka via Docker Compose (KRaft mode, no ZooKeeper) and point Spark to localhost:9092. This is usually the quickest way to get a working Kafka bootstrap server for development.

  • If you want to skip local networking headaches, spin up a Databricks workspace and install the dbdemos "Spark Streaming - Advanced" demo; it includes Kafka ingestion and Delta writes with a couple of commands: %pip install dbdemos, then dbdemos.install('streaming-sessionization').

  • For production-like behavior or more guardrails, you can ingest Kafka directly with Databricks Delta Live Tables (DLT) and Structured Streaming; it's the same Kafka reader and Delta sink you'd use locally, just with managed orchestration and built-in reliability features (a minimal sketch of the DLT variant follows this list).
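
For illustration, a DLT pipeline cell for the same ingestion might look like the snippet below. This is a hedged sketch, not the dbdemos code: the table and function names are made up, and the broker address is a placeholder to replace with your own.

import dlt
from pyspark.sql.functions import col

# Hypothetical DLT table: same Kafka reader options as the local job further down
@dlt.table(name="kafka_events_raw", comment="Raw events ingested from Kafka")
def kafka_events_raw():
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<your-broker>:9092")  # placeholder
            .option("subscribe", "events")
            .option("startingOffsets", "latest")
            .load()
            .select(col("value").cast("string").alias("json")))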

Minimal local Docker Compose for Kafka

Save as docker-compose.yml and run docker compose up -d:

version: "3.8"
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: local-kafka
    environment:
      # Single-node KRaft broker (no ZooKeeper)
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Advertise a hostname that is reachable from the host so the Spark driver can connect
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports:
      - "9092:9092"
      - "9094:9094"
 

Then create a test topic and produce a couple of events (use the Confluent CLI, kcat, or the Kafka console tools). If kcat -b localhost:9092 -L lists your broker and topic, your bootstrap servers are reachable.
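
If you prefer to stay in Python rather than install kcat, a small producer script can generate the test events. This is a minimal sketch, assuming the kafka-python package (pip install kafka-python) and that the events topic already exists or broker-side auto topic creation is left at its default:

import json
import time

from kafka import KafkaProducer  # assumption: pip install kafka-python

# Connect to the broker exposed by the Docker Compose file above
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send a few sample events matching the schema used in the Spark job below
for i in range(5):
    producer.send("events", value={"id": f"id-{i}", "type": "click", "count": i})
    time.sleep(0.2)

producer.flush()
producer.close()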

Spark 4.x streaming read from Kafka and write to Delta

The code is the same across Spark 3.5.x and 4.x; the key differences are the connector JAR versions. Reading uses format("kafka") with kafka.bootstrap.servers, subscribe, and startingOffsets; writing uses format("delta") plus a checkpointLocation to ensure reliable progress tracking.

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = (SparkSession.builder
         .appName("kafka-to-delta")
         .getOrCreate())

# 1) Stream from Kafka
df_raw = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "events")
          .option("startingOffsets", "latest")
          .load())

# 2) Parse the Kafka value payload (adjust schema to your data)
event_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("count", IntegerType(), True)
])

df_parsed = (df_raw
             .selectExpr("CAST(value AS STRING) AS json")
             .select(from_json(col("json"), event_schema).alias("event"))
             .select("event.*"))

# 3) Write to Delta with checkpointing
query = (df_parsed.writeStream
         .format("delta")
         .option("checkpointLocation", "/tmp/chk/kafka_events")
         .outputMode("append")
         .start("/tmp/delta/kafka_events"))

query.awaitTermination()
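
Once the stream is running, you can sanity-check the sink by batch-reading the Delta path from another pyspark session started with the same Delta packages. A minimal sketch, reusing the /tmp paths from the job above:

# Batch-read the Delta table written by the streaming query
events = spark.read.format("delta").load("/tmp/delta/kafka_events")
events.show(truncate=False)
print(f"Rows written so far: {events.count()}")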
 

Dependencies for Spark 3.5.7 vs 4.x

  • Use the Kafka connector that matches your Spark version and Scala build: org.apache.spark:spark-sql-kafka-0-10_2.12:<your-spark-version> for Spark 3.5.x, or the _2.13 artifact for Spark 4.x (which is built against Scala 2.13). Add the matching org.apache.kafka:kafka-clients:<broker-compatible-version>. This ensures KafkaSourceProvider loads correctly at runtime.

  • Add Delta Lake libraries compatible with your Spark/Scala build and enable the Delta SQL extension/catalog if they are not already bundled with your distribution (see the session-builder sketch after this list). Delta Lake integrates with Structured Streaming for both sources and sinks and requires checkpointing when used as a sink.
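
As one concrete way to wire those dependencies in, you can resolve the packages at session start instead of passing --packages on the command line. The coordinates below assume a Spark 4.0.x / Scala 2.13 build with Delta Lake 4.0.0; treat them as placeholders and confirm them against the Delta Lake compatibility matrix for your exact Spark release.

from pyspark.sql import SparkSession

# Assumed coordinates for Spark 4.0.x / Scala 2.13 -- verify before use
packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0",
    "io.delta:delta-spark_2.13:4.0.0",
])

spark = (SparkSession.builder
         .appName("kafka-to-delta-local")
         .config("spark.jars.packages", packages)
         # Enable Delta Lake's SQL extension and catalog on open-source Spark
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

For Spark 3.5.7, the same pattern applies with the _2.12 artifacts and a Delta release from the 3.x line.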

Common connectivity errors and quick fixes

  • "No data and no error" after starting the stream often means your kafka.bootstrap.servers isn't resolvable or reachable; the Kafka client will retry indefinitely without throwing. Validate with kcat -b localhost:9092 -L and fix advertised listeners or hostnames first.

  • Wrong listener/advertised listener: your Docker broker must advertise a hostname/port accessible from the host (e.g., PLAINTEXT://localhost:9092). If it advertises an internal container name or IP, the Spark driver can't connect. Update KAFKA_CFG_ADVERTISED_LISTENERS and restart.

  • SASL/SSL mismatch: if your broker requires TLS or SASL, set the corresponding Spark options (for example, kafka.security.protocol, kafka.ssl.*, kafka.sasl.*). Start plaintext first to validate networking, then add security configs incrementally.

  • Missing connector JARs: "NoClassDefFoundError: …KafkaSourceProvider" means the spark-sql-kafka-0-10 package is absent or the wrong version. Use --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version> (or the _2.13 artifact on Spark 4.x, or add it to your build) and ensure Scala version alignment.

  • Throughput/backlog issues: control ingestion using maxOffsetsPerTrigger and read parallelism via minPartitions. Monitor backlog and tune these settings to keep up with incoming volume (see the options sketch after this list).
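
To make the security and throughput points concrete, the same readStream call accepts those settings as additional options. A sketch, assuming a SASL_SSL broker; the host, credentials, and limits are placeholders to adapt:

df_raw = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker.example.com:9093")  # placeholder
          .option("subscribe", "events")
          .option("startingOffsets", "latest")
          # Security: add these only after plaintext connectivity works
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.sasl.jaas.config",
                  'org.apache.kafka.common.security.plain.PlainLoginModule required '
                  'username="<user>" password="<secret>";')
          # Throughput: cap records per micro-batch and raise read parallelism
          .option("maxOffsetsPerTrigger", "10000")
          .option("minPartitions", "8")
          .load())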

A โ€œquick placeโ€ to do this on Spark 4.x

  • Local: Use the Docker Compose above, then spark-submit or spark-shell with matching Kafka and Delta packages. It's the fastest path for a local POC.

  • Managed (fastest to verify end-to-end): Launch a small Databricks cluster, use the Kafka reader options shown above, and write to a Delta table. The same format("kafka")/format("delta") APIs apply, and the docs walk through the exact options and behaviors you'll see in the stream.

Troubleshooting checklist

  • Verify broker reachability: kcat -b localhost:9092 -L should list your topic/partitions. If it fails, fix Docker networking/advertised listeners before testing Spark.

  • Check topic subscription settings: use subscribe="events" and confirm it's the right topic name; set startingOffsets="latest" to avoid scanning old history during the POC.

  • Always set a checkpointLocation on your Delta sink to allow reliable restart and exactly-once semantics in the pipeline.

     

Hope this helps, Louis.

Hey Louis,

 

Good day. I found all the above info on ChatGPT a long time ago, but using any of those setups takes a lot of time and I needed something quick and developer friendly.

Last Friday I came across a post from @K_Anudeep on setting up a complete environment locally in a few minutes on Docker, and it was quite good. But I am facing some connection issues when extending the containers.

K_Anudeep
Databricks Employee

Hey @RevanthV ,

Thanks for tagging me here. Firstly, I am happy that you have tried out the setup using the project posted in the community: ⭐ Setup Spark with Hadoop Anywhere: A DBR-aligned local Spark+HDFS+Hive stack on Docker ⭐

Could you please post your Docker Compose file after you have made the modifications?

Anudeep