01-17-2025 11:59 AM
Hi,
I am looking to build an ETL process for an incrementally loaded silver table.
This silver table, let's say "contracts_silver", is built by joining two bronze tables, "contracts_raw" and "customer_raw".
contracts_silver

| CONTRACT_ID | STATUS | CUSTOMER_NAME |
| --- | --- | --- |
| 1 | SIGNED | Peter Smith |
| 2 | SIGNED | John Smith |

contracts_raw

| ID | STATUS | DATE | CUSTOMER_ID |
| --- | --- | --- | --- |
| 1 | SIGNED | 2025-01-15 | 1 |
| 2 | SIGNED | 2025-01-15 | 2 |

customer_raw

| ID | NAME | DOB |
| --- | --- | --- |
| 1 | Peter Smith | 2025-01-15 |
| 2 | John Smith | 2025-01-15 |
The "contracts_raw" table will grow faster than "customer_raw".
Updates are supposed to be in batch. In any update, I can have inserts, updates or deletes on both raw tables.
Considering that at any given time I can have updates happening on both raw tables, or only one of then, is databricks capable auomatically detecting the need to update the "contracts_silver" table if:
- Update 1: in contracts_raw, ID 1 is changed to STATUS cancelled
- Update 2: in customer_raw, ID2, name is changed to John J. Smith
The goal is to always reprocess, but only rows that will eventually change in the silver table.
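For reference, here is a rough sketch of the full-refresh version of the join I am trying to make incremental (table and column names as in the examples above):

# Full-refresh build of contracts_silver; the goal is to reprocess only the rows
# affected by changes in either raw table instead of rebuilding everything.
contracts = spark.table("contracts_raw")
customers = spark.table("customer_raw")

contracts_silver = (
    contracts.join(customers, contracts["CUSTOMER_ID"] == customers["ID"], "inner")
    .select(
        contracts["ID"].alias("CONTRACT_ID"),
        contracts["STATUS"],
        customers["NAME"].alias("CUSTOMER_NAME"),
    )
)

contracts_silver.write.mode("overwrite").saveAsTable("contracts_silver")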
Labels:
- Delta Lake
- Workflows
Accepted Solutions
01-18-2025 06:55 AM
Hi @garciargs ,
Yes, in Databricks you can do this using DLT (Delta Live Tables) and Spark Structured Streaming. You have to enable CDF (Change Data Feed) on both contracts_raw and customer_raw, which tracks all DML changes on the raw tables.
-- New Delta table with CDF enabled
CREATE TABLE myDeltaTable (
  id INT,
  name STRING,
  age INT
)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Enable CDF on existing table
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
In a DLT notebook, you can read the change feed from both tables, which captures operations such as appends, updates, and deletes, and then update your silver table accordingly. The following code is a rough example of how you can achieve this.
import dlt
from pyspark.sql.functions import col

# Enable CDF on all new tables by default
spark.sql("SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true")

@dlt.table(table_properties={"quality": "bronze"})
def customer_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .load("s3a://<BUCKET_NAME>/<FILE_PATH>/customer"))  # Change to your cloud storage path

@dlt.table(table_properties={"quality": "bronze"})
def contracts_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .load("s3a://<BUCKET_NAME>/<FILE_PATH>/contracts"))  # Change to your cloud storage path

@dlt.table(table_properties={"quality": "silver"})
def contracts_silver():
    # Read the change feed of each bronze table; CDF adds _change_type,
    # _commit_version and _commit_timestamp columns to every row.
    customer_df = (spark.readStream
                   .option("readChangeFeed", "true")
                   .table("customer_raw"))
    contracts_df = (spark.readStream
                    .option("readChangeFeed", "true")
                    .table("contracts_raw"))
    # Join on the foreign key: contracts_raw.CUSTOMER_ID -> customer_raw.ID
    joined_df = customer_df.join(contracts_df, customer_df["ID"] == contracts_df["CUSTOMER_ID"], "inner")
    # Note: You can perform a merge statement for each batch of data
    return joined_df.select(
        contracts_df["ID"].alias("CONTRACT_ID"),
        contracts_df["STATUS"],
        customer_df["NAME"].alias("CUSTOMER_NAME"))
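To illustrate the note about merging each batch: outside of DLT, with plain Structured Streaming, this could look roughly like the sketch below, assuming joined_df is the joined change stream built as above (the checkpoint path is a placeholder).

from delta.tables import DeltaTable

def upsert_contracts_silver(batch_df, batch_id):
    # Upsert each micro-batch into the silver table by its business key.
    silver = DeltaTable.forName(spark, "contracts_silver")
    (silver.alias("t")
        .merge(batch_df.alias("s"), "t.CONTRACT_ID = s.CONTRACT_ID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(joined_df.writeStream
    .foreachBatch(upsert_contracts_silver)
    .option("checkpointLocation", "/tmp/checkpoints/contracts_silver")  # placeholder path
    .start())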
Refer to the following link on how APPLY CHANGES works in DLT: The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables | Databricks on AWS
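As a rough sketch, applying the changes from one raw table into a keyed streaming target with the APPLY CHANGES API could look like this (update_ts is a placeholder ordering column that is not part of the example schema; use your real change timestamp):

import dlt
from pyspark.sql.functions import col

# Target table that APPLY CHANGES keeps in sync with inserts, updates and deletes.
dlt.create_streaming_table("customer_clean")

dlt.apply_changes(
    target = "customer_clean",
    source = "customer_raw",        # bronze table defined earlier in the pipeline
    keys = ["ID"],
    sequence_by = col("update_ts"), # placeholder ordering column
    stored_as_scd_type = 1
)

You could apply the same pattern to contracts_raw and then build contracts_silver by joining the two cleaned tables.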
Regards,
Hari Prasad