Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark 3.3.0 Kafka connection problem

avnish26
New Contributor III

I am trying to connect to Kafka from Spark but am getting an error.

Kafka Version: 2.4.1

Spark Version: 3.3.0

I am using a Jupyter notebook to execute the PySpark code below:

```
from pyspark.sql.functions import *
from pyspark.sql.types import *

# import libraries
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

sc = SparkSession.builder.appName('Pyspark_kafka').getOrCreate()

df = sc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
    .option("subscribe", "ext_device-event_10121") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```

Which gives me the following error:

```
---------------------------------------------------------------
AnalysisException                       Traceback (most recent call last)
<ipython-input-18-409d93832e70> in <module>
      5     .option("subscribe", "ext_device-event_10121") \
      6     .option("startingOffsets", "earliest") \
----> 7     .option("endingOffsets", "latest") \
      8     .load()

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
    467             return self._df(self._jreader.load(path))
    468         else:
--> 469             return self._df(self._jreader.load())
    470 
    471     def json(

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/utils.py in deco(*a, **kw)
    194                 # Hide where the exception came from that shows a non-Pythonic
    195                 # JVM exception message.
--> 196                 raise converted from None
    197             else:
    198                 raise

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```

I'm not sure what's wrong with the connector or the code. Please help me out.

Thanks

5 REPLIES

Nghiaht1
New Contributor III

This is how I configure PySpark (Scala 2.12, Spark 3.2.1) Structured Streaming with Kafka on JupyterLab. You need to download two jar files, spark-sql-kafka-0-10_2.12-3.2.1.jar and kafka-clients-2.1.1.jar, into a local jars folder:

```
spark = SparkSession \
    .builder \
    .config("spark.jars", os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar" + "," + os.getcwd() + "/jars/kafka-clients-2.1.1.jar") \
    .appName("Structured_Redpanda_WordCount") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 1)
```

In my case this solved it; just download the jars.
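The `spark.jars` value in the reply above is just a comma-separated list of local jar paths. As a minimal sketch, it can be built programmatically instead of by string concatenation; the helper name `local_jars_config` below is made up for illustration, and the directory and jar names are examples to adapt to your setup:

```python
import os


def local_jars_config(jar_dir: str, jar_names: list) -> str:
    """Join local jar paths into the comma-separated string spark.jars expects."""
    return ",".join(os.path.join(jar_dir, name) for name in jar_names)


# Example: the two jars mentioned above, assumed to live in /opt/jars.
jars = local_jars_config(
    "/opt/jars",
    ["spark-sql-kafka-0-10_2.12-3.2.1.jar", "kafka-clients-2.1.1.jar"],
)
print(jars)
# /opt/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,/opt/jars/kafka-clients-2.1.1.jar

# The result would then be passed to the session builder, e.g.:
# spark = SparkSession.builder.config("spark.jars", jars).getOrCreate()
```

Keeping the path list in one place makes it easier to swap jar versions when you upgrade Spark, since both the connector and kafka-clients jars must stay compatible with each other.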

dilip4dd
New Contributor II

Hi,

Which jar was added, to which location, and how do I integrate the jar with the code?

jose_gonzalez
Databricks Employee

Hi @avnish26, did you add the jar files to the cluster? Do you still have issues? Please let us know.

Assem
New Contributor II

```
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafkaStream") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()

# Read data from the Kafka topic
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "firstStream") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
```
------------------------------
I have the same error, but keep in mind my Spark version is 3.5.3.
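Since this "Failed to find data source: kafka" error usually comes from a connector coordinate that does not match the installed Spark build, a minimal sketch of deriving the matching coordinate is below. The helper name `matching_kafka_package` is made up for illustration, and the Scala suffix is assumed to be 2.12 (the common build for Spark 3.x); in a real session you would pass `pyspark.__version__` as the version:

```python
def matching_kafka_package(spark_version: str, scala_suffix: str = "2.12") -> str:
    """Build the Maven coordinate for the Kafka connector matching a Spark version.

    Both the Scala suffix and the version must match the running Spark,
    otherwise the kafka data source will not be found at load() time.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_suffix}:{spark_version}"


# Example for the Spark 3.5.3 setup mentioned above; with pyspark installed
# you would use matching_kafka_package(pyspark.__version__) instead.
pkg = matching_kafka_package("3.5.3")
print(pkg)
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3

# The coordinate would then go into the builder, e.g.:
# spark = SparkSession.builder.config("spark.jars.packages", pkg).getOrCreate()
```

Note that `spark.jars.packages` only takes effect if it is set before the SparkSession (and its JVM) is created, so in a notebook you may need to restart the kernel after changing it.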
