12-22-2021 06:53 AM
I am new to real-time scenarios and need to create a Spark Structured Streaming job in Databricks. I am trying to apply rule-based validations, driven by backend configuration, to each incoming JSON message. I need to do the following with each incoming message:
- I need to flatten the incoming JSON
- Based on the source property in the JSON, I need to fetch the validations configured for that source and apply them to the message
- I need to log the validation errors to a Synapse table
- If there are no validation errors the data needs to be saved to a separate table based on the backend config.
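As a rough illustration of the first two steps, here is a minimal plain-Python sketch that works on a single parsed message rather than on a Spark DataFrame. The names flatten_record, RULES_BY_SOURCE and validate, the underscore-joined key convention, and the sample "orders" rules are all assumptions made for this example; they are not the flatten, getBusinessRulesforSource or applyBusinessRule helpers used in the notebook.

```python
from typing import Any, Callable, Dict, List, Tuple


def flatten_record(record: Dict[str, Any], parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining keys with `sep`.

    Lists are kept as-is (not exploded); that is a simplification of this sketch.
    """
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


# Hypothetical rule configuration keyed by source feed name.
# Each rule is (field, check function, error message).
Rule = Tuple[str, Callable[[Any], bool], str]
RULES_BY_SOURCE: Dict[str, List[Rule]] = {
    "orders": [
        ("customer_id", lambda v: v is not None, "customer_id is required"),
        ("order_amount",
         lambda v: isinstance(v, (int, float)) and v >= 0,
         "order_amount must be non-negative"),
    ],
}


def validate(source: str, flat: Dict[str, Any]) -> List[str]:
    """Return the error messages for every rule of `source` that fails."""
    errors: List[str] = []
    for field, check, message in RULES_BY_SOURCE.get(source, []):
        if not check(flat.get(field)):
            errors.append(message)
    return errors


# Example message: nested JSON already parsed into a dict.
message = {"customer": {"id": 42}, "order": {"amount": -5}}
flat = flatten_record(message)
errors = validate("orders", flat)
```

With this sample message, flat has the keys customer_id and order_amount, and errors contains the single failed rule. An empty errors list would correspond to the case where the data is saved to the target table.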
I tried implementing this using foreachBatch in a Databricks notebook. My code snippets look as follows:
- Reading from EventHub
    streamingDF = spark.readStream.format("eventhubs").options(**ehConf).load()
- Starting the foreachBatch query
    streamingDF.writeStream.outputMode("append") \
        .option("checkpointLocation", "dbfs:/FileStore/checkpoint/rtdqchekpoint") \
        .foreachBatch(processStream).start().awaitTermination()
- processStream function
    msgDF = df.select(df.body.cast("string").alias("msgBody"))
    msgDfVal = msgDF.collect()
    messages = msgDfVal[0][0]
    properties = df.select(df.properties).collect()
    sourceFeedName = properties[0][0]['sourceFeedName']
    ruleConfigDF = getBusinessRulesforSource(sourceFeedName)
    dfJson = spark.read.json(sc.parallelize([messages]))
    srcDataDf = flatten(dfJson)
    errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
    if errorDetails is not None and len(errorDetails) > 0:
        errCnt = len(errorDetails)
        saveErrorDetailsToSynapse(errorDetails)
    saveDatatoSynapse(srcDataDf)
However, while processing, I found that only some of the incoming messages reach the processStream function. For example, if 10 messages arrive, only 5 or 6 of them, seemingly at random, reach processStream. I tried removing all the processing logic, but still only a few messages come through.
Since I am new to real-time processing, can someone help me understand why this is happening and what I am doing wrong?