08-23-2022 04:40 AM
I am trying to connect to Kafka from Spark, but I am getting an error:
Kafka Version: 2.4.1
Spark Version: 3.3.0
I am using a Jupyter notebook to run the PySpark code below:
```
from pyspark.sql.functions import *
from pyspark.sql.types import *
#import library
import os
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
sc = SparkSession.builder.appName('Pyspark_kafka').getOrCreate()
df = sc \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
.option("subscribe", "ext_device-event_10121") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
```
Which gives me the following error:
```
---------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-18-409d93832e70> in <module>
5 .option("subscribe", "ext_device-event_10121") \
6 .option("startingOffsets", "earliest") \
----> 7 .option("endingOffsets", "latest") \
8 .load()
/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
467 return self._df(self._jreader.load(path))
468 else:
--> 469 return self._df(self._jreader.load())
470
471 def json(
/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/utils.py in deco(*a, **kw)
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```
I am not sure whether the problem is with the connector or with my code. Please help me out.
Thanks
11-06-2022 08:56 AM
This is how I configured PySpark (Scala 2.12, Spark 3.2.1) Structured Streaming with Kafka on JupyterLab. You need to download two jar files, spark-sql-kafka-0-10_2.12-3.2.1.jar and kafka-clients-2.1.1.jar, into a folder named jars:
```
import os
from pyspark.sql import SparkSession

# Point spark.jars at the two downloaded jars in ./jars (comma-separated paths)
spark = SparkSession\
    .builder\
    .config("spark.jars", os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar" + "," + os.getcwd() + "/jars/kafka-clients-2.1.1.jar") \
    .appName("Structured_Redpanda_WordCount")\
    .getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1)
```
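The comma-separated `spark.jars` value above can also be built by a small helper that fails early when a jar is missing, which may make a wrong path easier to spot than the "Failed to find data source: kafka" error. This is only a sketch: `build_spark_jars` is a hypothetical helper name, not part of PySpark, and it assumes the jars sit in a single local folder.

```python
import os


def build_spark_jars(jar_dir, jar_names):
    """Return a comma-separated string of absolute jar paths.

    Hypothetical helper (not a PySpark API). Raises FileNotFoundError
    if any of the named jars is not present in jar_dir.
    """
    paths = []
    for name in jar_names:
        path = os.path.abspath(os.path.join(jar_dir, name))
        if not os.path.isfile(path):
            raise FileNotFoundError("Missing jar: " + path)
        paths.append(path)
    # spark.jars expects a single comma-separated list of paths
    return ",".join(paths)
```

The result could then be passed as `.config("spark.jars", build_spark_jars("jars", [...]))` with the two jar file names from the post above.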
12-28-2022 11:25 AM
In my case, that solved it: just download the jars.
01-02-2024 02:39 PM
Hi,
Which jar was added, where was it placed, and how is the jar integrated with the code?
01-03-2024 02:23 PM
Hi @avnish26, did you add the jar files to the cluster? Do you still have issues? Please let us know.
Sunday
```
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafkaStream") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()

# Read data from Kafka topic
# (option name corrected from "startingsets" to "startingOffsets")
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "firstStream") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value as STRING)")
```
I have the same error, but keep in mind that my Spark version is 3.5.3.
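Every post in this thread uses a connector coordinate whose version matches the Spark version (3.3.0, 3.2.1, 3.5.3). A minimal sketch of building that coordinate from a version string, so the two cannot drift apart, is shown here. `kafka_connector_coordinate` is a hypothetical helper, and the default Scala suffix `2.12` is an assumption taken from the coordinates quoted above; a Spark build for a different Scala version would need a different suffix.

```python
def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Spark Kafka connector.

    Hypothetical helper: the connector version is set equal to the
    Spark version, and the Scala suffix defaults to 2.12 (assumption).
    """
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version
    )


# Coordinates for the two Spark versions mentioned in this thread
print(kafka_connector_coordinate("3.3.0"))
print(kafka_connector_coordinate("3.5.3"))
```

The returned string is what the posts above pass to `spark.jars.packages` or to `--packages` in `PYSPARK_SUBMIT_ARGS`. With PySpark installed, `pyspark.__version__` could supply the version argument.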