
dlt.apply_changes_from_snapshot() is not picking up historical data

ImranA
New Contributor III

Hi,
I am having an issue when I load the DLT bronze table for the first time: the load goes through multiple parquet files in a loop (the loop just supplies the snapshot and version sequentially to "dlt.apply_changes_from_snapshot()"), but only the latest parquet file ends up stored in the table.

However, I want the table to keep the records as their values change across the parquet files during the first load, as SCD type 2.

The interesting thing is that after the first load, if any new parquet file comes in, it works fine: it stores the new record that has different values for the same 'ID' and puts an end date on the old record for that 'ID'.
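(For reference, by "works fine" I mean the standard SCD type 2 output, where DLT adds __START_AT and __END_AT columns; with illustrative values it looks roughly like this:

ID | value | __START_AT          | __END_AT
1  | A     | 2024-01-01 00:00:00 | 2024-01-02 00:00:00
1  | B     | 2024-01-02 00:00:00 | null

The __START_AT/__END_AT values come from the version I pass in, which is a formatted datetime.)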
Has anyone experienced this and what could be the cause?

I am using the code below:

 

import dlt

# Create the bronze streaming table that will hold the SCD type 2 history.
dlt.create_streaming_table(
    name=table_name,
    table_properties={"quality": "bronze", "delta.enableChangeDataFeed": "true"}
)

def apply_changes(snapshot, version, keys, table_name):
    # Apply a single snapshot DataFrame with its sortable version to the target.
    dlt.apply_changes_from_snapshot(
        target=table_name,
        snapshot_and_version=(snapshot, version),
        keys=keys,
        stored_as_scd_type=2,
        track_history_except_column_list=["_metadata", "extract_datetime", "formatted_datetime"]
    )

if data_point:
    for dp in data_point:
        nsav = next_snapshot_and_version(dp)
        snapshot = nsav[0]  # the DataFrame containing the snapshot
        version = nsav[1]   # the sortable version (one date at a time)
        print(version)      # the version is a formatted datetime
        apply_changes(snapshot, version, keys, table_name)
else:
    print("No data available in data_point. Skipping processing.")

 

And the loop (for dp in data_point) does provide me with all the snapshots and datetimes, so I am not missing any snapshots or dates.
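For comparison, the Databricks docs also show snapshot_and_version accepting a function instead of a (snapshot, version) tuple; DLT then calls that function itself, passing the last version it processed, until the function returns None. A rough, untested sketch of that pattern (get_next_data_point is a made-up placeholder for however the next parquet file gets located):

import dlt

def next_snapshot_and_version(latest_snapshot_version):
    # DLT calls this with the last processed version (None on the first call).
    # Return (snapshot DataFrame, sortable version), or None when there are
    # no more snapshots to process.
    next_dp = get_next_data_point(latest_snapshot_version)  # hypothetical helper
    if next_dp is None:
        return None
    return (spark.read.parquet(next_dp.path), next_dp.formatted_datetime)

dlt.apply_changes_from_snapshot(
    target=table_name,
    snapshot_and_version=next_snapshot_and_version,  # a function, not a tuple
    keys=keys,
    stored_as_scd_type=2,
    track_history_except_column_list=["_metadata", "extract_datetime", "formatted_datetime"]
)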

2 REPLIES

ImranA
New Contributor III

By "first load" I mean a Full Refresh of the DLT pipeline.

ImranA
New Contributor III

@gchandra can you help me with the above?

In summary: when I loop through the snapshots during a full refresh, the API doesn't build a history in the table of how the data is changing; it only keeps the latest records from the most recent parquet file.
