I am working on a Delta Live Tables (DLT) pipeline with Unity Catalog, where we are reading data from Azure Data Lake Storage (ADLS) and creating a table in the silver layer with Slowly Changing Dimensions (SCD) Type 1 enabled. In addition, we are adding four audit columns (their derivation is sketched just after the list):
- __KeyHash: A hash generated from the primary key columns.
- __RowHash: A hash generated from all non-audit columns to track changes.
- __CreatedDateTime: A timestamp indicating when the row was initially created.
- __UpdatedDateTime: A timestamp indicating when the row was last updated.
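The hash and timestamp columns are produced by our own AuditHandler helper. Conceptually it does something like the sketch below (the SHA-256 pipe-delimited concatenation and the function name are illustrative, not the exact implementation):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_audit_columns_sketch(df: DataFrame, key_cols: list, use_time: str) -> DataFrame:
    # __KeyHash: hash of the primary key columns
    df = df.withColumn("__KeyHash", F.sha2(F.concat_ws("|", *key_cols), 256))
    # __RowHash: hash of all non-audit columns, used to detect changes
    non_audit_cols = [c for c in df.columns if not c.startswith("__")]
    df = df.withColumn("__RowHash", F.sha2(F.concat_ws("|", *non_audit_cols), 256))
    # Load timestamps; __CreatedDateTime must survive later updates
    return (df
            .withColumn("__CreatedDateTime", F.lit(use_time).cast("timestamp"))
            .withColumn("__UpdatedDateTime", F.lit(use_time).cast("timestamp")))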
Key Challenge:
One of the main challenges I'm facing is ensuring that the __CreatedDateTime remains unchanged when existing records are updated. My approach is to:
- Read the latest data from the source in the DLT pipeline.
- Compare the new data with the existing table to check for updates.
- If a record exists in both the new and existing data, I retrieve the original __CreatedDateTime value from the existing table and keep it, updating only the __UpdatedDateTime (a sketch of this helper follows the list).
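The comparison in the last step is done by my update_created_datetime helper; conceptually it works roughly like this sketch (assuming a left join of the new batch against the existing table on the key columns):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def update_created_datetime_sketch(existing_df: DataFrame, new_df: DataFrame, key_cols: list) -> DataFrame:
    # Keep only the keys and the original creation timestamp from the existing table
    existing_created = existing_df.select(
        *key_cols,
        F.col("__CreatedDateTime").alias("__ExistingCreatedDateTime")
    )
    # Rows that already exist keep their original __CreatedDateTime;
    # brand-new rows fall back to the freshly generated value
    return (
        new_df.join(existing_created, on=key_cols, how="left")
        .withColumn(
            "__CreatedDateTime",
            F.coalesce(F.col("__ExistingCreatedDateTime"), F.col("__CreatedDateTime")),
        )
        .drop("__ExistingCreatedDateTime")
    )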
Problem:
While implementing this, I encountered the following warnings and errors:
Warning:
"Your query 'employee' reads from '<catalog name>.<schema name>.employee' but must read from 'LIVE.employee' instead. Always use the LIVE keyword when referencing tables from the same pipeline so that DLT can track the dependencies in the pipeline."
Error:
"The downstream table 'employee' is referenced when creating the upstream table or view 'employee_stg'. Circular dependencies are not supported in a DLT pipeline. Please remove the dependency between 'employee_stg' and 'employee'."
These issues seem to arise due to a circular dependency in the pipeline when trying to compare the new records with the existing table.
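For reference, what the warning asks for is an intra-pipeline reference through the LIVE schema, something like the line below; but in my case reading the target table inside the staging view is exactly what closes the loop:

# What DLT expects for reads of tables defined in the same pipeline
existing_df = spark.read.table("LIVE.silver_employee")
# or: existing_df = dlt.read("silver_employee")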
My code looks something like this:
import dlt
from datetime import datetime

# Cloud file options for CSV files
cloud_file_options_csv = {
    "cloudFiles.format": "csv",
    "header": "true"
}

# Define the path to your files in ADLS
file_path = "path_to_adls_location/*"

# AuditHandler and update_created_datetime are our own helpers, defined elsewhere.

@dlt.view(name="employee_stg")
def stg_employee():
    # Read the latest CSV files from ADLS with Auto Loader
    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloud_file_options_csv)
        .load(file_path)
    )

    # Add the audit columns (__KeyHash, __RowHash, __CreatedDateTime, __UpdatedDateTime)
    use_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    audit_handler = AuditHandler(use_time)
    final_df = audit_handler.addAuditColumns(df, ["ID"])

    # If the silver table already exists, carry over the original __CreatedDateTime
    if spark._jsparkSession.catalog().tableExists("catalog_name.schema_name.silver_employee"):
        existing_df = spark.sql("SELECT * FROM catalog_name.schema_name.silver_employee")
        # or existing_df = spark.read.table("catalog_name.schema_name.silver_employee")
        # or existing_df = spark.readStream.table("catalog_name.schema_name.silver_employee")
        # I tried all of the above approaches as well
        final_df = update_created_datetime(existing_df, final_df, ["ID"])

    return final_df

dlt.create_streaming_table(
    name="silver_employee",
    comment="Silver table for employee data with SCD Type 1."
)

dlt.apply_changes(
    target="silver_employee",
    source="employee_stg",
    stored_as_scd_type=1,
    keys=["ID"],
    sequence_by="DATE"
)