<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Question about &quot;foreachBatch&quot; to remove duplicate records when streaming data in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26421#M18479</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I am practicing with the Databricks sample notebooks published here:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/databricks-academy/advanced-data-engineering-with-databricks" alt="https://github.com/databricks-academy/advanced-data-engineering-with-databricks" target="_blank"&gt;https://github.com/databricks-academy/advanced-data-engineering-with-databricks&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;In one of the notebooks (ADE 3.1 - Streaming Deduplication) (&lt;A href="https://github.com/databricks-academy/advanced-data-engineering-with-databricks/blob/published/Advanced-Data-Engineering-with-Databricks/03%20-%20Promoting%20to%20Silver/ADE%203.1%20-%20Streaming%20Deduplication.py" alt="https://github.com/databricks-academy/advanced-data-engineering-with-databricks/blob/published/Advanced-Data-Engineering-with-Databricks/03%20-%20Promoting%20to%20Silver/ADE%203.1%20-%20Streaming%20Deduplication.py" target="_blank"&gt;URL&lt;/A&gt;), there is sample code that removes duplicate records while streaming data.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have a few questions about it and would appreciate your help. I have copied the main parts of the code below:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql import functions as F

json_schema = "device_id LONG, time TIMESTAMP, heartrate DOUBLE"

deduped_df = (spark.readStream
    .table("bronze")
    .filter("topic = 'bpm'")
    .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
    .select("v.*")
    .withWatermark("time", "30 seconds")
    .dropDuplicates(["device_id", "time"]))

sql_query = """
    MERGE INTO heart_rate_silver a
    USING stream_updates b
    ON a.device_id = b.device_id AND a.time = b.time
    WHEN NOT MATCHED THEN INSERT *
"""

class Upsert:
    def __init__(self, sql_query, update_temp="stream_updates"):
        self.sql_query = sql_query
        self.update_temp = update_temp

    def upsert_to_delta(self, microBatchDF, batch):
        microBatchDF.createOrReplaceTempView(self.update_temp)
        microBatchDF._jdf.sparkSession().sql(self.sql_query)

streaming_merge = Upsert(sql_query)

query = (deduped_df.writeStream
    .foreachBatch(streaming_merge.upsert_to_delta)  # run query for each batch
    .outputMode("update")
    .option("checkpointLocation", f"{DA.paths.checkpoints}/recordings")
    .trigger(availableNow=True)
    .start())

query.awaitTermination()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q1) What is the reason for defining the class "Upsert" and using the method "foreachBatch"?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q2) What if I don't use "foreachBatch"?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The method "dropDuplicates(["device_id", "time"])" removes duplicates when reading records. Isn't that enough to ensure that there are no duplicate records?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q3) The method "upsert_to_delta" of the class "Upsert" takes two input arguments (microBatchDF, batch). But when we pass it in the following line:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;.foreachBatch(streaming_merge.upsert_to_delta)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;we don't supply its arguments. How does it get the values of (microBatchDF, batch)?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thank you for taking the time to read my questions.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 20 Oct 2022 05:23:57 GMT</pubDate>
    <dc:creator>Mado</dc:creator>
    <dc:date>2022-10-20T05:23:57Z</dc:date>
    <item>
      <title>Question about "foreachBatch" to remove duplicate records when streaming data</title>
      <link>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26421#M18479</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I am practicing with the Databricks sample notebooks published here:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/databricks-academy/advanced-data-engineering-with-databricks" alt="https://github.com/databricks-academy/advanced-data-engineering-with-databricks" target="_blank"&gt;https://github.com/databricks-academy/advanced-data-engineering-with-databricks&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;In one of the notebooks (ADE 3.1 - Streaming Deduplication) (&lt;A href="https://github.com/databricks-academy/advanced-data-engineering-with-databricks/blob/published/Advanced-Data-Engineering-with-Databricks/03%20-%20Promoting%20to%20Silver/ADE%203.1%20-%20Streaming%20Deduplication.py" alt="https://github.com/databricks-academy/advanced-data-engineering-with-databricks/blob/published/Advanced-Data-Engineering-with-Databricks/03%20-%20Promoting%20to%20Silver/ADE%203.1%20-%20Streaming%20Deduplication.py" target="_blank"&gt;URL&lt;/A&gt;), there is sample code that removes duplicate records while streaming data.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have a few questions about it and would appreciate your help. I have copied the main parts of the code below:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql import functions as F

json_schema = "device_id LONG, time TIMESTAMP, heartrate DOUBLE"

deduped_df = (spark.readStream
    .table("bronze")
    .filter("topic = 'bpm'")
    .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
    .select("v.*")
    .withWatermark("time", "30 seconds")
    .dropDuplicates(["device_id", "time"]))

sql_query = """
    MERGE INTO heart_rate_silver a
    USING stream_updates b
    ON a.device_id = b.device_id AND a.time = b.time
    WHEN NOT MATCHED THEN INSERT *
"""

class Upsert:
    def __init__(self, sql_query, update_temp="stream_updates"):
        self.sql_query = sql_query
        self.update_temp = update_temp

    def upsert_to_delta(self, microBatchDF, batch):
        microBatchDF.createOrReplaceTempView(self.update_temp)
        microBatchDF._jdf.sparkSession().sql(self.sql_query)

streaming_merge = Upsert(sql_query)

query = (deduped_df.writeStream
    .foreachBatch(streaming_merge.upsert_to_delta)  # run query for each batch
    .outputMode("update")
    .option("checkpointLocation", f"{DA.paths.checkpoints}/recordings")
    .trigger(availableNow=True)
    .start())

query.awaitTermination()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q1) What is the reason for defining the class "Upsert" and using the method "foreachBatch"?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q2) What if I don't use "foreachBatch"?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The method "dropDuplicates(["device_id", "time"])" removes duplicates when reading records. Isn't that enough to ensure that there are no duplicate records?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Q3) The method "upsert_to_delta" of the class "Upsert" takes two input arguments (microBatchDF, batch). But when we pass it in the following line:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;.foreachBatch(streaming_merge.upsert_to_delta)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;we don't supply its arguments. How does it get the values of (microBatchDF, batch)?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thank you for taking the time to read my questions.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 20 Oct 2022 05:23:57 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26421#M18479</guid>
      <dc:creator>Mado</dc:creator>
      <dc:date>2022-10-20T05:23:57Z</dc:date>
    </item>
    <item>
      <title>Re: Question about "foreachBatch" to remove duplicate records when streaming data</title>
      <link>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26422#M18480</link>
      <description>&lt;P&gt;Hi @Mohammad Saber&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Great to meet you, and thanks for your question!&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Let's see if your peers in the community have an answer to your question first. Otherwise, Bricksters will get back to you soon.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 27 Nov 2022 13:46:00 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26422#M18480</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2022-11-27T13:46:00Z</dc:date>
    </item>
    <item>
      <title>Re: Question about "foreachBatch" to remove duplicate records when streaming data</title>
      <link>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26423#M18481</link>
      <description>&lt;P&gt;Thanks for your message. I have not found good answers to this question. &lt;/P&gt;</description>
      <pubDate>Sun, 27 Nov 2022 20:37:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/question-about-quot-foreachbatch-quot-to-remove-duplicate/m-p/26423#M18481</guid>
      <dc:creator>Mado</dc:creator>
      <dc:date>2022-11-27T20:37:31Z</dc:date>
    </item>
  </channel>
</rss>

