<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/39007#M26842</link>
    <description>&lt;P&gt;Is there any solution for the above problem?&lt;/P&gt;</description>
    <pubDate>Thu, 03 Aug 2023 10:36:18 GMT</pubDate>
    <dc:creator>Rishi045</dc:creator>
    <dc:date>2023-08-03T10:36:18Z</dc:date>
    <item>
      <title>Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32622#M23770</link>
      <description>&lt;P&gt;I am new to real-time scenarios and I need to create a Spark Structured Streaming job in Databricks. I am trying to apply rule-based validations from a backend configuration to each incoming JSON message. I need to do the following with the incoming JSON:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Flatten the incoming JSON.&lt;/LI&gt;&lt;LI&gt;Based on the source property in the JSON, fetch the validations configured for that source and apply them to the message.&lt;/LI&gt;&lt;LI&gt;Log any validation errors to a Synapse table.&lt;/LI&gt;&lt;LI&gt;If there are no validation errors, save the data to a separate table based on the backend config.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;I tried implementing this using foreachBatch in a Databricks notebook. My code snippets look like this:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Reading from Event Hub&lt;/LI&gt;&lt;/UL&gt;&lt;PRE&gt;&lt;CODE&gt;streamingDF = spark.readStream.format("eventhubs").options(**ehConf).load()&lt;/CODE&gt;&lt;/PRE&gt;&lt;UL&gt;&lt;LI&gt;Starting the foreachBatch query&lt;/LI&gt;&lt;/UL&gt;&lt;PRE&gt;&lt;CODE&gt;streamingDF.writeStream.outputMode("append") \
  .option("checkpointLocation","dbfs:/FileStore/checkpoint/rtdqchekpoint") \
  .foreachBatch(processStream).start().awaitTermination()&lt;/CODE&gt;&lt;/PRE&gt;&lt;UL&gt;&lt;LI&gt;processStream function&lt;/LI&gt;&lt;/UL&gt;&lt;PRE&gt;&lt;CODE&gt;msgDF = df.select(df.body.cast("string").alias("msgBody"))
msgDfVal = msgDF.collect()
messages = msgDfVal[0][0]
properties = df.select(df.properties).collect()
sourceFeedName = properties[0][0]['sourceFeedName']
ruleConfigDF = getBusinessRulesforSource(sourceFeedName)
dfJson = spark.read.json(sc.parallelize([messages]))
srcDataDf = flatten(dfJson)
errorDetails = applyBusinessRule(srcDataDf, ruleConfigDF)
if errorDetails is not None and len(errorDetails) &amp;gt; 0:
    errCnt = len(errorDetails)
    saveErrorDetailsToSynapse(errorDetails)
else:
    saveDatatoSynapse(srcDataDf)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;However, while processing I found that only some of the incoming messages reach the processStream function. For example, if 10 messages arrive, only 5 or 6 random messages make it into processStream. I tried removing all the processing logic, and still only a few messages come through.&lt;/P&gt;&lt;P&gt;Since I am new to real-time processing, can someone help me understand why this is happening and what I am doing wrong?&lt;/P&gt;</description>
      <pubDate>Wed, 22 Dec 2021 14:53:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32622#M23770</guid>
      <dc:creator>sparkstreaming</dc:creator>
      <dc:date>2021-12-22T14:53:34Z</dc:date>
    </item>
    <item>
      <title>Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32623#M23771</link>
      <description>&lt;P&gt;Hmm, maybe you aren't reading all partitions/partition keys. Can you go to the Event Hub in Azure, open Process data -&amp;gt; Explore, and validate the data there?&lt;/P&gt;&lt;P&gt;In Databricks you can use display(streamingDF) to do some validation.&lt;/P&gt;&lt;P&gt;In production, .collect() shouldn't be used. Your code looks like it processes only the first row of each batch. All of the logic could be implemented using Spark DataFrame actions and transformations on df.&lt;/P&gt;&lt;P&gt;If you have your own functions like getBusinessRulesforSource, they can be converted to Spark UDFs and then applied to the DataFrame.&lt;/P&gt;</description>
      <pubDate>Wed, 22 Dec 2021 15:14:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32623#M23771</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2021-12-22T15:14:16Z</dc:date>
    </item>
    <item>
      <title>Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32624#M23772</link>
      <description>Thank you for the quick reply. I had tried display(streamingDF) and all the records were displayed, but only random messages were being passed to foreachBatch (processStream). From the documentation I saw that foreachBatch is the option for batch operations. Is it better to create a UDF for the processing instead of foreachBatch?</description>
      <pubDate>Wed, 22 Dec 2021 15:58:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32624#M23772</guid>
      <dc:creator>sparkstreaming</dc:creator>
      <dc:date>2021-12-22T15:58:47Z</dc:date>
    </item>
    <item>
      <title>Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32625#M23773</link>
      <description>&lt;P&gt;I am facing the same issue when trying to run foreachBatch with Azure Event Hub. Can anyone help?&lt;/P&gt;&lt;P&gt;In my case, I keep receiving real-time orders in Azure Event Hub, but I always need to pick the latest order and remove all earlier history of the same trades that is already inside the Event Hub.&lt;/P&gt;&lt;P&gt;&lt;B&gt;For example, I can receive a dataset like the one below (ignore the 'a' at the end of the tx field):&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="image"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2213i46BA0530D7568921/image-size/large?v=v2&amp;amp;px=999" role="button" title="image" alt="image" /&gt;&lt;/span&gt;In this example I need to pick the latest order, i.e. the one with the maximum tx and, for that maximum tx, the minimum process_timestamp, because the same tx sometimes arrives twice with two different process timestamps. So inside the Event Hub you can have a similar dataset with multiple rows for a particular order in the same batch.&lt;/P&gt;&lt;P&gt;I am using the code below in foreachBatch, but it is not processing all the rows from the Event Hub.&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;def upsertToMT4TradeDelta(microBatchOutputDF, batchId):
  microBatchOutputDF = microBatchOutputDF.orderBy(col("tx").desc(), col("process_timestamp").asc()).dropDuplicates(["instance", "order_id"])
  microBatchOutputDF.createOrReplaceTempView("updates")
  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO rdh.live_mt4_trade t
    USING updates u
    ON t.instance = u.instance and t.order_id = u.order_id 
    WHEN MATCHED AND u.tx &amp;gt; t.tx THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 27 May 2022 07:08:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/32625#M23773</guid>
      <dc:creator>SaikatSengupta</dc:creator>
      <dc:date>2022-05-27T07:08:46Z</dc:date>
    </item>
    <item>
      <title>Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/39006#M26841</link>
      <description>&lt;P&gt;Were you able to find a solution? If yes, could you please share it?&lt;/P&gt;</description>
      <pubDate>Thu, 03 Aug 2023 10:33:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/39006#M26841</guid>
      <dc:creator>Rishi045</dc:creator>
      <dc:date>2023-08-03T10:33:51Z</dc:date>
    </item>
    <item>
      <title>Re: Missing rows while processing records using foreachbatch in spark structured streaming from Azure Event Hub</title>
      <link>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/39007#M26842</link>
      <description>&lt;P&gt;Is there any solution for the above problem?&lt;/P&gt;</description>
      <pubDate>Thu, 03 Aug 2023 10:36:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/missing-rows-while-processing-records-using-foreachbatch-in/m-p/39007#M26842</guid>
      <dc:creator>Rishi045</dc:creator>
      <dc:date>2023-08-03T10:36:18Z</dc:date>
    </item>
  </channel>
</rss>

