Handling Audit Columns and SCD Type 1 in Databricks DLT Pipeline with Unity Catalog: Circular Dependency
10-22-2024 11:01 AM
I am working on a Delta Live Tables (DLT) pipeline with Unity Catalog, where we are reading data from Azure Data Lake Storage (ADLS) and creating a table in the silver layer with Slowly Changing Dimensions (SCD) Type 1 enabled. In addition, we are adding four audit columns (derived roughly as sketched after the list):
- __KeyHash: A hash generated from the primary key columns.
- __RowHash: A hash generated from all non-audit columns to track changes.
- __CreatedDateTime: A timestamp indicating when the row was initially created.
- __UpdatedDateTime: A timestamp indicating when the row was last updated.
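For reference, we derive the two hash columns roughly as follows (a simplified sketch; the helper name and separator are illustrative, and null handling is omitted):

    from pyspark.sql import functions as F

    def add_hash_columns(df, key_cols):
        # __KeyHash: hash of the primary-key columns only
        # __RowHash: hash of every non-audit column, used to detect changed rows
        non_audit_cols = [c for c in df.columns if not c.startswith("__")]
        key_concat = F.concat_ws("||", *[F.col(c).cast("string") for c in key_cols])
        row_concat = F.concat_ws("||", *[F.col(c).cast("string") for c in non_audit_cols])
        return (
            df.withColumn("__KeyHash", F.sha2(key_concat, 256))
              .withColumn("__RowHash", F.sha2(row_concat, 256))
        )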
Key Challenge:
One of the main challenges I'm facing is ensuring that the __CreatedDateTime remains unchanged when existing records are updated. My approach is to:
- Read the latest data from the source in the DLT pipeline.
- Compare the new data with the existing table to check for updates.
- If a record exists in both the new and existing data, I retrieve the older __CreatedDateTime value from the existing table and use it, while only updating the __UpdatedDateTime.
Problem:
While implementing this, I encountered the following warnings and errors:
Warning:
"Your query 'employee' reads from '<catalog name>.<schema name>.employee' but must read from 'LIVE.employee' instead. Always use the LIVE keyword when referencing tables from the same pipeline so that DLT can track the dependencies in the pipeline."Error:
"The downstream table 'employee' is referenced when creating the upstream table or view 'employee_stg'. Circular dependencies are not supported in a DLT pipeline. Please remove the dependency between 'employee_stg' and 'employee'."
These issues seem to arise due to a circular dependency in the pipeline when trying to compare the new records with the existing table.
My code looks something like this:
import dlt

# Cloud file options for CSV files
cloud_file_options_csv = {
    "cloudFiles.format": "csv",
    "header": "true"
}

# Define the path to your files in ADLS
file_path = "path_to_adls_location/*"

@dlt.view(name="employee_stg")
def stg_employee():
    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloud_file_options_csv)
        .load(file_path)
    )

    from datetime import datetime
    use_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    audit_handler = AuditHandler(use_time)
    final_df = audit_handler.addAuditColumns(df, ["ID"])

    if spark._jsparkSession.catalog().tableExists("catalog_name.schema_name.silver_employee"):
        existing_df = spark.sql("SELECT * FROM catalog_name.schema_name.silver_employee")
        # or existing_df = spark.read.table("catalog_name.schema_name.silver_employee")
        # or existing_df = spark.readStream.table("catalog_name.schema_name.silver_employee")
        # I tried all of the above approaches as well
        final_df = update_created_datetime(existing_df, final_df, ["ID"])

    return final_df

dlt.create_streaming_table(
    name="silver_employee",
    comment="Silver table for employee data with SCD Type 1."
)

dlt.apply_changes(
    target="silver_employee",
    source="employee_stg",
    stored_as_scd_type=1,
    keys=["ID"],
    sequence_by="DATE"
)
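For reference, update_created_datetime keeps the original __CreatedDateTime for rows that already exist in the target; a simplified sketch:

    from pyspark.sql import functions as F

    def update_created_datetime(existing_df, new_df, key_cols):
        # Keep only the keys and the original creation timestamp from the existing table
        existing_created = existing_df.select(
            *key_cols, F.col("__CreatedDateTime").alias("__ExistingCreatedDateTime")
        )
        # Left join on the keys: rows already in the target keep their original
        # __CreatedDateTime, brand-new rows keep the value stamped by addAuditColumns
        return (
            new_df.join(existing_created, on=key_cols, how="left")
            .withColumn(
                "__CreatedDateTime",
                F.coalesce(F.col("__ExistingCreatedDateTime"), F.col("__CreatedDateTime")),
            )
            .drop("__ExistingCreatedDateTime")
        )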
Labels: Delta Lake
10-31-2024 11:59 AM
Hi @yvishal519
- Avoid Circular Dependency: The employee_stg view should not directly reference the silver_employee table. Instead, the apply_changes function will handle the merging logic.
- Use LIVE Keyword: Ensure that the source table in apply_changes is referenced with the LIVE keyword.
- Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps.
This approach should help you avoid the circular dependency issue and correctly manage the audit columns in your DLT pipeline.
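A minimal sketch of that structure, using the names from the original post (the audit timestamps here are simply stamped in the staging view; this is illustrative, not a drop-in implementation):

    import dlt
    from pyspark.sql import functions as F

    @dlt.view(name="employee_stg")
    def employee_stg():
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .load("path_to_adls_location/*")
        )
        # Stamp the audit timestamps here; do not read silver_employee back into this view
        return (
            df.withColumn("__CreatedDateTime", F.current_timestamp())
              .withColumn("__UpdatedDateTime", F.current_timestamp())
        )

    dlt.create_streaming_table(name="silver_employee")

    dlt.apply_changes(
        target="silver_employee",
        source="employee_stg",   # a dataset defined in the same pipeline, resolved by DLT
        keys=["ID"],
        sequence_by="DATE",
        stored_as_scd_type=1,
    )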
Docs - https://docs.databricks.com/en/delta-live-tables/cdc.html
Thanks!
01-02-2025 01:17 PM
I don't see where or how the __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function. I understand __START_AT and __END_AT when Type 2 is used, but I am not seeing what you imply. Would you expand on this in your reply?
Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps.
01-06-2025 09:57 AM
Hi @NandiniN - I did not mention you in the above reply, though I had meant to. I would appreciate it if you could provide a follow-up.
"I don't see where or how the __CreatedDateTime and __UpdateDateTime columns are managed within the apply_changes function. I understand __Start_At and __End_At when Type 2 is used but am not seeing what you imply. Would you expand on this in your reply?
Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps."
01-08-2025 03:14 PM
Hi @RBlum,
The original author of this ticket had mentioned:
In addition, we are adding four audit columns:
- __KeyHash: A hash generated from the primary key columns.
- __RowHash: A hash generated from all non-audit columns to track changes.
- __CreatedDateTime: A timestamp indicating when the row was initially created.
- __UpdatedDateTime: A timestamp indicating when the row was last updated.
a month ago
Hi @yvishal519
Did you end up finding that "the __CreatedDateTime and __UpdatedDateTime columns are managed as expected within the apply_changes function to ensure correct handling of timestamps," as @NandiniN had replied originally, or are you managing the values in your own function outside of apply_changes? Either way, can you provide some details and examples?
This has been confusing to me, as I do not see any created-datetime or updated-datetime handling within apply_changes.
4 weeks ago
Hi @RBlum, what's your use case? Is your concern that you are not able to find the __CreatedDateTime and __UpdatedDateTime columns, or is there some other challenge?
4 weeks ago
When SCD Type 1 is processed using the APPLY CHANGES INTO (or APPLY CHANGES FROM SNAPSHOT) APIs, columns like "__CreatedDateTime" and "__UpdatedDateTime" are not automatically created in the table structure and maintained through the APPLY.
That is unlike SCD Type 2, where the "__START_AT" and "__END_AT" columns are automatically created in the table structure and maintained through the APPLY.
The use case for having SCD Type 1 include something like an automated __START_AT (the original creation of the record, which does not change) and __UPDATE_AT (the latest update of the record) is that it would allow simple identification of which records were first created and last changed, which can be utilized downstream or to summarize the activity within the object since the previous run.
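For comparison, a minimal SCD Type 2 sketch (names follow the earlier example); here DLT creates and maintains __START_AT and __END_AT on the target automatically:

    import dlt

    dlt.create_streaming_table(name="silver_employee_scd2")

    dlt.apply_changes(
        target="silver_employee_scd2",
        source="employee_stg",
        keys=["ID"],
        sequence_by="DATE",
        stored_as_scd_type=2,   # __START_AT and __END_AT are added and maintained by DLT
    )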
3 weeks ago
I haven’t found an ideal solution for handling audit columns effectively in Databricks Delta Live Tables (DLT) when implementing SCD Type 1. It seems there’s no straightforward way to incorporate these columns into the apply_changes function for this scenario.
That said, I completely agree with the previous response from @RBlum: this scenario is indeed feasible when implementing SCD Type 2 with Databricks DLT.
If anyone has insights or workarounds for handling audit columns in SCD1, I’d love to hear your thoughts!