Handling Audit Columns and SCD Type 1 in Databricks DLT Pipeline with Unity Catalog: Circular Depend

yvishal519
Contributor

I am working on a Delta Live Tables (DLT) pipeline with Unity Catalog, where we are reading data from Azure Data Lake Storage (ADLS) and creating a table in the silver layer with Slowly Changing Dimensions (SCD) Type 1 enabled. In addition, we are adding four audit columns:

  • __KeyHash: A hash generated from the primary key columns.
  • __RowHash: A hash generated from all non-audit columns to track changes.
  • __CreatedDateTime: A timestamp indicating when the row was initially created.
  • __UpdatedDateTime: A timestamp indicating when the row was last updated.
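To make the two hash columns concrete, here is a minimal sketch in plain Python rather than PySpark. It is not the AuditHandler used in my pipeline; the helper names, the "||" separator, and the choice of SHA-256 are assumptions made only for illustration.

```python
import hashlib

# The four audit columns described above; they are excluded from __RowHash.
AUDIT_COLUMNS = {"__KeyHash", "__RowHash", "__CreatedDateTime", "__UpdatedDateTime"}


def _hash_values(values):
    """SHA-256 of the values joined by '||' (assumed separator).

    None is rendered as an empty string, so None and '' hash the same;
    a real implementation may want to distinguish them.
    """
    joined = "||".join("" if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def add_hashes(row, key_columns):
    """Return a copy of `row` (a dict) with __KeyHash and __RowHash added."""
    # Sort the column names so the hash does not depend on dict ordering.
    data_columns = sorted(c for c in row if c not in AUDIT_COLUMNS)
    out = dict(row)
    out["__KeyHash"] = _hash_values([row[c] for c in key_columns])
    out["__RowHash"] = _hash_values([row[c] for c in data_columns])
    return out


# Hypothetical sample rows: same key, one changed attribute.
original = add_hashes({"ID": 1, "NAME": "Ana", "DEPT": "HR"}, ["ID"])
changed = add_hashes({"ID": 1, "NAME": "Ana", "DEPT": "IT"}, ["ID"])
```

With these sample rows, __KeyHash is identical for both versions while __RowHash differs, which is how a changed record is told apart from an unchanged one.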

Key Challenge:

One of the main challenges I'm facing is ensuring that the __CreatedDateTime remains unchanged when existing records are updated. My approach is to:

  1. Read the latest data from the source in the DLT pipeline.
  2. Compare the new data with the existing table to check for updates.
  3. If a record exists in both the new and existing data, I retrieve the older __CreatedDateTime value from the existing table and use it, while only updating the __UpdatedDateTime.
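The three steps above can be modelled in plain Python as follows. This is only a sketch of the intended behaviour on in-memory dicts, not DLT or PySpark code; the function name merge_scd1, the sample rows, and the timestamps are hypothetical.

```python
def merge_scd1(existing, incoming, key, load_time):
    """SCD Type 1 upsert that keeps the original creation timestamp.

    existing:  dict mapping key value -> current row (a dict)
    incoming:  list of new rows (dicts) read from the source
    load_time: timestamp string for this load
    """
    merged = {k: dict(v) for k, v in existing.items()}
    for row in incoming:
        previous = merged.get(row[key])
        new_row = dict(row)
        # Existing record: carry the old __CreatedDateTime forward.
        # New record: the load time becomes its creation time.
        new_row["__CreatedDateTime"] = (
            previous["__CreatedDateTime"] if previous else load_time
        )
        new_row["__UpdatedDateTime"] = load_time
        merged[row[key]] = new_row
    return merged


existing = {
    1: {
        "ID": 1,
        "NAME": "Ana",
        "__CreatedDateTime": "2024-01-01 00:00:00",
        "__UpdatedDateTime": "2024-01-01 00:00:00",
    }
}
incoming = [{"ID": 1, "NAME": "Ana B"}, {"ID": 2, "NAME": "Raj"}]
result = merge_scd1(existing, incoming, "ID", "2024-10-22 12:00:00")
```

Here the row with ID 1 keeps its 2024-01-01 creation time and only gets a new __UpdatedDateTime, while the row with ID 2 is created with the load time in both columns.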

Problem:

While implementing this, I encountered the following warnings and errors:

  1. Warning:
    "Your query 'employee' reads from '<catalog name>.<schema name>.employee' but must read from 'LIVE.employee' instead. Always use the LIVE keyword when referencing tables from the same pipeline so that DLT can track the dependencies in the pipeline."

  2. Error:
    "The downstream table 'employee' is referenced when creating the upstream table or view 'employee_stg'. Circular dependencies are not supported in a DLT pipeline. Please remove the dependency between 'employee_stg' and 'employee'."

These issues seem to arise due to a circular dependency in the pipeline when trying to compare the new records with the existing table.
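As I understand the error, the pipeline's dependency graph ends up with a loop: 'employee_stg' reads from 'employee', and 'employee' is built from 'employee_stg'. The toy check below illustrates that loop in plain Python. It is not how DLT itself resolves dependencies; the function name and the 'cloud_files' source node are assumptions for illustration.

```python
def find_cycle(reads_from):
    """Return one dependency cycle as a list of names, or None.

    reads_from maps each dataset to the datasets it reads from.
    """
    UNSEEN, IN_PROGRESS, DONE = 0, 1, 2
    state = {}
    path = []

    def visit(node):
        state[node] = IN_PROGRESS
        path.append(node)
        for upstream in reads_from.get(node, []):
            status = state.get(upstream, UNSEEN)
            if status == IN_PROGRESS:
                # We reached a dataset that is still on the current path.
                return path[path.index(upstream):] + [upstream]
            if status == UNSEEN:
                cycle = visit(upstream)
                if cycle:
                    return cycle
        path.pop()
        state[node] = DONE
        return None

    for node in list(reads_from):
        if state.get(node, UNSEEN) == UNSEEN:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


# The view reads the source files AND the target table it feeds.
with_lookup = {
    "employee_stg": ["cloud_files", "employee"],
    "employee": ["employee_stg"],
}
# Same pipeline without the lookup against the target table.
without_lookup = {
    "employee_stg": ["cloud_files"],
    "employee": ["employee_stg"],
}
cycle = find_cycle(with_lookup)
no_cycle = find_cycle(without_lookup)
```

Only the first graph contains a loop, which matches the point at which the error appears: as soon as the staging view reads the table that it feeds.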

My code looks something like this:

 

import dlt
from datetime import datetime

# Cloud file options for CSV files
cloud_file_options_csv = {
    "cloudFiles.format": "csv",
    "header": "true"
}

# Define the path to your files in ADLS
file_path = "path_to_adls_location/*"

@dlt.view(name="employee_stg")
def stg_employee():
    # Read new files incrementally with Auto Loader
    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloud_file_options_csv)
        .load(file_path)
    )

    # AuditHandler is my own helper class that adds the four audit columns
    use_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    audit_handler = AuditHandler(use_time)
    final_df = audit_handler.addAuditColumns(df, ["ID"])

    # Reading the pipeline's own target table here is what triggers the warning and the error
    if spark._jsparkSession.catalog().tableExists("catalog_name.schema_name.silver_employee"):
        existing_df = spark.sql("SELECT * FROM catalog_name.schema_name.silver_employee")
        # or existing_df = spark.read.table("catalog_name.schema_name.silver_employee")
        # or existing_df = spark.readStream.table("catalog_name.schema_name.silver_employee")
        # I tried all of the approaches above, with the same result
        # update_created_datetime is my own helper that carries over the old __CreatedDateTime
        final_df = update_created_datetime(existing_df, final_df, ["ID"])

    return final_df

dlt.create_streaming_table(
    name="silver_employee",
    comment="Silver table for employee data with SCD Type 1."
)

dlt.apply_changes(
    target="silver_employee",
    source="employee_stg",
    stored_as_scd_type=1,
    keys=["ID"],
    sequence_by="DATE"
)

 



Error (screenshot): yvishal519_0-1729619599002.png

 
