from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.3.0'

# Kafka connector and client jars pulled in via spark.jars.packages
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.1'
]

# Attempt 1: set the rate limit as a session-level config
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-spark") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.streaming.kafka.maxRatePerPartition", 100) \
    .config("spark.sql.shuffle.partitions", 2) \
    .getOrCreate()

# Attempt 2: pass the same setting as an option on the Kafka source
# (BROKER and TOPIC are defined elsewhere)
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BROKER) \
    .option("failOnDataLoss", "false") \
    .option("subscribe", TOPIC) \
    .option("includeHeaders", "true") \
    .option("spark.streaming.kafka.maxRatePerPartition", 100) \
    .load()

display(kafka_df)
An important parameter is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read.
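To make the intended effect concrete, here is a minimal back-of-the-envelope sketch of the cap this setting is supposed to impose (the partition count and trigger interval below are assumed values, not taken from my actual setup):

# Hypothetical illustration of the cap maxRatePerPartition is meant to enforce
max_rate_per_partition = 100   # messages per second per partition (as configured above)
num_partitions = 4             # assumed number of partitions in TOPIC
trigger_interval_sec = 5       # assumed micro-batch / trigger interval

# Upper bound on records pulled per micro-batch if the limit were honoured
max_records_per_batch = max_rate_per_partition * num_partitions * trigger_interval_sec
print(max_records_per_batch)   # 2000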
The code at the top includes two attempts to set maxRatePerPartition (once as a session config and once as a reader option), but unfortunately neither of them works.
Is there a proper way to configure rate limiting, i.e. to limit how fast Spark reads from Kafka?
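For reference, this is a rough sketch of how one could check whether a limit is actually applied, by looking at how many rows each micro-batch reads (the console sink and 5-second trigger here are just illustrative choices, not my exact monitoring code):

# Start the stream with an explicit trigger and inspect per-batch input sizes
query = kafka_df.writeStream \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()

# After a few batches, numInputRows shows how many records each batch pulled;
# with a working rate limit these numbers should stay bounded
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])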