03-09-2023 07:19 AM
I have a streaming pipeline that ingests JSON files from a data lake using Auto Loader. These files are dumped there periodically. Mostly the files contain duplicate data, but there are occasional changes. I am trying to process these files into a data warehouse using the STORED AS SCD TYPE 2 option of a Delta Live Tables pipeline. Everything looks to be working fine, but I am getting duplicate rows in the SCD table, even though no data is changing.
In the pipeline cluster configuration, I have added
pipelines.enableTrackHistory True
Here is the SQL for the SCD table:
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_SCD;

APPLY CHANGES INTO live.currStudents_SCD
FROM stream(live.currStudents_ingest)
KEYS (id)
SEQUENCE BY file_modification_time
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time);
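For reference, currStudents_ingest is populated by Auto Loader along these lines (simplified, and with the landing path changed; the file_modification_time column comes from the file metadata):

import dlt

@dlt.table
def currStudents_ingest():
    return (
        spark.readStream.format("cloudFiles")        # Auto Loader
        .option("cloudFiles.format", "json")
        .load("/mnt/lake/currStudents/")             # placeholder landing path
        .selectExpr("*", "_metadata.file_modification_time AS file_modification_time")
    )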
Any ideas why I get this and how I can stop these duplicates?
I can't find any information in the documentation to help with this, except possibly the vague suggestion that, in SCD Type 2, new records can appear even when no columns change. But the TRACK HISTORY documentation seems to indicate that only changes to tracked columns will result in a new record...
03-13-2023 01:21 AM
@Kearon McNicol:
In SCD Type 2, a new record is created when a tracked column changes, and also when nothing has changed but the current record has expired: the current record is considered outdated, so a new record with the same key is created. It's possible that your current records are expiring and new records are being created with the same key and the same values as the expired ones.
To prevent this, you can try increasing the validity period of the records by setting a larger value for the valid_time_column in the STORED AS SCD 2 option. This will allow the records to remain valid for a longer period of time and reduce the number of new records that are created.
Alternatively, you can try adding a deduplication step to your pipeline to remove the duplicate records before they reach the SCD Type 2 logic. This can be done by grouping the records by their key and keeping only the most recent record for each group based on the file_modification_time column, using a grouped aggregation (a window function would also work).
For example:
import pyspark.sql.functions as F

# Keep one record per id, based on the latest file_modification_time
deduped_stream = (
    spark.readStream.table("live.currStudents_ingest")
    .groupBy("id")
    .agg(
        F.max("file_modification_time").alias("latest_file_modification_time"),
        F.first("json", ignorenulls=True).alias("json"),
    )
    .select("id", "latest_file_modification_time", "json")
)
APPLY CHANGES INTO live.currStudents_SCD
FROM deduped_stream
KEYS (id)
SEQUENCE BY latest_file_modification_time
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (latest_file_modification_time)
Here, we group the records by the id column and select the most recent record for each group based on the file_modification_time column. We then pass this deduplicated stream into the SCD Type 2 logic for further processing.
Hope this helps!
03-28-2023 04:04 AM
@Suteja Kanuri - Thank you for this. Been pulled onto other projects and getting back to this now, so will try your solution.
I think the de-duplication option is the one we want, but I'm not certain I'm understanding fully.
I have a call to an API that retrieves a JSON file each night. As this data changes infrequently, the records are normally identical.
If I understand your deduped_stream correctly, this will select the latest version of the JSON file.
Won't this just change each night when I retrieve a new JSON file, therefore creating a new duplicate entry in the SCD table at that point?
I think I'm missing how the de-duplication comes into effect, which is probably due to my lack of experience in this area.
Thank you for your help.
04-01-2023 08:54 PM
@Kearon McNicol:
You are correct that deduped_stream as sketched will still pick up the latest version of each record, even if it is identical to the previous version. This means that if you run the pipeline every night and retrieve a new JSON file, a new row will still be added to the SCD table, because the file_modification_time changes each night.
To stop that, deduplicate the stream on the data columns only: apply dropDuplicates() to the stream with a column list that includes every column except file_modification_time. Then, if the new JSON file is identical to the previous one (apart from its modification time), the records are filtered out and nothing new reaches the SCD table.
03-31-2023 02:31 AM
Hey there @Kearon McNicol
Hope everything is going great!
Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark @Suteja Kanuri's answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you.
Thanks!
04-06-2023 07:11 AM
For clarity, here is the final code that avoids duplicates, using @Suteja Kanuri's suggestion:
import dlt

@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return (
        df.dropDuplicates([col for col in df.columns if col != "file_modification_time"])
        .select("*")
    )
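The APPLY CHANGES step then reads from this deduplicated table instead of the raw ingest. In the Python API that looks roughly like this (an untested sketch based on the DLT Python docs; the SQL version at the top works the same way with FROM stream(live.currStudents_dedup)):

import dlt

# Target table that will hold the SCD Type 2 history
dlt.create_streaming_table("currStudents_SCD")   # older DLT releases: dlt.create_streaming_live_table

dlt.apply_changes(
    target="currStudents_SCD",
    source="currStudents_dedup",                 # the deduplicated table defined above
    keys=["id"],
    sequence_by="file_modification_time",
    stored_as_scd_type=2,
    track_history_except_column_list=["file_modification_time"],
)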
I am curious whether there is a way to do the deduplication step in SQL? I only found documentation for Python.
04-06-2023 07:28 AM
@Kearon McNicol:
Yes, it's possible to do the deduplication in SQL. The important detail is that file_modification_time has to be left out of the comparison, otherwise DISTINCT keeps every nightly copy because the modification time differs on otherwise identical rows. In Databricks SQL you can use * EXCEPT for that:
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_dedup
AS SELECT DISTINCT * EXCEPT (file_modification_time)
FROM STREAM(live.currStudents_ingest);
Here, DISTINCT removes the duplicate rows and EXCEPT (file_modification_time) excludes the modification time from the comparison (and from the output). Note that, unlike your Python version, this table no longer carries a file_modification_time column, so if you still need it for SEQUENCE BY you would either re-derive it or stay with the dropDuplicates approach.