Question about "foreachBatch" to remove duplicate records when streaming data
10-19-2022 10:23 PM
Hi,
I am practicing with the Databricks sample notebooks published here:
https://github.com/databricks-academy/advanced-data-engineering-with-databricks
In one of the notebooks (ADE 3.1 - Streaming Deduplication) (URL), there is sample code to remove duplicate records while streaming data.
I have a few questions about it and would appreciate your help. The main parts of the code are copied below:
from pyspark.sql import functions as F

json_schema = "device_id LONG, time TIMESTAMP, heartrate DOUBLE"

deduped_df = (spark.readStream
              .table("bronze")
              .filter("topic = 'bpm'")
              .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
              .select("v.*")
              .withWatermark("time", "30 seconds")
              .dropDuplicates(["device_id", "time"]))

sql_query = """
  MERGE INTO heart_rate_silver a
  USING stream_updates b
  ON a.device_id=b.device_id AND a.time=b.time
  WHEN NOT MATCHED THEN INSERT *
"""

class Upsert:
    def __init__(self, sql_query, update_temp="stream_updates"):
        self.sql_query = sql_query
        self.update_temp = update_temp

    def upsert_to_delta(self, microBatchDF, batch):
        microBatchDF.createOrReplaceTempView(self.update_temp)
        microBatchDF._jdf.sparkSession().sql(self.sql_query)

streaming_merge = Upsert(sql_query)

query = (deduped_df.writeStream
         .foreachBatch(streaming_merge.upsert_to_delta)  # run query for each batch
         .outputMode("update")
         .option("checkpointLocation", f"{DA.paths.checkpoints}/recordings")
         .trigger(availableNow=True)
         .start())

query.awaitTermination()
Q1) What is the reason to define the class "Upsert" and use the method "foreachBatch"?
Q2) What if I don't use "foreachBatch"?
The method dropDuplicates(["device_id", "time"]) already removes duplicates when reading records. Isn't that enough to ensure there are no duplicate records?
Q3) The method "upsert_to_delta" of class "Upsert" has two input arguments (microBatchDF, batch). But when we call it in the following line:
.foreachBatch(streaming_merge.upsert_to_delta)
we don't pass these arguments explicitly. How does it receive the values of (microBatchDF, batch)?
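My current guess for Q3 is that foreachBatch treats the bound method as a callback: the engine itself invokes it once per micro-batch, supplying the batch DataFrame and a batch ID. Here is a toy stand-in (plain Python, not Spark; fake_foreach_batch is a made-up name for illustration):

```python
# Toy illustration of the callback pattern I think foreachBatch uses:
# we hand over a callable, and the engine calls it with both arguments.

class Upsert:
    def __init__(self):
        self.calls = []

    def upsert_to_delta(self, micro_batch_df, batch_id):
        # same two-argument signature as in the notebook; both values
        # are supplied by the caller (the engine), not by us
        self.calls.append((batch_id, micro_batch_df))

def fake_foreach_batch(callback, batches):
    for batch_id, df in enumerate(batches):
        callback(df, batch_id)   # the engine passes the arguments

streaming_merge = Upsert()
# we pass only the bound method, exactly like .foreachBatch(...)
fake_foreach_batch(streaming_merge.upsert_to_delta, [["a"], ["b", "c"]])
print(streaming_merge.calls)  # [(0, ['a']), (1, ['b', 'c'])]
```

If this is right, the second parameter (named "batch" in the notebook) would be the micro-batch ID, but I'd appreciate confirmation.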
Thank you for taking the time to read my questions.
- Labels: Duplicate Records, Foreachbatch, Q2, Streaming
11-27-2022 05:46 AM
Hi @Mohammad Saber
Great to meet you, and thanks for your question!
Let's see if your peers in the community have an answer to your question first. Otherwise, our bricksters (Databricks staff) will get back to you soon.
Thanks
11-27-2022 12:37 PM
Thanks for your message. I have not yet found a good answer to these questions.

