12-22-2021 06:53 AM
I am new to real-time scenarios and I need to create a Spark Structured Streaming job in Databricks. I am trying to apply rule-based validations, driven by backend configuration, to each incoming JSON message. I need to do the following actions on the incoming JSON.
I tried implementing this using foreachBatch in a Databricks notebook. My code snippets look as below:
streamingDF = spark.readStream.format("eventhubs").options(**ehConf).load()

streamingDF.writeStream.outputMode("append") \
    .option("checkpointLocation", "dbfs:/FileStore/checkpoint/rtdqchekpoint") \
    .foreachBatch(processStream) \
    .start() \
    .awaitTermination()
def processStream(df, batchId):
    # Take the message body from the first row of the micro-batch
    msgDF = df.select(df.body.cast("string").alias("msgBody"))
    msgDfVal = msgDF.collect()
    messages = msgDfVal[0][0]

    # Read the feed name from the first row's properties
    properties = df.select(df.properties).collect()
    sourceFeedName = properties[0][0]['sourceFeedName']

    ruleConfigDF = getBusinessRulesforSource(sourceFeedName)
    dfJson = spark.read.json(sc.parallelize([messages]))
    srcDataDf = flatten(dfJson)
    errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
    if errorDetails is not None and len(errorDetails) > 0:
        errCnt = len(errorDetails)
        saveErrorDetailsToSynapse(errorDetails)
    saveDatatoSynapse(srcDataDf)
However, while processing I found that only some of the incoming messages reach the processStream function. For example, if 10 messages arrive, only 5 or 6 random messages show up in processStream. I tried removing all the processing logic, and still only a few messages come through.
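For reference, a stripped-down handler that does nothing but log how many rows arrive in each micro-batch would look roughly like this:

def processStream(df, batchId):
    # No business logic, just report the size of each micro-batch
    print(f"batch {batchId}: received {df.count()} rows")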
Since I am new to real-time processing, can someone help me understand why this is happening and what I am doing wrong?
12-22-2021 07:14 AM
Hmm, maybe you aren't reading all partitions/partition keys. Can you go to the Event Hub in Azure, open Process data -> Explore, and validate there?
In Databricks you can use display(streamingDF) to do some validation.
In production, .collect() shouldn't be used. Your code looks like it only processes the first row of each batch. All of the logic could be implemented with Spark DataFrame actions and transformations on df, as sketched below.
If you have your own functions such as getBusinessRulesforSource, they can be converted to Spark UDFs and then applied to the DataFrame.
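For example, a rough sketch of the foreachBatch handler without collect(), processing every row in the micro-batch, could look like the following. getBusinessRulesforSource, flatten, applyBusinessRule and the Synapse save functions are the helpers from your question; that they can accept whole DataFrames like this is an assumption.

def processStream(df, batchId):
    # Sketch only: handle every row of the micro-batch, not just row 0
    if df.rdd.isEmpty():
        return

    # One feed name per batch is assumed, as in the original code
    sourceFeedName = df.select(df.properties).first()[0]['sourceFeedName']
    ruleConfigDF = getBusinessRulesforSource(sourceFeedName)

    # Parse every message body in the batch into a DataFrame
    bodies = df.select(df.body.cast("string").alias("msgBody"))
    dfJson = spark.read.json(bodies.rdd.map(lambda r: r.msgBody))

    srcDataDf = flatten(dfJson)
    errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
    if errorDetails:
        saveErrorDetailsToSynapse(errorDetails)
    saveDatatoSynapse(srcDataDf)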
08-03-2023 03:36 AM
Any solution for the above problem?
05-27-2022 12:08 AM
I am facing the same issue when trying to run foreachBatch with Azure Event Hub. Can anyone help?
In my case I keep receiving real-time orders in Azure Event Hub, but I always need to pick the latest order and remove all the history of the same trade that is already available inside the Event Hub.
For example, I can receive a dataset like the one below (ignore the 'a' at the end of the tx field).
So, in this example I need to pick the latest order, which is the one with the maximum tx and, for that maximum tx, the minimum process_timestamp, because sometimes the same tx arrives twice with two different process timestamps. So inside the Event Hub you can have a similar dataset with multiple rows for a particular order in the same batch.
I am using the code below in the foreachBatch, but it is not processing all the rows from the Event Hub.
from pyspark.sql.functions import col

def upsertToMT4TradeDelta(microBatchOutputDF, batchId):
    # Keep the latest row per (instance, order_id): highest tx, earliest process_timestamp
    microBatchOutputDF = microBatchOutputDF \
        .orderBy(col("tx").desc(), col("process_timestamp").asc()) \
        .dropDuplicates(["instance", "order_id"])
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF._jdf.sparkSession().sql("""
        MERGE INTO rdh.live_mt4_trade t
        USING updates u
        ON t.instance = u.instance AND t.order_id = u.order_id
        WHEN MATCHED AND u.tx > t.tx THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
08-03-2023 03:33 AM
Were you able to find a solution? If yes, could you please share it?