Missing rows when processing records with foreachBatch in Spark Structured Streaming from Azure Event Hubs

sparkstreaming

I am new to real-time scenarios and need to create Spark Structured Streaming jobs in Databricks. I am trying to apply rule-based validations, defined in backend configurations, to each incoming JSON message. I need to perform the following actions on the incoming JSON:

  1. Flatten the incoming JSON.
  2. Based on the source property in the JSON, fetch the validations configured for that source and apply them to the message.
  3. Log the validation errors to a Synapse table.
  4. If there are no validation errors, save the data to a separate table based on the backend config.
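The four steps above can be sketched in plain Python for a single message, which makes the rule logic easy to test outside Spark. This is a minimal sketch under assumptions: `flatten_json`, `apply_rules`, `validate_message` and the `RULES` layout (a list of `{"field", "rule"}` entries per `sourceFeedName`) are hypothetical stand-ins for the post's `flatten`, `applyBusinessRule` and backend config. Nested keys are joined with `_`.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical rule config keyed by sourceFeedName; in the real job this would
# come from the backend configuration (getBusinessRulesforSource).
RULES: Dict[str, List[Dict[str, str]]] = {
    "orders": [
        {"field": "order_id", "rule": "required"},
        {"field": "order_amount", "rule": "non_negative"},
    ],
}


def flatten_json(obj: Any, prefix: str = "", sep: str = "_") -> Dict[str, Any]:
    """Step 1: flatten nested dicts/lists into one level, joining keys with sep."""
    flat: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for key, value in obj.items():
            new_key = f"{prefix}{sep}{key}" if prefix else str(key)
            flat.update(flatten_json(value, new_key, sep))
    elif isinstance(obj, list):
        for i, value in enumerate(obj):
            new_key = f"{prefix}{sep}{i}" if prefix else str(i)
            flat.update(flatten_json(value, new_key, sep))
    else:
        flat[prefix] = obj  # leaf value
    return flat


def apply_rules(flat: Dict[str, Any], rules: List[Dict[str, str]]) -> List[str]:
    """Step 2: return validation error strings (empty list if the record is valid)."""
    errors = []
    for rule in rules:
        field, kind = rule["field"], rule["rule"]
        value = flat.get(field)
        if kind == "required" and value is None:
            errors.append(f"{field}: missing")
        elif kind == "non_negative" and value is not None and value < 0:
            errors.append(f"{field}: negative value {value}")
    return errors


def validate_message(raw: str, source_feed_name: str) -> Tuple[Dict[str, Any], List[str]]:
    """Parse, flatten and validate one message using the rules for its source."""
    flat = flatten_json(json.loads(raw))
    return flat, apply_rules(flat, RULES.get(source_feed_name, []))


flat, errors = validate_message('{"order": {"id": 7, "amount": -5}}', "orders")
if errors:
    print("step 3, log to error table:", errors)
else:
    print("step 4, save to data table:", flat)
```

Keeping the per-message logic as plain functions like this means the Spark side only has to deliver each message and its `sourceFeedName`; the rule checks can be unit-tested separately.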

I tried implementing this using foreachBatch in a Databricks notebook. My code snippets look as follows:

  • Reading from Event Hubs
```python
# ehConf holds the Event Hubs connection settings (not shown in the post)
streamingDF = spark.readStream.format("eventhubs").options(**ehConf).load()
```
  • Starting the foreachBatch query
```python
streamingDF.writeStream.outputMode("append") \
  .option("checkpointLocation", "dbfs:/FileStore/checkpoint/rtdqchekpoint") \
  .foreachBatch(processStream).start().awaitTermination()
```
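  • processStream function
```python
def processStream(df, epoch_id):
    # foreachBatch calls this once per micro-batch; df holds every event in that batch
    msgDF = df.select(df.body.cast("string").alias("msgBody"))
    msgDfVal = msgDF.collect()
    messages = msgDfVal[0][0]  # body of the first row only
    properties = df.select(df.properties).collect()
    sourceFeedName = properties[0][0]['sourceFeedName']  # first row's source feed
    # fetch the validation rules configured for this source
    ruleConfigDF = getBusinessRulesforSource(sourceFeedName)
    # parse the JSON body into a DataFrame and flatten nested fields
    dfJson = spark.read.json(spark.sparkContext.parallelize([messages]))
    srcDataDf = flatten(dfJson)
    errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
    if errorDetails is not None and len(errorDetails) > 0:
        saveErrorDetailsToSynapse(errorDetails)
    else:
        saveDatatoSynapse(srcDataDf)
```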
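To handle every event a micro-batch delivers, one option is to iterate over all collected rows and group the messages by source, so each source's rules are fetched once per batch rather than once per message. A minimal sketch, assuming the rows come from something like `df.select(df.body.cast("string").alias("msgBody"), "properties").collect()`; here they are plain dicts so the logic runs without Spark, and `group_batch_by_source` is a hypothetical helper.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def group_batch_by_source(rows: Iterable[Any]) -> Dict[str, List[dict]]:
    """Group every message of one micro-batch by its sourceFeedName property.

    Each row is assumed to expose "msgBody" (the body cast to string) and
    "properties" (the Event Hubs application properties as a dict).
    """
    groups: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:  # every row of the batch, not just rows[0]
        source = row["properties"].get("sourceFeedName")
        groups[source].append(json.loads(row["msgBody"]))
    return dict(groups)


# Stand-in for the collected rows of one micro-batch with three events.
batch_rows = [
    {"msgBody": '{"id": 1}', "properties": {"sourceFeedName": "orders"}},
    {"msgBody": '{"id": 2}', "properties": {"sourceFeedName": "orders"}},
    {"msgBody": '{"id": 3}', "properties": {"sourceFeedName": "returns"}},
]

for source, messages in group_batch_by_source(batch_rows).items():
    # In the notebook: fetch rules once per source, then validate and save each message.
    print(source, len(messages))
```

Collecting to the driver like this is only reasonable for small batches; for larger volumes, keeping the validation inside DataFrame operations would scale better.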

However, while processing, I found that only some of the incoming messages reach the processStream function. For example, if 10 messages arrive, only 5 or 6 random ones are processed by processStream. I tried removing all the processing logic, but still only a few messages come through.
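Spark does not call the foreachBatch function once per event: each trigger passes a DataFrame containing all events that arrived since the previous micro-batch, and how many land in each batch depends on timing. One possible explanation worth checking is therefore the `[0][0]` indexing in `processStream`, which reads only the first row of each batch. This toy simulation uses a hypothetical arrival pattern (not data from the job) to show how that alone would make about half of 10 messages appear to go missing.

```python
from typing import List


def first_row_only(batches: List[List[str]]) -> List[str]:
    """Mimic processStream as posted: only row [0] of each micro-batch is read."""
    return [batch[0] for batch in batches if batch]


def all_rows(batches: List[List[str]]) -> List[str]:
    """Read every row of every micro-batch."""
    return [msg for batch in batches for msg in batch]


# Hypothetical arrival pattern: 10 events spread over 5 micro-batches.
batches = [["m1", "m2"], ["m3"], ["m4", "m5", "m6"], ["m7"], ["m8", "m9", "m10"]]

print(len(first_row_only(batches)))  # 5
print(len(all_rows(batches)))  # 10
```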

Since I am new to real-time processing, can someone help me understand why this is happening and what I am doing wrong?