Hi,
I am having an issue with the first load of a DLT bronze table. The load iterates over multiple parquet files in a loop (the loop just feeds each snapshot and its version sequentially to dlt.apply_changes_from_snapshot()), but only the latest parquet file ends up stored in the table.
What I want is for the first load to keep the history of each record as its values change across the parquet files, i.e. proper SCD type 2 behaviour.
The interesting thing is that after the first load it works fine: when a new parquet file comes in with a different value for an existing 'ID', the new record is stored and the old record for that 'ID' gets an end date.
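To make the expectation concrete (made-up data; __START_AT and __END_AT are the columns DLT adds to SCD type 2 targets):

spark.table(table_name).filter("ID = 1").show()
# what I expect after a first load over three snapshot files:
# +---+-----+-------------------+-------------------+
# | ID|value|         __START_AT|           __END_AT|
# +---+-----+-------------------+-------------------+
# |  1|    A|2024-01-01 00:00:00|2024-01-02 00:00:00|
# |  1|    B|2024-01-02 00:00:00|2024-01-03 00:00:00|
# |  1|    C|2024-01-03 00:00:00|               null|
# +---+-----+-------------------+-------------------+
# what I actually get: only the row from the latest parquet file.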
Has anyone experienced this and what could be the cause?
I am using the code below:
dlt.create_streaming_table(
    name=table_name,
    table_properties={"quality": "bronze", "delta.enableChangeDataFeed": "true"}
)

def apply_changes(snapshot, version, keys, table_name):
    dlt.apply_changes_from_snapshot(
        target=table_name,
        snapshot_and_version=(snapshot, version),
        keys=keys,
        stored_as_scd_type=2,
        track_history_except_column_list=["_metadata", "extract_datetime", "formatted_datetime"]
    )

if data_point:
    for dp in data_point:
        nsav = next_snapshot_and_version(dp)
        snapshot = nsav[0]   # the DataFrame containing the snapshot
        version = nsav[1]    # the sortable version (one date per file)
        print(version)       # print the version (formatted datetime)
        apply_changes(snapshot, version, keys, table_name)
else:
    print("No data available in data_point. Skipping processing.")
And the loop (for dp in data_point) does provide all of the snapshots and datetimes, so I am not missing any snapshots or dates.
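For reference, next_snapshot_and_version(dp) does roughly the following (a simplified sketch; the real path handling differs, and extract_formatted_datetime is a placeholder name):

def next_snapshot_and_version(dp):
    # read one parquet file as a full snapshot of the source
    snapshot_df = spark.read.parquet(dp)
    # derive the sortable version from the file's formatted datetime
    version = extract_formatted_datetime(dp)   # placeholder helper
    return (snapshot_df, version)

Could the cause be that calling dlt.apply_changes_from_snapshot() once per loop iteration only keeps the last call when the pipeline graph is resolved? If I am reading the docs right, snapshot_and_version can also be given a function that DLT calls repeatedly with the latest processed version, and that returns None when there are no more snapshots, roughly:

def provide_next_snapshot(latest_version):
    # return (snapshot_df, version) for the next unprocessed parquet file,
    # or None once every file for this update has been handed over
    ...

dlt.apply_changes_from_snapshot(
    target=table_name,
    snapshot_and_version=provide_next_snapshot,
    keys=keys,
    stored_as_scd_type=2
)

Should I be using that pattern for the initial load instead of my own loop?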