I am setting up an ETL process using PySpark. My input is a Kafka stream, and I am writing the output to multiple sinks (one to Kafka and another to cloud storage). I am writing checkpoints to the cloud storage. The issue I am facing is that whenever my application fails for some reason and I restart it, the application reprocesses some (not all) of the input stream data, which produces duplicate records in the output. Is there any way I can avoid this? I am using Spark 3.5.0 and Python 3.11. Below are the relevant parts of my application code:
Spark Session :
spark = SparkSession \
    .builder \
    .appName("ETL") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config('spark.driver.extraJavaOptions', '-Duser.timezone=GMT') \
    .config('spark.executor.extraJavaOptions', '-Duser.timezone=GMT') \
    .config('spark.sql.session.timeZone', 'UTC') \
    .config('spark.hadoop.fs.s3a.buffer.dir', '/tmp,/mnt/tmp') \
    .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \
    .config('spark.hadoop.fs.s3a.fast.upload.active.blocks', 1) \
    .config('spark.streaming.backpressure.enabled', True) \
    .config("spark.redis.host", conf["nosql-host"]) \
    .config("spark.redis.port", conf["nosql-port"]) \
    .config("spark.redis.db", conf["nosql-db"]) \
    .config("spark.redis.auth", __REDIS_CREDENTIAL__) \
    .getOrCreate()
Kafka Read Stream :
streamDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_server_consumer)
    .option("subscribe", kafka_topic_name)
    .option("mode", "PERMISSIVE")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
    .select('fixedValue'))
Write Stream to multiple sinks :
write_stream = extractionDF \
    .writeStream \
    .trigger(processingTime='2 seconds') \
    .outputMode("append") \
    .foreachBatch(lambda df, epochId: write_to_multiple_sinks(df, epochId, processed_cloud_storage_path, kafka_bootstrap_server_producer)) \
    .option("truncate", "false") \
    .option("checkpointLocation", cloud_storage_path) \
    .start()
write_to_multiple_sinks Function :
def write_to_multiple_sinks(dataframe: DataFrame, epochId, cloud_storage_path, kafka_bootstrap_server):
    dataframe = dataframe.cache()
    druidDF = dataframe.select(druidSchema())
    druidDF.selectExpr(producerTopic, "to_json(struct(*)) AS value").write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_server) \
        .save()
    processedDF = dataframe.select(processedSchema())
    processedDF.write.format("csv").mode("append") \
        .option("sep", "^").option("compression", "gzip") \
        .option("path", cloud_storage_path) \
        .save()
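Idempotent variant I am considering :
One direction I have been thinking about (and I am not sure it is the right approach, hence this question) is to make the cloud-storage sink idempotent per micro-batch by keying the output path on the epochId that foreachBatch passes in, so a batch that gets replayed after a restart overwrites its own earlier output instead of appending a second copy. Below is a rough sketch of what I mean; it reuses druidSchema(), processedSchema() and producerTopic from my code above, and batch_id= is just a folder name I chose. As far as I can tell this would not help with the Kafka sink, which is the other part of my question.

from pyspark.sql import DataFrame

def write_to_multiple_sinks_idempotent(dataframe: DataFrame, epochId, cloud_storage_path, kafka_bootstrap_server):
    dataframe = dataframe.cache()

    # Kafka sink: unchanged, so I assume it is still at-least-once on replay
    druidDF = dataframe.select(druidSchema())
    druidDF.selectExpr(producerTopic, "to_json(struct(*)) AS value").write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_server) \
        .save()

    # Cloud-storage sink: write each micro-batch under its own epochId-keyed
    # prefix and overwrite it, so a batch that is re-run after a restart
    # replaces its previous output instead of adding duplicate files.
    processedDF = dataframe.select(processedSchema())
    processedDF.write.format("csv").mode("overwrite") \
        .option("sep", "^").option("compression", "gzip") \
        .option("path", f"{cloud_storage_path}/batch_id={epochId}") \
        .save()

    dataframe.unpersist()

Is this a reasonable way to get effectively-once output from foreachBatch, or is there a more standard approach (for both sinks)?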