cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
MuraliTalluri
Databricks Employee
Databricks Employee

In this article we will cover in depth about streaming deduplication using watermarking with dropDuplicates and dropDuplicatesWithinWatermark, how they are different. This blog expects you to have a good understanding on how watermarking works in Spark Structured streaming. Lets dive right in.

 

dropDuplicates

When dropDuplicates is used in conjunction with withWatermark operator, it maintains a global state, the stream is unbounded and will keep all the keys across triggers, i.e., global deduplication. Whatever watermark is specified using withWatermark makes the stream to filter out the late arriving records, i.e., records with event_time T get discarded at the beginning of the micro-batch if the watermark is greater than or equal to T. This approach is generally not recommended; an unbounded state means that the state store grows continuously, potentially causing out-of-memory issues. Let's look at an example. 

Create a source table to stream from:

 

drop table if exists mt_asqs.demo.dd_source;
create table mt_asqs.demo.dd_source (
 event_id string,
 event_time timestamp
)

 

Streaming code: Note that we are using event_time as the watermarking column and not including the event_time column as deduplication key in dropDuplicates function. 

 

volume_path = '/Volumes/mt_asqs/demo/checkpoint_paths'
stream_df = (
 spark.readStream.format("delta")
   .table("mt_asqs.demo.dd_source")
   .withWatermark("event_time", "5 minutes")
   .dropDuplicates(["event_id"])
)
(
 stream_df.writeStream
   .format("delta")
   .option("checkpointLocation", f"{volume_path}/dd_test3")
   .table("mt_asqs.demo.dd_output")
)

 

Inserting records into streaming source table 

 

insert into dd_source values ('A', to_timestamp('2024-01-01 10:00:00'));
insert into dd_source values ('B', to_timestamp('2024-01-01 10:00:00'));

 

Since the watermark delay is set to 5 minutes, above records set the watermark to 9:55.  The state store would have 2 keys A & B. 

 

insert into dd_source values ('C', to_timestamp('2024-01-01 10:07:00'));

 

Watermarking advances to 10:02, C gets inserted into the state store. Ideally since the watermarking advanced to 10:02, Keys A & B should get evicted from the state store but that doesn’t happen with dropDuplicates, this makes global deduplication possible. Viewing the contents of the state store.

 

display(spark.read.format("statestore").load("/Volumes/mt_asqs/demo/checkpoint_paths/dd_test3"))

 

MuraliTalluri_0-1736447301690.pngMuraliTalluri_1-1736447301693.png

 

insert into dd_source values ('A', to_timestamp('2024-01-01 10:01:00'));

 

Since watermarking is set to 10:02, the above record A gets discarded at the beginning of the micro-batch. At this point record A is not even compared to state store for deduplication since it's discarded prior to that. 

 

insert into dd_source values ('D', to_timestamp('2024-01-01 10:10:00'));

 

Advances the watermark to 10:05, D gets appended to the state store.  

 

insert into dd_source values ('A', to_timestamp('2024-01-01 10:12:00'));

 

The above record gets checked against the watermark, since it's not a late arriving record it won’t be discarded. This record advances watermarking time to 10:07, but since the key(A) already exists in the state store it gets discarded in the deduplication process.

 

insert into dd_source values ('E', to_timestamp('2024-01-01 10:07:00'));

 

E gets discarded due to the watermark(10:07). Contents of the streaming output table shows that there are no duplicate event_ids

MuraliTalluri_2-1736447301693.png

dropDuplicatesWithinWatermark

dropDuplicatesWithinWatermark does exactly what you would expect it to do, this is what you should use for deduplication when you know the time window within which you expect duplicate records in your stream. When dropDuplicatesWithinWatermark is used in conjunction with withWatermark delay D, a record with event_time T gets removed from the state store only when the watermark is greater than or equal to T + D. But the record will be discarded(late arriving) at the beginning of the micro-batch if the watermark is greater than or equal to T. Let's look at an example. 

Create a source table to stream from:

 

drop table if exists mt_asqs.demo.ddww_source;
create table mt_asqs.demo.ddww_source (
 event_id string,
 event_time timestamp
)

 

Streaming code: It’s basically the same as above but using the dropDuplicatesWithinWatermark instead of dropDuplicates

 

stream_df = (
 spark.readStream.format("delta")
   .table("mt_asqs.demo.ddww_source")
   .withWatermark("event_time", "5 minutes")
   .dropDuplicatesWithinWatermark(["event_id"])
)
volume_path = '/Volumes/mt_asqs/demo/checkpoint_paths'
(
 stream_df.writeStream
   .format("delta")
   .option("checkpointLocation", f"{volume_path}/ddww_test")
   .table("mt_asqs.demo.ddww_output")
)

 

Inserting records into streaming source table 

 

insert into ddww_source values('A',to_timestamp('2024-01-01 10:00:00'));
insert into ddww_source values('B',to_timestamp('2024-01-01 10:00:00'));

 

Since the watermarking delay is set to 5 minutes, the watermark advances to 9:55. The state store would have 2 keys A & B. 

 

insert into ddww_source values('C',to_timestamp('2024-01-01 10:07:00'));

 

Watermark advances to 10:02, C gets inserted into the state store. At the end of every micro-batch, existing records in the state store are checked if the watermark is greater than or equal to event_time + watermark delay (5 minutes). So both A & B prevail.

Viewing the contents of the state store, you can see that every key now has expiration time. 

 

display(spark.read.format("statestore").load("/Volumes/mt_asqs/demo/checkpoint_paths/ddww_test"))

 

MuraliTalluri_3-1736447301693.pngMuraliTalluri_4-1736447301693.png

 

insert into ddww_source values('A',to_timestamp('2024-01-01 10:01:00'));

 

Since watermarking is set to 10:02, the above record A gets discarded at the beginning of the micro-batch. At this point record A is not even compared to the state store for deduplication since it's discarded prior to that. 

 

insert into ddww_source values('D',to_timestamp('2024-01-01 10:09:00'));

 

Watermark advances to 10:04, D gets inserted into the state store. Existing records in the state store are checked if the watermark is greater than or equal to event_time + watermark delay (5 minutes). So all the records prevail.

MuraliTalluri_5-1736447301691.pngMuraliTalluri_6-1736447301694.png

 

insert into ddww_source values('A',to_timestamp('2024-01-01 10:12:00'));

 

Watermark advances to 10:07. Since A already exists in the state store, the new record is considered as duplicate and gets discarded. Existing records in the state store are checked if the watermark(10:07) is greater than or equal to event_time + watermark delay (5 minutes). So C & D prevails, A & B gets evicted from the state store. 

MuraliTalluri_7-1736447301693.pngMuraliTalluri_8-1736447301692.png

 

insert into ddww_source values('E',to_timestamp('2024-01-01 10:17:00'));

 

Watermark advances to 10:12, E gets inserted into the state store. Existing records in the state store are checked if the watermark is greater than or equal to event_time + watermark delay (5 minutes). So D prevails and C gets evicted from the state store.

MuraliTalluri_9-1736447301690.pngMuraliTalluri_10-1736447301692.png

 

insert into ddww_source values('C',to_timestamp('2024-01-01 10:13:00'));

 

The watermark still remains at 10:12. Since C doesn’t exist in the state store, it won’t be considered a duplicate record. So C gets inserted into the state store. Existing records in the state store are checked if the watermark is greater than or equal to event_time + watermark delay (5 minutes). So D & E prevails.

MuraliTalluri_11-1736447301691.pngMuraliTalluri_12-1736447301691.png

Contents of the streaming output table shows that there is one duplicate entry for event_id C. This would have been avoided if you had a larger watermark window, like 10 minutes. 

MuraliTalluri_13-1736447301691.png

dropDuplicates - event_time as deduplication key

We covered above where you specify event_time as the watermarking column but don’t include it as a deduplication key(parameter) in the dropDuplicates. But when event_time is specified as one of the deduplication keys, dropDuplicates won’t maintain an unbounded state, i.e., keys get evicted from the state store. If you set withWatermark delay D, a record with event_time T gets removed from the state store when the watermark is greater than or equal to T. Records will be checked at the beginning of the micro-batch execution and discarded(late arriving) if the watermark is greater than or equal to T. Let's look at an example. 

Create a source table to stream from:

 

drop table if exists mt_asqs.demo.dd_source2;
create table mt_asqs.demo.dd_source2 (
 event_id string,
 event_time timestamp
)

 

Streaming code: Note that we are using event_time as the watermarking column and it's also used as the deduplication key in the dropDuplicates function. 

 

stream_df = (
 spark.readStream.format("delta")
   .table("mt_asqs.demo.dd_source2")
   .withWatermark("event_time", "5 minutes")
   .dropDuplicates(["event_id", "event_time"])
)
volume_path = '/Volumes/mt_asqs/demo/checkpoint_paths'
(
 stream_df.writeStream
   .format("delta")
   .option("checkpointLocation", f"{volume_path}/dd_test2")
   .table("mt_asqs.demo.dd_output2")
)

 

Inserting records into streaming source table 

 

insert into dd_source2 values ('A',to_timestamp('2024-01-01 10:00:00'));
insert into dd_source2 values ('B',to_timestamp('2024-01-01 10:00:00'));

 

Since the watermarking delay is set to 5 minutes, above records set the watermark to 9:55.  The state store would have 2 records, A & B. 

 

insert into dd_source2 values ('C',to_timestamp('2024-01-01 10:07:00'));

 

Watermark advances to 10:02, C gets inserted into the state store. At the end of every micro-batch, existing records in the state store are checked if the watermark is greater than or equal to event_time. So both A & B get evicted from the state store.

Viewing the contents of the state store.

 

display(spark.read.format("statestore").load("/Volumes/mt_asqs/demo/checkpoint_paths/dd_test2"))

 

MuraliTalluri_14-1736447301693.pngMuraliTalluri_15-1736447301691.png

 

insert into dd_source2 values ('A',to_timestamp('2024-01-01 10:01:00'));

 

Since watermarking is set to 10:02, the above record A gets discarded at the beginning of the micro-batch. At this point record A is not even compared to state store for deduplication since it's discarded prior to that. 

 

insert into dd_source2 values ('C',to_timestamp('2024-01-01 10:09:00'));

 

Advances the watermark to 10:04. C gets appended to the state store since event_time is also a deduplication key. The state store will have multiple rows for C.  

MuraliTalluri_16-1736447301692.png

 

insert into dd_source2 values ('D',to_timestamp('2024-01-01 10:10:00'));

 

Advances the watermark to 10:05, D gets appended to the state store. Existing records in the state store are checked if the watermark is greater than or equal to event_time. Both the entries for C prevails.  The state store will have 3 entries(2 for C and 1 for D).

 

insert into dd_source2 values ('A',to_timestamp('2024-01-01 10:12:00'));

 

Watermark advances to 10:07, A gets inserted into the state store. Existing records in the state store are checked if the watermark is greater than or equal to event_time. One of the entries for C (event_time 10:07) gets evicted while the other prevails.

Viewing the contents of the state store.

MuraliTalluri_17-1736447301692.png

Contents of the streaming output table.

MuraliTalluri_18-1736447301693.png

Conclusion

Each of these approaches solve different problems. 

Approach

Scenario

State store behavior

dropDuplicates + withWatermark

Global deduplication and don’t expect the duplicates with the same event_time

- Keep all the keys in the state store.

- The state store grows continuously, potentially causing out-of-memory issues.

dropDuplicatesWithinWatermark + withWatermark

- When you know the interval of time within which you might have duplicate records.

- Deduplication within the specified time interval. 

- State store grows/shrinks based on the watermark delay. 

- For a watermark delay D, a record with event_time T gets removed from the state store only when the watermark is greater than or equal to T + D.

dropDuplicates + withWatermark, event_time is one of the deduplication keys. 

- When you expect the stream to have duplicates with the same event_time

- State store grows/shrinks based on the watermark delay. 

- For a watermark delay D, a record with event_time T gets removed from the state store only when the watermark is greater than or equal to T.