Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark 3.3.0 connect kafka problem

avnish26
New Contributor III

I am trying to connect to my Kafka cluster from Spark but am getting an error:

Kafka Version: 2.4.1

Spark Version: 3.3.0

I am using a Jupyter notebook to execute the PySpark code below:

```
from pyspark.sql.functions import *
from pyspark.sql.types import *
#import library
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

sc = SparkSession.builder.appName('Pyspark_kafka').getOrCreate()

df = sc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
    .option("subscribe", "ext_device-event_10121") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```

Which gives me the following error:

```
---------------------------------------------------------------
AnalysisException       Traceback (most recent call last)
<ipython-input-18-409d93832e70> in <module>
   5     .option("subscribe", "ext_device-event_10121") \
   6     .option("startingOffsets", "earliest") \
----> 7     .option("endingOffsets", "latest") \
   8     .load()

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
  467       return self._df(self._jreader.load(path))
  468     else:
--> 469       return self._df(self._jreader.load())
  470
  471   def json(

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
  1255     answer = self.gateway_client.send_command(command)
  1256     return_value = get_return_value(
-> 1257       answer, self.gateway_client, self.target_id, self.name)
  1258
  1259     for temp_arg in temp_args:

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/utils.py in deco(*a, **kw)
  194         # Hide where the exception came from that shows a non-Pythonic
  195         # JVM exception message.
--> 196         raise converted from None
  197       else:
  198         raise

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```

I'm not sure what's wrong with the connector or the code. Please help me out.

Thanks
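Editorial sketch, not part of the original post: the error says the Kafka source is not on the classpath. One possible cause, which this thread does not confirm, is that `PYSPARK_SUBMIT_ARGS` was set after a Spark session had already been started in the same notebook kernel. PySpark reads that variable only when it launches the JVM, and `getOrCreate()` reuses an existing session. The sketch below builds the `--packages` value from the Spark and Scala versions so the connector coordinate matches the Spark build. The helper names `kafka_connector_package` and `kafka_submit_args` are hypothetical.

```python
import os


def kafka_connector_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Hypothetical helper: Maven coordinate of the Kafka source for a Spark build."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def kafka_submit_args(spark_version: str, scala_version: str = "2.12") -> str:
    """Hypothetical helper: PYSPARK_SUBMIT_ARGS value that pulls in the connector."""
    package = kafka_connector_package(spark_version, scala_version)
    return f"--packages {package} pyspark-shell"


# Assumption: this runs in a fresh kernel, before any SparkSession exists.
# If a session was already created, restart the kernel first, because the
# variable is only read when PySpark launches the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("3.3.0")
```

With the versions from the post, this produces the same string the original code assigns, so the open question is when the assignment runs, not what it contains.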

4 REPLIES

Nghiaht1
New Contributor III

This is how I configured PySpark (Scala 2.12, Spark 3.2.1) Structured Streaming with Kafka on JupyterLab. You need to download two JAR files, spark-sql-kafka-0-10_2.12-3.2.1.jar and kafka-clients-2.1.1.jar, into a folder named jars:

```
import os

from pyspark.sql import SparkSession

# Pass both downloaded JARs to Spark as a comma-separated list of paths.
spark = SparkSession\
  .builder\
  .config("spark.jars", os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar" + "," + os.getcwd() + "/jars/kafka-clients-2.1.1.jar") \
  .appName("Structured_Redpanda_WordCount")\
  .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 1)
```

That solved it in my case; just download the JARs.
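Editorial sketch, not part of the original reply: the `spark.jars` setting takes a comma-separated list of JAR paths, so a typo in a file name may only show up later as a missing data source. A minimal way to build that value and check that each file exists first is shown below. The helper name `spark_jars_value` is hypothetical, and the commented usage assumes the same `jars` folder and file names as the reply above.

```python
import os


def spark_jars_value(jar_dir, jar_names):
    """Hypothetical helper: comma-separated absolute JAR paths for `spark.jars`.

    Raises FileNotFoundError if any listed JAR is missing from jar_dir.
    """
    paths = []
    for name in jar_names:
        path = os.path.abspath(os.path.join(jar_dir, name))
        if not os.path.isfile(path):
            raise FileNotFoundError(f"missing JAR: {path}")
        paths.append(path)
    return ",".join(paths)


# Usage (assumes the two JARs were downloaded into ./jars):
# jars = spark_jars_value("jars", [
#     "spark-sql-kafka-0-10_2.12-3.2.1.jar",
#     "kafka-clients-2.1.1.jar",
# ])
# spark = SparkSession.builder.config("spark.jars", jars).getOrCreate()
```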

dilip4dd
New Contributor II

Hi,

Which JARs were added, where were they placed, and how are they integrated with the code?

jose_gonzalez
Databricks Employee

Hi @avnish26, did you add the JAR files to the cluster? Do you still have issues? Please let us know.
