
DLT apply_changes() SCD2 does not apply the defined schema on the first run

HoussemBL
New Contributor III

Hello community,

I am using the dlt.apply_changes function to implement SCD2 and specifying the schema of the streaming table that apply_changes() should produce.

This schema contains a generated column.

Somehow, on the first run my DLT pipeline always returns the streaming table with the generated column set to null.

Whenever I fully refresh the pipeline, the generated column is computed correctly.

Is there any explanation for why this problem arises only on the first run?
How can I avoid it? I want to destroy this pipeline (using Databricks Asset Bundles) and launch it from scratch only during the testing phase of my CI pipeline.

Below is my code:

dlt.create_streaming_table(
    name="silver_table",
    schema="""row_id STRING NOT NULL,
        col_a STRING NOT NULL,
        `__START_AT` TIMESTAMP NOT NULL,
        `__END_AT` TIMESTAMP,
        last_updated TIMESTAMP,
        is_current BOOLEAN NOT NULL GENERATED ALWAYS AS (CASE WHEN `__END_AT` IS NULL THEN true ELSE false END)
    """,
    cluster_by=["col_a"],
    comment="scd2 table in silver layer",
)

dlt.apply_changes(
    source="data_input_cdc",
    target="silver_table",
    keys=["row_id"],
    sequence_by=F.col("synced"),
    except_column_list=[
        "synced",
        "record_deleted",
    ],
    stored_as_scd_type=2,
    apply_as_deletes=F.expr("record_deleted = true"),
)
2 REPLIES

Alberto_Umana
Databricks Employee

Hello @HoussemBL,

Here are a few points to consider:

  1. Initialization of Generated Columns: Generated columns, such as is_current, rely on the values of other columns (__END_AT in this case) to be correctly populated. During the first run, if the sequencing or initialization of these columns is not handled correctly, it can result in null values.

  2. Sequencing and Ordering: The apply_changes function uses the sequence_by column to determine the order of changes. If the sequencing is not correctly established during the first run, it can lead to issues with the generated columns. Ensure that the sequence_by column (synced in your case) is correctly populated and ordered. 

When you fully refresh the pipeline, it reprocesses the data, which can correct the sequencing and initialization issues and lead to the correct computation of the generated column.
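For reference, the SCD2 bookkeeping that apply_changes performs can be sketched in plain Python (an illustrative model, not DLT's actual implementation): each change for a key closes the currently open version by setting __END_AT, then opens a new version, and is_current is simply "__END_AT IS NULL".

```python
# Illustrative model of SCD type 2 bookkeeping (not DLT internals):
# each change closes the open version for the key, then appends a new one.
def apply_scd2_change(history, key, data, ts):
    """history: list of version records; key/data/ts: one CDC change."""
    for rec in history:
        if rec["row_id"] == key and rec["__END_AT"] is None:
            rec["__END_AT"] = ts  # close the previously current version
    history.append({"row_id": key, **data, "__START_AT": ts, "__END_AT": None})

def is_current(rec):
    # Mirrors the generated column: true iff __END_AT is null
    return rec["__END_AT"] is None

history = []
apply_scd2_change(history, "r1", {"col_a": "v1"}, 1)
apply_scd2_change(history, "r1", {"col_a": "v2"}, 2)
```

In this model is_current is always derivable from __END_AT after the changes are applied, which is why the generated column returning null points at an initialization-order issue rather than bad data.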

HoussemBL
New Contributor III

Hello @Alberto_Umana,

Thanks for your reply.
I checked the impact of sequencing and ordering by running my DLT pipeline with an input dataset of a single row.
I still get the same behavior: the generated column is null on the first run, then it is computed correctly on the second run with a full refresh.
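One possible workaround (a sketch only, assuming the standard DLT Python API and the table and column names from this thread; it needs a Databricks runtime to run) is to drop the GENERATED column from the streaming table and instead derive is_current in a downstream view, where __END_AT is already populated by the time the view is evaluated:

```python
import dlt
from pyspark.sql import functions as F

@dlt.view(comment="silver rows with is_current derived downstream")
def silver_table_current():
    # Compute is_current from __END_AT here instead of relying on a
    # GENERATED column in the streaming table, sidestepping the
    # null-on-first-run behavior described above.
    return dlt.read("silver_table").withColumn(
        "is_current", F.col("__END_AT").isNull()
    )
```

This avoids depending on generated-column initialization order during the first pipeline run.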
