Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks Delta Live Table stored as SCD 2 is creating new records when no data changes. How do I stop this?

Kearon
New Contributor III

I have a streaming pipeline that uses Auto Loader to ingest JSON files from a data lake. The files are dumped there periodically. They mostly contain duplicate data, with occasional changes. I am trying to process these files into a data warehouse using the STORED AS SCD TYPE 2 option of a Delta Live Tables pipeline. Everything appears to work, but I am getting duplicate rows in the SCD table even though no data is changing.

In the pipeline cluster configuration, I have added 

pipelines.enableTrackHistory True

Here is the SQL for the SCD table:

CREATE OR REFRESH STREAMING LIVE TABLE currStudents_SCD;
 
APPLY CHANGES INTO
  live.currStudents_SCD
FROM
  stream(live.currStudents_ingest)
KEYS
  (id)
SEQUENCE BY
  file_modification_time
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time)
;

Any ideas why I get this and how I can stop these duplicates?

I can't find anything in the documentation that helps with this, except possibly a vague suggestion that SCD 2 creates new records even when no columns change. But the Track History documentation seems to indicate that only changes to monitored columns result in a new record.
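
To make that expectation concrete, here is a toy model in plain Python of how I read the Track History documentation. It is only an illustration of the behaviour I expected, not how DLT is implemented; apply_scd2 and the sample rows are made up for this post.

```python
# Toy model of SCD Type 2 with tracked columns. Not the DLT implementation.
def apply_scd2(events, key, sequence_by, untracked=()):
    """Build SCD2 rows, opening a new row only when a tracked column changes."""
    ignored = {sequence_by, *untracked}
    history = []   # every row, closed or still open
    current = {}   # key value -> index of the open row in history
    for event in sorted(events, key=lambda e: e[sequence_by]):
        tracked = {c: v for c, v in event.items() if c not in ignored}
        idx = current.get(event[key])
        if idx is not None:
            open_row = history[idx]
            if open_row["data"] == tracked:
                continue  # nothing tracked changed: no new row
            open_row["end_at"] = event[sequence_by]  # close the old version
        history.append(
            {"data": tracked, "start_at": event[sequence_by], "end_at": None}
        )
        current[event[key]] = len(history) - 1
    return history


events = [
    {"id": 1, "name": "Ann", "file_modification_time": 1},
    {"id": 1, "name": "Ann", "file_modification_time": 2},   # identical reload
    {"id": 1, "name": "Anne", "file_modification_time": 3},  # real change
]
rows = apply_scd2(events, key="id", sequence_by="file_modification_time")
for row in rows:
    print(row)
```

In this model the identical reload adds nothing, and only the real change produces a second row. That is the result I was expecting from the pipeline.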

6 REPLIES

Anonymous
Not applicable

@Kearon McNicol:

In SCD Type 2, APPLY CHANGES INTO keeps history by closing the current record for a key (it fills in __END_AT) and opening a new one (with a new __START_AT) when a change arrives for that key. Both columns are derived from the SEQUENCE BY column. With TRACK HISTORY ON * EXCEPT (file_modification_time), a new record should only be opened when one of the tracked columns changes, so your reading of the documentation matches mine. As far as I know, the STORED AS SCD TYPE 2 clause has no record-expiry or validity-period setting that could be tuned to avoid this.

What you can do is add a deduplication step to the pipeline, so that repeated records are removed before they reach the SCD Type 2 logic. One way is to group the records by their key and keep the most recent record for each group, based on the file_modification_time column. This can be done with a Spark aggregation.

For example:

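import dlt
import pyspark.sql.functions as F

# Untested sketch. It assumes the payload sits in a single column named json.
# A streaming aggregation without a watermark may be rejected as an
# APPLY CHANGES source, so treat this as an outline of the idea only.
@dlt.view
def deduped_stream():
    return (
        dlt.read_stream("currStudents_ingest")
        .groupBy("id")
        .agg(
            F.max("file_modification_time").alias("latest_file_modification_time"),
            # max_by keeps the payload that belongs to the latest timestamp
            F.max_by("json", "file_modification_time").alias("json"),
        )
    )

dlt.create_streaming_table("currStudents_SCD")

# Python equivalent of the APPLY CHANGES INTO statement
dlt.apply_changes(
    target="currStudents_SCD",
    source="deduped_stream",
    keys=["id"],
    sequence_by=F.col("latest_file_modification_time"),
    stored_as_scd_type=2,
    track_history_except_column_list=["latest_file_modification_time"],
)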

Here, we group the records by the id column and select the most recent record for each group based on the file_modification_time column. We then pass this deduplicated stream into the SCD Type 2 logic for further processing.
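
If it helps to see the grouping step outside Spark, here is a minimal plain-Python sketch of "keep the latest record per id". The helper name latest_per_key and the sample records are made up for illustration; it only models the selection logic, not a streaming job.

```python
def latest_per_key(records, key, order_by):
    """Keep, for each key value, the record with the largest order_by value."""
    latest = {}
    for record in records:
        k = record[key]
        if k not in latest or record[order_by] > latest[k][order_by]:
            latest[k] = record
    return list(latest.values())


# Hypothetical sample: id 1 is delivered twice with an unchanged payload.
nightly = [
    {"id": 1, "json": '{"name": "Ann"}', "file_modification_time": 1},
    {"id": 1, "json": '{"name": "Ann"}', "file_modification_time": 2},
    {"id": 2, "json": '{"name": "Bob"}', "file_modification_time": 2},
]
result = latest_per_key(nightly, key="id", order_by="file_modification_time")
for record in result:
    print(record)
```

Each id ends up with exactly one record. Note that the surviving record for id 1 carries the newer timestamp even though its payload did not change.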

Hope this helps!

Kearon
New Contributor III

@Suteja Kanuri​ - Thank you for this. Been pulled onto other projects and getting back to this now, so will try your solution.

I think the de-duplication option is the one we want, but I'm not certain I'm understanding fully.

I have a call to an API that retrieves a JSON file each night. As this data changes infrequently, the records are normally identical.

If I understand your deduped_stream correctly, it selects the latest version of the JSON file.

Won't that version change each night when I retrieve a new JSON file, and so create a new duplicate entry in the SCD table at that point?

I think I'm missing how the de-duplication comes into effect, which is probably due to my lack of experience in this area.

Thank you for your help.

Anonymous
Not applicable

@Kearon McNicol:

You are correct: deduped_stream selects the latest version of each record, even when it is identical to the previous version. So if the pipeline runs every night against a new JSON file, the same record arrives again with a new file_modification_time, and a new row can be added to the SCD table.

To avoid that, de-duplicate the stream on the data columns themselves rather than on the timestamp. Records are then treated as unique based on a specified set of columns, so a new JSON file that is identical to the previous one is filtered out and never reaches the SCD table.

In Structured Streaming you do this by calling dropDuplicates(<column_list>) on the streaming DataFrame, where <column_list> contains every column except file_modification_time.
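
As a rough illustration of de-duplicating on a set of columns, here is a plain-Python sketch. The function drop_duplicates and the sample records are hypothetical stand-ins; they model the idea, not Spark's implementation.

```python
def drop_duplicates(records, subset):
    """Keep the first record seen for each distinct combination of subset columns."""
    seen = set()
    unique = []
    for record in records:
        fingerprint = tuple(record[column] for column in subset)
        if fingerprint in seen:
            continue  # same data as an earlier record: drop it
        seen.add(fingerprint)
        unique.append(record)
    return unique


# Hypothetical nightly loads: only the third record carries a real change.
records = [
    {"id": 1, "name": "Ann", "file_modification_time": 1},
    {"id": 1, "name": "Ann", "file_modification_time": 2},
    {"id": 1, "name": "Anne", "file_modification_time": 3},
]
# Compare on everything except the timestamp.
data_columns = [c for c in records[0] if c != "file_modification_time"]
unique = drop_duplicates(records, data_columns)
for record in unique:
    print(record)
```

The unchanged reload is dropped and the real change is kept. The seen set only ever grows, which mirrors a caveat of streaming de-duplication without a watermark: the state for every distinct record has to be retained.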

Vartika
Moderator

Hey there @Kearon McNicol​ 

Hope everything is going great!

Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark @Suteja Kanuri​'s answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you.

Thanks!

Kearon
New Contributor III

For clarity, here is the final code that avoids duplicates, using @Suteja Kanuri​ 's suggestion:

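import dlt

@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    # Compare every column except file_modification_time, so a record that is
    # re-delivered unchanged in a later file is dropped.
    data_columns = [c for c in df.columns if c != "file_modification_time"]
    return df.dropDuplicates(data_columns)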

Is there a way to do this in SQL? I only found documentation for Python.

Anonymous
Not applicable

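@Kearon McNicol:

Yes, you can remove duplicates in SQL, but a plain SELECT DISTINCT * is not equivalent to your Python code. DISTINCT compares every column, including file_modification_time, so records that differ only in that column are all kept. To get roughly the same result, group by the data columns and keep the earliest file_modification_time for each distinct record:

SELECT id, <other_columns>,
       MIN(file_modification_time) AS file_modification_time
FROM live.currStudents_ingest
GROUP BY id, <other_columns>;

Here <other_columns> stands for the remaining data columns of your table. Grouping by them collapses records that differ only in file_modification_time into a single row.

Note that this query reads live.currStudents_ingest as a batch (non-streaming) table, and assumes you have access to run SQL queries against it. I have not tested it with STREAM(), where an aggregation like this may need a watermark, so the Python dropDuplicates version may remain the simpler choice for a streaming pipeline.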
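
One thing worth checking with any SQL version is whether file_modification_time takes part in the comparison. The small sketch below uses Python's built-in sqlite3 module, so it runs on SQLite rather than Spark SQL, and the table contents are made up. It contrasts DISTINCT over all columns with grouping on the data columns only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE currStudents_ingest "
    "(id INTEGER, name TEXT, file_modification_time INTEGER)"
)
# Hypothetical data: id 1 is delivered twice with only the timestamp changed.
conn.executemany(
    "INSERT INTO currStudents_ingest VALUES (?, ?, ?)",
    [(1, "Ann", 1), (1, "Ann", 2), (2, "Bob", 2)],
)

# DISTINCT over every column: the two Ann rows differ in the timestamp.
distinct_all = conn.execute(
    "SELECT DISTINCT * FROM currStudents_ingest"
).fetchall()

# Grouping on the data columns only: the two Ann rows collapse into one.
grouped = conn.execute(
    """
    SELECT id, name, MIN(file_modification_time) AS file_modification_time
    FROM currStudents_ingest
    GROUP BY id, name
    ORDER BY id
    """
).fetchall()

print(len(distinct_all))  # 3
print(grouped)            # [(1, 'Ann', 1), (2, 'Bob', 2)]
conn.close()
```

DISTINCT over all columns keeps all three rows, while grouping on id and name leaves one row per distinct record.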
