
Handling Audit Columns and SCD Type 1 in Databricks DLT Pipeline with Unity Catalog: Circular Dependency

yvishal519
Contributor

I am working on a Delta Live Tables (DLT) pipeline with Unity Catalog, where we are reading data from Azure Data Lake Storage (ADLS) and creating a table in the silver layer with Slowly Changing Dimensions (SCD) Type 1 enabled. In addition, we are adding four audit columns:

  • __KeyHash: A hash generated from the primary key columns.
  • __RowHash: A hash generated from all non-audit columns to track changes.
  • __CreatedDateTime: A timestamp indicating when the row was initially created.
  • __UpdatedDateTime: A timestamp indicating when the row was last updated.
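The post does not show how the hash columns are built. As a hypothetical illustration only, here is a plain-Python sketch of one common scheme: SHA-256 over the "||"-joined key values for __KeyHash, and over all non-audit values (sorted by column name) for __RowHash. In PySpark the same idea is usually written as `F.sha2(F.concat_ws("||", *cols), 256)`. Note that `concat_ws` skips nulls, whereas this sketch turns None into an empty string.

```python
import hashlib
from typing import Mapping, Sequence

# Hypothetical set of audit column names, taken from the list above
AUDIT_COLUMNS = {"__KeyHash", "__RowHash", "__CreatedDateTime", "__UpdatedDateTime"}


def _hash_values(values: Sequence[object], sep: str = "||") -> str:
    """SHA-256 over the string form of the values; None becomes ''."""
    joined = sep.join("" if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def key_hash(row: Mapping[str, object], key_columns: Sequence[str]) -> str:
    # __KeyHash: hash of the primary-key columns, in the order given
    return _hash_values([row[c] for c in key_columns])


def row_hash(row: Mapping[str, object]) -> str:
    # __RowHash: hash of every non-audit column, sorted by name so that
    # column order does not change the result
    data_columns = sorted(c for c in row if c not in AUDIT_COLUMNS)
    return _hash_values([row[c] for c in data_columns])
```

Because audit columns are excluded from __RowHash, re-stamping __UpdatedDateTime never makes an unchanged row look changed.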

Key Challenge:

One of the main challenges I'm facing is ensuring that the __CreatedDateTime remains unchanged when existing records are updated. My approach is to:

  1. Read the latest data from the source in the DLT pipeline.
  2. Compare the new data with the existing table to check for updates.
  3. If a record exists in both the new and existing data, I retrieve the older __CreatedDateTime value from the existing table and use it, while only updating the __UpdatedDateTime.
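Without the DLT constraints, the three steps above amount to an upsert rule keyed on __KeyHash. The following is a minimal in-memory sketch in plain Python. The function name and the dict standing in for the silver table are hypothetical, and it models the logic only, not any DLT or Spark API. Step 2 requires reading the current target, and that is exactly the read the pipeline rejects.

```python
from typing import Dict, List, Mapping


def apply_scd1_with_audit(
    existing: Dict[str, dict],
    incoming: List[Mapping[str, object]],
    run_time: str,
) -> Dict[str, dict]:
    """Upsert incoming rows (keyed by __KeyHash) into a copy of `existing`.

    Assumes `incoming` is already ordered by the sequencing column.
    - New key: insert with __CreatedDateTime = __UpdatedDateTime = run_time.
    - Existing key with a changed __RowHash: overwrite the data columns,
      keep the old __CreatedDateTime, set __UpdatedDateTime = run_time.
    - Existing key with the same __RowHash: leave the row untouched.
    """
    result = {k: dict(v) for k, v in existing.items()}  # do not mutate the input
    for row in incoming:
        key = row["__KeyHash"]
        current = result.get(key)
        if current is None:
            result[key] = {**row, "__CreatedDateTime": run_time, "__UpdatedDateTime": run_time}
        elif current["__RowHash"] != row["__RowHash"]:
            result[key] = {
                **row,
                "__CreatedDateTime": current["__CreatedDateTime"],  # preserved
                "__UpdatedDateTime": run_time,
            }
    return result
```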

Problem:

While implementing this, I encountered the following warnings and errors:

  1. Warning:
    "Your query 'employee' reads from '<catalog name>.<schema name>.employee' but must read from 'LIVE.employee' instead. Always use the LIVE keyword when referencing tables from the same pipeline so that DLT can track the dependencies in the pipeline."

  2. Error:
    "The downstream table 'employee' is referenced when creating the upstream table or view 'employee_stg'. Circular dependencies are not supported in a DLT pipeline. Please remove the dependency between 'employee_stg' and 'employee'."

These issues seem to arise due to a circular dependency in the pipeline when trying to compare the new records with the existing table.
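DLT requires the pipeline's dataset graph to be acyclic. In the posted code, employee_stg reads silver_employee, and apply_changes makes silver_employee read employee_stg, so each depends on the other. A generic depth-first cycle check in plain Python makes the loop concrete. This is not DLT's own implementation, and the dict format ("dataset → datasets it reads from") is an assumption.

```python
from typing import Dict, List, Optional


def find_cycle(deps: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of names, or None for a DAG.

    `deps` maps each dataset to the datasets it reads from.
    """
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, finished
    color = {node: WHITE for node in deps}
    stack: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        stack.append(node)
        for upstream in deps.get(node, []):
            state = color.get(upstream, WHITE)
            if state == GRAY:
                # upstream is already on the current path: close the loop
                return stack[stack.index(upstream):] + [upstream]
            if state == WHITE:
                found = visit(upstream)
                if found:
                    return found
        stack.pop()
        color[node] = BLACK
        return None

    for node in list(deps):
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None
```

With the two datasets from the post, `find_cycle({"employee_stg": ["silver_employee"], "silver_employee": ["employee_stg"]})` reports the loop. Once the staging view only reads the raw files, the check returns None.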

My code looks something like this:

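```python
import dlt
from datetime import datetime

# `spark` is the SparkSession that Databricks provides in pipeline notebooks.
# AuditHandler and update_created_datetime are my own helpers (defined elsewhere, not shown).

# Cloud file options for CSV files
cloud_file_options_csv = {
    "cloudFiles.format": "csv",
    "header": "true",
}

# Define the path to your files in ADLS
file_path = "path_to_adls_location/*"


@dlt.view(name="employee_stg")
def stg_employee():
    # Incrementally ingest the raw CSV files with Auto Loader
    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloud_file_options_csv)
        .load(file_path)
    )

    # Add __KeyHash, __RowHash, __CreatedDateTime and __UpdatedDateTime
    use_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    audit_handler = AuditHandler(use_time)
    final_df = audit_handler.addAuditColumns(df, ["ID"])

    # Reading the target table from inside this view makes the view depend on
    # its own downstream table (the circular dependency reported below)
    if spark.catalog.tableExists("catalog_name.schema_name.silver_employee"):
        existing_df = spark.sql("SELECT * FROM catalog_name.schema_name.silver_employee")
        # or existing_df = spark.read.table("catalog_name.schema_name.silver_employee")
        # or existing_df = spark.readStream.table("catalog_name.schema_name.silver_employee")
        # (I tried all of the above approaches)
        # Copy the existing __CreatedDateTime onto rows whose ID already exists
        final_df = update_created_datetime(existing_df, final_df, ["ID"])

    return final_df


# Target table for the SCD Type 1 merge
dlt.create_streaming_table(
    name="silver_employee",
    comment="Silver table for employee data with SCD Type 1.",
)

# Upsert employee_stg into silver_employee by ID, ordered by DATE
dlt.apply_changes(
    target="silver_employee",
    source="employee_stg",
    stored_as_scd_type=1,
    keys=["ID"],
    sequence_by="DATE",
)
```
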
Error: [screenshot]



NandiniN
Databricks Employee

Hi @yvishal519 

  • Avoid Circular Dependency: The employee_stg view should not directly reference the silver_employee table. Instead, the apply_changes function will handle the merging logic.
  • Use LIVE Keyword: Ensure that the source table in apply_changes is referenced with the LIVE keyword.
  • Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps.

This approach should help you avoid the circular dependency issue and correctly manage the audit columns in your DLT pipeline.

Docs - https://docs.databricks.com/en/delta-live-tables/cdc.html

Thanks!

RBlum
New Contributor III

I don't see where or how the __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function. I understand __START_AT and __END_AT when Type 2 is used, but I am not seeing what you imply. Would you expand on this in your reply?

Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps.

 

RBlum
New Contributor III

Hi @NandiniN, I did not mention you in the reply above, though I had meant to. I would appreciate it if you could follow up.

"I don't see where or how the __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function. I understand __START_AT and __END_AT when Type 2 is used, but I am not seeing what you imply. Would you expand on this in your reply?

Audit Columns: The __CreatedDateTime and __UpdatedDateTime columns are managed within the apply_changes function to ensure correct handling of timestamps."

NandiniN
Databricks Employee

 

Hi @RBlum ,

The original author of this thread mentioned:

In addition, we are adding four audit columns:

  • __KeyHash: A hash generated from the primary key columns.
  • __RowHash: A hash generated from all non-audit columns to track changes.
  • __CreatedDateTime: A timestamp indicating when the row was initially created.
  • __UpdatedDateTime: A timestamp indicating when the row was last updated.

RBlum
New Contributor III

Hi @yvishal519 
Did you find that the __CreatedDateTime and __UpdatedDateTime columns are "managed within the apply_changes function to ensure correct handling of timestamps," as @NandiniN originally replied, or are you managing the values in your own function outside of apply_changes? Either way, could you share some details or examples?
This has been confusing to me, because I find no created or updated timestamp columns maintained by apply_changes.
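For comparison, managing the values outside apply_changes usually means writing the merge yourself, for example in a regular (non-DLT) job or in a Structured Streaming foreachBatch. The sketch below is a hypothetical helper that builds a Delta Lake MERGE statement. In that statement, matched rows refresh every column except __CreatedDateTime, and only when __RowHash has changed. Table and column names are placeholders.

```python
from typing import Sequence

CREATED_COL = "__CreatedDateTime"  # never overwritten on matched rows


def build_scd1_merge(
    target: str,
    source: str,
    key_columns: Sequence[str],
    columns: Sequence[str],
) -> str:
    """Build a Delta Lake MERGE for SCD Type 1 that keeps __CreatedDateTime."""
    on_clause = " AND ".join(f"t.`{k}` = s.`{k}`" for k in key_columns)
    # Every column except the creation timestamp is refreshed on update
    set_clause = ", ".join(f"t.`{c}` = s.`{c}`" for c in columns if c != CREATED_COL)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED AND t.`__RowHash` <> s.`__RowHash` THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )
```

The resulting string would be run with `spark.sql(...)`. MERGE fails if several source rows match the same target row, so the source should first be reduced to the latest row per ID (for example, by DATE). The Delta Lake Python API (`DeltaTable.merge(...).whenMatchedUpdate(condition=..., set=...).whenNotMatchedInsertAll()`) can express the same rule with a column-to-expression dict.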

NandiniN
Databricks Employee

Hi @RBlum, what's your use case? Is your concern that you can't find the __CreatedDateTime and __UpdatedDateTime columns, or is it some other challenge?

RBlum
New Contributor III

When an SCD Type 1 table is processed with the APPLY CHANGES INTO (or APPLY CHANGES FROM SNAPSHOT) APIs, no columns like "__CreatedDateTime" and "__UpdatedDateTime" are automatically added to the table structure and maintained through the APPLY.
This differs from SCD Type 2, where the "__START_AT" and "__END_AT" columns are automatically added to the table structure and maintained through the APPLY.

The use case for giving an SCD Type 1 table something like an automated __START_AT (the original creation of the record, which never changes) and __UPDATE_AT (the latest update of the record) is that it would make it simple to identify when records were first created and last changed. That information can be used downstream, or to summarize the activity in the object since the previous run.
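A simplified plain-Python model of SCD Type 1 upsert semantics illustrates the point. For each key, the row with the highest sequence value replaces the target row wholesale, so a __CreatedDateTime stamped in the staging view is overwritten on every update. This is not the DLT implementation (apply_changes also handles deletes, out-of-order events, and options such as except_column_list and ignore_null_updates), and the function name is hypothetical.

```python
from typing import Dict, List, Mapping


def scd1_upsert(
    target: Dict[object, dict],
    batch: List[Mapping[str, object]],
    key: str,
    sequence_by: str,
) -> Dict[object, dict]:
    """Simplified SCD Type 1 model: the latest row per key replaces the target row."""
    result = {k: dict(v) for k, v in target.items()}
    for row in sorted(batch, key=lambda r: r[sequence_by]):
        current = result.get(row[key])
        if current is None or row[sequence_by] >= current[sequence_by]:
            # Every column, audit columns included, is replaced
            result[row[key]] = dict(row)
    return result
```

After a second batch updates ID 1, the stored __CreatedDateTime is the one stamped for that second batch, not the original creation time.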

yvishal519
Contributor

@NandiniN  @RBlum 

I haven’t found an ideal solution for handling audit columns effectively in Databricks Delta Live Tables (DLT) when implementing SCD Type 1. It seems there’s no straightforward way to incorporate these columns into the apply_changes function for this scenario.

That said, I completely agree with the previous response from @RBlum: this scenario is feasible when implementing SCD Type 2 with Databricks DLT.
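One possible workaround, sketched under assumptions: keep an SCD Type 2 target and derive an SCD Type 1 style view from it. The view takes __CreatedDateTime from the earliest __START_AT per key and __UpdatedDateTime from the __START_AT of the current version (the one whose __END_AT is null). Because the view only reads the SCD2 table, it adds no cycle. The plain-Python function below (hypothetical name) shows the logic. In a pipeline, it would presumably be a downstream materialized view that joins a groupBy with `F.min("__START_AT")` to the current rows. Note that __START_AT holds the sequence_by value (e.g. DATE), not the processing time.

```python
from typing import Dict, List, Mapping


def scd1_view_from_scd2(history: List[Mapping[str, object]], key: str) -> Dict[object, dict]:
    """Collapse SCD Type 2 history into one current row per key with audit columns."""
    created: Dict[object, object] = {}
    current: Dict[object, dict] = {}
    for row in history:
        k = row[key]
        start = row["__START_AT"]
        # Earliest version start = when the key first appeared
        if k not in created or start < created[k]:
            created[k] = start
        # Open-ended version = current row (keys with no open version are dropped)
        if row["__END_AT"] is None:
            current[k] = dict(row)
    result: Dict[object, dict] = {}
    for k, row in current.items():
        data = {c: v for c, v in row.items() if c not in ("__START_AT", "__END_AT")}
        data["__CreatedDateTime"] = created[k]
        data["__UpdatedDateTime"] = row["__START_AT"]
        result[k] = data
    return result
```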

If anyone has insights or workarounds for handling audit columns in SCD1, I’d love to hear your thoughts!
