
Missing rows while processing records using foreachBatch in Spark Structured Streaming from Azure Event Hubs

sparkstreaming
New Contributor III

I am new to real-time scenarios and need to create a Spark Structured Streaming job in Databricks. I am trying to apply rule-based validations, driven by backend configuration, to each incoming JSON message. I need to do the following with each incoming message:

  1. I need to flatten the incoming JSON
  2. Based on the source property in JSON, I need to fetch the validations configured for the source and apply it on the message
  3. I need to log the validation errors to a synapse table
  4. If there are no validation errors the data needs to be saved to a separate table based on the backend config.
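
The four steps above can be sketched in plain Python, independent of Spark, to make the intended routing concrete. This is only an illustration under stated assumptions: the `RULES` table, the rule names `required` and `positive`, and the functions `flatten`, `validate` and `route` are hypothetical and are not the poster's actual helpers or configuration format.

```python
import json


def flatten(obj, prefix=""):
    """Flatten nested dicts into one dict with dotted keys (lists are left as-is)."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


# Hypothetical backend configuration: validation rules keyed by source feed name.
RULES = {
    "orders": [
        ("customer.id", "required"),
        ("amount", "positive"),
    ],
}


def validate(record, rules):
    """Return a list of error strings for one flattened record."""
    errors = []
    for field, rule in rules:
        value = record.get(field)
        if rule == "required" and value in (None, ""):
            errors.append(f"{field}: missing")
        elif rule == "positive" and not (isinstance(value, (int, float)) and value > 0):
            errors.append(f"{field}: must be positive")
    return errors


def route(raw_message, source):
    """Steps 1-4: flatten, look up rules for the source, validate, pick a destination."""
    record = flatten(json.loads(raw_message))
    errors = validate(record, RULES.get(source, []))
    return ("errors", errors) if errors else ("valid", record)
```

For example, `route('{"customer": {"id": 7}, "amount": 12.5}', "orders")` is routed to the valid destination, while a message with no customer id and a negative amount is routed to the error destination with two entries.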

I tried implementing this using foreachBatch in a Databricks notebook. My code snippets look like this:

  • Reading from EventHub
streamingDF = spark.readStream.format("eventhubs").options(**ehConf).load() 
  • Starting the foreachbatch query
streamingDF.writeStream.outputMode("append") \
  .option("checkpointLocation","dbfs:/FileStore/checkpoint/rtdqchekpoint") \
  .foreachBatch(processStream).start().awaitTermination()
  • processStream function
def processStream(df, batchId):
    # Event Hubs delivers the payload as binary, so cast the body to a string
    msgDF = df.select(df.body.cast("string").alias("msgBody"))
    msgDfVal = msgDF.collect()
    messages = msgDfVal[0][0]  # body of the first row in the micro-batch only
    properties = df.select(df.properties).collect()
    sourceFeedName = properties[0][0]['sourceFeedName']  # first row only
    ruleConfigDF = getBusinessRulesforSource(sourceFeedName)
    dfJson = spark.read.json(sc.parallelize([messages]))
    srcDataDf = flatten(dfJson)
    errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
    if errorDetails is not None and len(errorDetails) > 0:
        errCnt = len(errorDetails)
        saveErrorDetailsToSynapse(errorDetails)
    saveDatatoSynapse(srcDataDf)

However, while processing I found that only some of the incoming messages reach the processStream function. For example, if 10 messages arrive, only 5 or 6 seemingly random messages reach processStream. I tried removing all the processing logic, and still only a few messages come through.

Since I am new to real-time processing, can someone help me understand why this is happening and what I am doing wrong?

1 ACCEPTED SOLUTION

Accepted Solutions

Hubert-Dudek
Esteemed Contributor III

Hmm, maybe you are not reading all partitions/partition keys. Can you go to the Event Hub in Azure, open Process data -> Explore, and validate there?

In Databricks you can use display(streamingDF) to do some validation.

In production, .collect() shouldn't be used. Your code looks like it is processing only the first row of each batch. All the logic could be implemented using Spark DataFrame actions and transformations on df.
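
A small plain-Python simulation may help show why reading only the first row matches the reported symptom. It assumes, purely for illustration, that 10 messages happen to arrive split across six micro-batches; the real split depends on timing and is not known from the post. The function names are hypothetical.

```python
def first_row_only(batch):
    """Mirrors msgDfVal[0][0]: only the first row of each micro-batch is used."""
    return [batch[0]] if batch else []


def every_row(batch):
    """Handles all rows of the micro-batch."""
    return list(batch)


def run(batches, handler):
    """Feed each micro-batch to the handler and collect what it processed."""
    seen = []
    for batch in batches:
        seen.extend(handler(batch))
    return seen


messages = [f"msg-{i}" for i in range(1, 11)]

# Hypothetical split of the 10 messages into micro-batches of sizes 2, 1, 3, 1, 2, 1.
batches = [
    messages[0:2],
    messages[2:3],
    messages[3:6],
    messages[6:7],
    messages[7:9],
    messages[9:10],
]

processed_buggy = run(batches, first_row_only)
processed_all = run(batches, every_row)
```

With this split, `processed_buggy` holds 6 of the 10 messages (one per micro-batch) and `processed_all` holds all 10, so which messages go missing depends only on how they happened to be batched.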

If you have your own functions like getBusinessRulesforSource etc., they can be converted to Spark UDFs and then applied on the DataFrame.
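
One possible shape for a foreachBatch handler that covers every row is sketched below. It is a sketch under assumptions, not the poster's implementation: `make_process_stream` is a hypothetical factory, the helper callables stand in for the poster's own functions (whose signatures are not shown in the thread), a micro-batch is assumed to possibly contain several source feeds, and `bodies.rdd` assumes a cluster mode where the RDD API is available.

```python
def make_process_stream(spark, get_rules, flatten, apply_rules, save_errors, save_data):
    """Build a foreachBatch handler that processes all rows of each micro-batch.

    All arguments are passed in explicitly; they are placeholders for the
    notebook's SparkSession and the poster's helper functions.
    """

    def process_stream(df, batch_id):
        # Keep the body as text and pull the feed name out of the properties map.
        msgs = df.selectExpr(
            "CAST(body AS STRING) AS msgBody",
            "properties['sourceFeedName'] AS sourceFeedName",
        ).persist()
        try:
            # Only the distinct feed names are brought to the driver, not the messages.
            sources = [
                row["sourceFeedName"]
                for row in msgs.select("sourceFeedName").distinct().collect()
            ]
            for source in sources:
                if source is None:
                    continue  # messages without a feed name are skipped in this sketch
                bodies = msgs.where(msgs["sourceFeedName"] == source).select("msgBody")
                # Every message of this feed is parsed, one JSON document per row.
                parsed = spark.read.json(bodies.rdd.map(lambda row: row["msgBody"]))
                flat = flatten(parsed)
                errors = apply_rules(flat, get_rules(source))
                if errors:
                    save_errors(errors)
                # Mirrors the posted snippet, which saves the data in both cases.
                save_data(flat)
        finally:
            msgs.unpersist()

    return process_stream
```

It would be wired in as `.foreachBatch(make_process_stream(spark, getBusinessRulesforSource, flatten, applyBusinessRule, saveErrorDetailsToSynapse, saveDatatoSynapse))`, assuming those helpers accept a DataFrame holding many messages rather than a single one.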


5 REPLIES


Thank you for the quick reply. I had tried display(streamingDF) and all the records were displayed, but only random messages were being passed to foreachBatch (processStream). From the documentation I saw that foreachBatch is the option for all batch operations. Is it better to create a UDF for processing instead of using foreachBatch?

Is there any solution for the above problem?

SaikatSengupta
New Contributor II

I am facing the same issue when I try to run foreachBatch with Azure Event Hub. Can anyone help?

In my case, I keep receiving real-time orders in Azure Event Hub, but I always need to pick the latest order and discard the history of the same trades that is already available inside Event Hub.

For example, I can receive a dataset like the one below (ignore the 'a' at the end of the field tx):

[image]

So, in this example, I need to pick the latest order, which is the one with the maximum tx and, for that maximum tx, the minimum process_timestamp, because sometimes the same tx arrives twice with two different process timestamps. Inside Event Hub you can therefore have a similar dataset with multiple rows for a particular order in the same batch.

I am using the code below in foreachBatch, but it's not processing all the rows from Event Hub.

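from pyspark.sql.functions import col

def upsertToMT4TradeDelta(microBatchOutputDF, batchId):
  # Intended to keep one row per (instance, order_id): highest tx, then earliest process_timestamp
  microBatchOutputDF = microBatchOutputDF.orderBy(col("tx").desc(), col("process_timestamp").asc()).dropDuplicates(["instance", "order_id"])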
  microBatchOutputDF.createOrReplaceTempView("updates")
 
  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO rdh.live_mt4_trade t
    USING updates u
    ON t.instance = u.instance and t.order_id = u.order_id 
    WHEN MATCHED AND u.tx > t.tx THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

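As far as I know, dropDuplicates does not promise to keep the first row of a preceding orderBy, so the snippet above may not always keep the intended row. A more explicit way to express "maximum tx, then minimum process_timestamp per order" is a row_number() window. The sketch below first states the rule in plain Python so it can be checked without a cluster, then shows an equivalent DataFrame version. The names `pick_latest` and `dedupe_latest` are hypothetical, and tx is assumed to be numeric.

```python
def pick_latest(rows):
    """Plain-Python statement of the rule, for rows given as dicts.

    Per (instance, order_id): keep the highest tx, and among equal tx
    the earliest process_timestamp.
    """
    best = {}
    for row in rows:
        key = (row["instance"], row["order_id"])
        rank = (-row["tx"], row["process_timestamp"])
        if key not in best or rank < best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]


def dedupe_latest(micro_batch_df):
    """Same rule on a Spark DataFrame, using row_number() over a window."""
    return (
        micro_batch_df.selectExpr(
            "*",
            "row_number() OVER (PARTITION BY instance, order_id "
            "ORDER BY tx DESC, process_timestamp ASC) AS rn",
        )
        .where("rn = 1")
        .drop("rn")
    )
```

Inside the foreachBatch function, `dedupe_latest(microBatchOutputDF).createOrReplaceTempView("updates")` could replace the orderBy/dropDuplicates line, with the MERGE statement left unchanged. This only makes the choice of row deterministic; whether it accounts for the rows that appear unprocessed is not established in this thread.
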
Rishi045
New Contributor III

Were you able to find a solution? If yes, could you please share it?
