Spark 3.3.0 Kafka connection problem

avnish26
New Contributor III

I am trying to connect to Kafka from Spark but am getting an error:

Kafka Version: 2.4.1

Spark Version: 3.3.0

I am using a Jupyter notebook to execute the PySpark code below:

```
from pyspark.sql.functions import *
from pyspark.sql.types import *

# import libraries
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

sc = SparkSession.builder.appName('Pyspark_kafka').getOrCreate()

df = sc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
    .option("subscribe", "ext_device-event_10121") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```

This gives me the following error:

```
---------------------------------------------------------------
AnalysisException                       Traceback (most recent call last)
<ipython-input-18-409d93832e70> in <module>
      5     .option("subscribe", "ext_device-event_10121") \
      6     .option("startingOffsets", "earliest") \
----> 7     .option("endingOffsets", "latest") \
      8     .load()

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
    467             return self._df(self._jreader.load(path))
    468         else:
--> 469             return self._df(self._jreader.load())
    470 
    471     def json(

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/utils.py in deco(*a, **kw)
    194                 # Hide where the exception came from that shows a non-Pythonic
    195                 # JVM exception message.
--> 196                 raise converted from None
    197             else:
    198                 raise

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```

I'm not sure what's wrong with the connector or the code; please help me out.

Thanks

4 REPLIES

Nghiaht1
New Contributor III

This is how I configure PySpark (Scala 2.12, Spark 3.2.1) Structured Streaming with Kafka in JupyterLab. You need to download two JAR files (spark-sql-kafka-0-10_2.12-3.2.1.jar and kafka-clients-2.1.1.jar) into a jars folder:

```
import os
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars", os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar" + "," + os.getcwd() + "/jars/kafka-clients-2.1.1.jar") \
    .appName("Structured_Redpanda_WordCount") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 1)
```

In my case this solved it; I just downloaded the JARs.
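
For completeness, here is a minimal sketch of how a session configured this way is typically used to read a Kafka topic. The broker address and topic name below are placeholders (not from this thread), and the console sink is just for inspection:

```
# Minimal usage sketch, assuming `spark` was built with the Kafka JARs
# on the classpath as above. Broker and topic names are placeholders.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers key/value as binary; cast to strings before inspecting.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
```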

dilip4dd
New Contributor II

Hi,

Which JARs were added, in which location, and how do you integrate the JARs with the code?

jose_gonzalez
Moderator

Hi @avnish26, did you add the JAR files to the cluster? Do you still have issues? Please let us know.
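
As an alternative to placing JARs on the cluster by hand, Spark can resolve the connector from Maven at startup via the spark.jars.packages config. A minimal sketch, reusing the package coordinates from the original post (this must be set before the session, and hence the JVM, is created):

```
from pyspark.sql import SparkSession

# Sketch: have Spark pull the Kafka connector from Maven Central at
# startup instead of pointing at local JARs. The coordinates match
# Spark 3.3.0 / Scala 2.12, as in the original post.
spark = SparkSession.builder \
    .appName("Pyspark_kafka") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()
```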
