Data Engineering

Question about "foreachBatch" to remove duplicate records when streaming data

Mado
Valued Contributor II

Hi,

I am practicing with the Databricks sample notebooks published here:

https://github.com/databricks-academy/advanced-data-engineering-with-databricks

In one of the notebooks (ADE 3.1 - Streaming Deduplication), there is sample code to remove duplicate records while streaming data.

I have a few questions about it and would appreciate your help. I copy the main parts of the code below:

from pyspark.sql import functions as F

json_schema = "device_id LONG, time TIMESTAMP, heartrate DOUBLE"

deduped_df = (spark.readStream
              .table("bronze")
              .filter("topic = 'bpm'")
              .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
              .select("v.*")
              .withWatermark("time", "30 seconds")
              .dropDuplicates(["device_id", "time"]))

sql_query = """
  MERGE INTO heart_rate_silver a
  USING stream_updates b
  ON a.device_id=b.device_id AND a.time=b.time
  WHEN NOT MATCHED THEN INSERT *
"""

class Upsert:
    def __init__(self, sql_query, update_temp="stream_updates"):
        self.sql_query = sql_query
        self.update_temp = update_temp

    def upsert_to_delta(self, microBatchDF, batch):
        microBatchDF.createOrReplaceTempView(self.update_temp)
        microBatchDF._jdf.sparkSession().sql(self.sql_query)

streaming_merge = Upsert(sql_query)

query = (deduped_df.writeStream
         .foreachBatch(streaming_merge.upsert_to_delta)  # run query for each batch
         .outputMode("update")
         .option("checkpointLocation", f"{DA.paths.checkpoints}/recordings")
         .trigger(availableNow=True)
         .start())

query.awaitTermination()

Q1) What is the reason to define the class "Upsert" and use the method "foreachBatch"?
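
(My tentative understanding, for context: "foreachBatch" only needs a callable, so the class seems to be just a way of binding the SQL string and view name to that callable. A plain function would look like the sketch below, which reuses the "sql_query" defined above; please correct me if this is wrong.)

def upsert_to_delta(microBatchDF, batch_id):
    # Expose this micro-batch to SQL under the name the MERGE statement expects
    microBatchDF.createOrReplaceTempView("stream_updates")
    # Run the MERGE for this micro-batch, using the same session workaround as the notebook
    microBatchDF._jdf.sparkSession().sql(sql_query)

# and then: .foreachBatch(upsert_to_delta) instead of .foreachBatch(streaming_merge.upsert_to_delta)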

Q2) What if I don't use "foreachBatch"?

The method "dropDuplicates(["device_id", "time"])" removes duplicates when reading records. Isn't it enough to be sure that there is no duplicate records?

Q3) The method "upsert_to_delta" of class "Upsert" has two input arguments (microBatchDF, batch). But, when we call it in the following line:

.foreachBatch(streaming_merge.upsert_to_delta)

, we don't pass its arguments. How does it get the values of (microBatchDF, batch)?
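
(From the Structured Streaming docs I gather that Spark itself calls the supplied function once per micro-batch, passing the batch DataFrame and a batch id. Because "streaming_merge.upsert_to_delta" is a bound method, "self" is already attached to the instance, and Spark supplies the remaining two arguments. A toy sketch outside Spark showing the mechanics:)

class Upsert:
    def __init__(self, sql_query):
        self.sql_query = sql_query

    def upsert_to_delta(self, microBatchDF, batch):
        print(f"batch {batch}: would run {self.sql_query!r} against the micro-batch")

m = Upsert("MERGE INTO ...")
handler = m.upsert_to_delta   # a bound method: "self" is already fixed to "m"
handler("fake_df", 0)         # this call, with (microBatchDF, batch), is what Spark makes per micro-batch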

Thank you for taking the time to read my questions.

2 REPLIES

Anonymous
Not applicable

Hi @Mohammad Saber

Great to meet you, and thanks for your question!

Let's see if your peers in the community have an answer to your question first. If not, our bricksters will get back to you soon.

Thanks

Mado
Valued Contributor II

Thanks for your message. I have not yet found good answers to these questions.
