
Creating an SCD Type 2 Table with Auto CDC API (One-Time Load + Ongoing Updates)

aonurdemir
Contributor

Hello everyone,

I’m working with two CDC tables:

table_x: 23,467,761 rows (and growing)

table_y: 27,868,173,722 rows

My goal is to build an SCD Type 2 table (table_z) using the Auto CDC API.

The workflow I’d like to achieve is:

Initial Load: Populate table_z using the historical data from table_y (used only once).

Ongoing Updates: Once the initial load is complete, keep table_z updated using the incremental hourly data from table_x.

Constraints / Challenges:

I can usually handle append_flow + streaming tables, but I haven’t found a way to implement it cleanly with the Auto CDC API.

I don’t want to create a merged copy of table_x and table_y with append_flows just for the one-time bootstrap, since table_y is extremely large and would be wasteful to duplicate.

Has anyone set up a similar pattern, a one-time full load from a large CDC table followed by incremental Auto CDC updates from another table? If so, how did you approach it?

Thanks in advance!

ACCEPTED SOLUTION

aonurdemir
Contributor

I solved it by creating the target streaming table once and attaching two named Auto CDC flows to it, using the name parameter to distinguish them:

dlt.create_streaming_table(name="table_z")
dlt.create_auto_cdc_flow(
name="backfill",
target="table_z",
source="table_y",
keys=["user_id"],
sequence_by=col("source_ts_ms"),
ignore_null_updates=False,
apply_as_deletes=expr("op = 'd'"),
except_column_list=["op", "source_ts_ms"],
stored_as_scd_type="2",
track_history_column_list=["change_date"]
)

dlt.create_auto_cdc_flow(
name="update",
target="table_z",
source="table_x",
keys=["user_id"],
sequence_by=col("source_ts_ms"),
ignore_null_updates=False,
apply_as_deletes=expr("op = 'd'"),
except_column_list=["op", "source_ts_ms"],
stored_as_scd_type="2",
track_history_column_list=["change_date"]
)
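A note on why this works: each named flow keeps its own streaming checkpoint against its source, so the backfill flow simply stops producing new data once it has consumed all of table_y, while the update flow keeps applying the hourly changes from table_x. I believe newer runtimes also document a once parameter on Auto CDC flows for exactly this backfill case; I haven't verified it everywhere, so treat the following as a sketch that assumes once is supported in your runtime:

# Variation, assuming the `once` parameter is available: marking the
# backfill flow as run-once makes the one-time intent explicit and
# keeps the pipeline from re-reading table_y on later updates.
dlt.create_auto_cdc_flow(
    name="backfill",
    target="table_z",
    source="table_y",
    keys=["user_id"],
    sequence_by=col("source_ts_ms"),
    ignore_null_updates=False,
    apply_as_deletes=expr("op = 'd'"),
    except_column_list=["op", "source_ts_ms"],
    stored_as_scd_type="2",
    track_history_column_list=["change_date"],
    once=True  # assumption: one-time backfill flow, not re-run on refresh
)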

 


2 REPLIES

aonurdemir
Contributor

(Accepted solution, shown above.)

szymon_dybczak
Esteemed Contributor III

Great @aonurdemir, thanks for sharing. Could you mark your answer as a solution?
