Spark 3.3.0 connect kafka problem
08-23-2022 04:40 AM
I am trying to connect to Kafka from Spark but am getting an error:
Kafka Version: 2.4.1
Spark Version: 3.3.0
I am using a Jupyter notebook to execute the PySpark code below:
```
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
from pyspark.sql import SparkSession

# The --packages flag has to be in place before the Spark JVM is started
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

sc = SparkSession.builder.appName('Pyspark_kafka').getOrCreate()

df = sc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
    .option("subscribe", "ext_device-event_10121") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```
Which gives me the following error:
```
---------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-18-409d93832e70> in <module>
5 .option("subscribe", "ext_device-event_10121") \
6 .option("startingOffsets", "earliest") \
----> 7 .option("endingOffsets", "latest") \
8 .load()
/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
467 return self._df(self._jreader.load(path))
468 else:
--> 469 return self._df(self._jreader.load())
470
471 def json(
/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/spark/spark-3.3.0-bin-hadoop3/python/pyspark/sql/utils.py in deco(*a, **kw)
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```
I'm not sure what's wrong with the connector or the code; please help me out.
Thanks
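For reference, this AnalysisException generally means the spark-sql-kafka connector never made it onto the driver classpath; one common cause is that PYSPARK_SUBMIT_ARGS is read only when the Python-to-JVM gateway first starts, so if the notebook kernel has already launched a Spark JVM the --packages flag is silently ignored. A minimal sketch of attaching the package at session creation instead (assuming Spark 3.3.0 with Scala 2.12, and reusing the broker and topic from the post above):
```
from pyspark.sql import SparkSession

# Let the builder resolve the connector from Maven instead of PYSPARK_SUBMIT_ARGS;
# this only takes effect if no Spark JVM has been started in the kernel yet
spark = SparkSession.builder \
    .appName("Pyspark_kafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "zonos.engrid.in:9092") \
    .option("subscribe", "ext_device-event_10121") \
    .option("startingOffsets", "earliest") \
    .load()
# Note: endingOffsets is documented as batch-only, so it is omitted for readStream.
```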
- Labels:
  - Import
  - Kafka
  - Pyspark
  - Spark
  - Spark streaming
11-06-2022 08:56 AM
This is how I configured PySpark (Scala 2.12, Spark 3.2.1) to run Structured Streaming with Kafka in JupyterLab. You need to download two jar files, spark-sql-kafka-0-10_2.12-3.2.1.jar and kafka-clients-2.1.1.jar, into a local `jars` folder:
```
spark = SparkSession \
    .builder \
    .config("spark.jars", os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar" + "," + os.getcwd() + "/jars/kafka-clients-2.1.1.jar") \
    .appName("Structured_Redpanda_WordCount") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 1)
```
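A minimal follow-up sketch of how that session might then be used to read from Kafka (the broker address and topic name below are placeholders, not values from the original post):
```
# Hypothetical broker/topic, just to show the session can now resolve the kafka source
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "wordcount_topic") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
```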
12-28-2022 11:25 AM
In my case it was solved by just downloading the jars.
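A quick way to check, from the notebook itself, which jars and packages the running session was actually started with (a sketch; keys that were never set simply come back empty):
```
# Inspect the live session's configuration to confirm the Kafka connector was picked up
for key in ("spark.jars", "spark.jars.packages"):
    print(key, "=", spark.sparkContext.getConf().get(key, ""))
```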
01-02-2024 02:39 PM
Hi,
Which jar was added, to which location, and how was the jar integrated with the code?
01-03-2024 02:23 PM
Hi @avnish26, did you add the JAR files to the cluster? Do you still have issues? Please let us know.
12-01-2024 02:04 PM
```
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafkaStream") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()

# Read data from Kafka topic
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "firstStream") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
```
------------------------------
I have the same error, but keep in mind my Spark version is 3.5.3.
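If it helps with debugging, here is a minimal sketch of attaching a console sink so the stream actually runs once the connector resolves (the output mode and truncate option are just illustrative choices):
```
# Write the parsed values to the console to confirm records are flowing
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
```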

