Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Incremental load from two tables

garciargs
New Contributor III

Hi, 

I am looking to build an ETL process for an incrementally loaded silver table.

This silver table, let's say "contracts_silver", is built by joining two bronze tables, "contracts_raw" and "customer_raw".

contracts_silver

CONTRACT_ID | STATUS | CUSTOMER_NAME
1           | SIGNED | Peter Smith
2           | SIGNED | John Smith

contracts_raw

ID | STATUS | DATE       | CUSTOMER_ID
1  | SIGNED | 2025-01-15 | 1
2  | SIGNED | 2025-01-15 | 2

customer_raw

ID | NAME        | DOB
1  | Peter Smith | 2025-01-15
2  | John Smith  | 2025-01-15

The "contracts_raw" table will grow faster than "customer_raw".

Updates will run in batches. In any given batch, there can be inserts, updates, or deletes on both raw tables.

Considering that at any given time I can have updates happening on both raw tables, or only one of them, is Databricks capable of automatically detecting the need to update the "contracts_silver" table if, for example:

  • Update 1: in contracts_raw, the STATUS of ID 1 is changed to CANCELLED
  • Update 2: in customer_raw, the NAME of ID 2 is changed to John J. Smith

The goal is to reprocess on every run, but only the rows that will actually change in the silver table.

1 ACCEPTED SOLUTION

Accepted Solutions

hari-prasad
Valued Contributor II

Hi @garciargs ,
Yes, in Databricks you can do this using DLT (Delta Live Tables) and Spark Structured Streaming. You need to enable CDF (Change Data Feed) on both contracts_raw and customer_raw, which will track all DML changes on the raw tables.

-- New Delta table with CDF enabled
CREATE TABLE myDeltaTable (
    id INT,
    name STRING,
    age INT
)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Enable CDF on existing table
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
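
Once CDF is enabled, you can check what it captures by reading the change feed directly. A minimal PySpark sketch (table name and starting version are just illustrative):

# Read the change feed of a CDF-enabled table as a batch DataFrame.
# Each row carries _change_type ('insert', 'update_preimage', 'update_postimage', 'delete'),
# plus _commit_version and _commit_timestamp.
changes_df = (spark.read.format("delta")
              .option("readChangeFeed", "true")
              .option("startingVersion", 0)  # or startingTimestamp
              .table("contracts_raw"))

changes_df.show()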

 

In a DLT notebook you can then read the change feed from both tables, covering appends, updates, and deletes, and update your silver table accordingly. The following code is a rough example of how you can achieve this.

import dlt
from pyspark.sql.functions import col

# Enable CDF on all new tables by default
spark.sql("SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true")

@dlt.table(table_properties={"quality": "bronze"})
def customer_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .load("s3a://<BUCKET_NAME>/<FILE_PATH>/customer"))  # Change to your cloud storage path

@dlt.table(table_properties={"quality": "bronze"})
def contracts_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .load("s3a://<BUCKET_NAME>/<FILE_PATH>/contracts"))  # Change to your cloud storage path

@dlt.table(table_properties={"quality": "silver"})
def contracts_silver():
    customer_df = (spark.readStream
                   .option("readChangeFeed", "true")
                   .table("customer_raw"))

    contracts_df = (spark.readStream
                    .option("readChangeFeed", "true")
                    .table("contracts_raw"))

    # Join on the customer key from the sample schema (contracts_raw.CUSTOMER_ID -> customer_raw.ID)
    joined_df = contracts_df.join(customer_df, contracts_df["customer_id"] == customer_df["id"], "inner")
    # Note: you can instead perform a MERGE for each batch of changes (see the sketch below)
    return joined_df.select(contracts_df["id"].alias("contract_id"),
                            contracts_df["status"],
                            customer_df["name"].alias("customer_name"))
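
If you prefer to maintain contracts_silver with explicit upserts rather than recomputing the join, the per-batch MERGE mentioned in the comment above can be done with foreachBatch outside of DLT. This is only a minimal sketch, assuming the table and column names from the example and handling changes from contracts_raw only (changes to customer_raw would need a similar stream or a periodic refresh):

from delta.tables import DeltaTable
from pyspark.sql.functions import col

def upsert_contracts(batch_df, batch_id):
    # One micro-batch of CDF rows from contracts_raw: drop pre-images, keep one row per contract
    changes = (batch_df
               .filter("_change_type != 'update_preimage'")
               .dropDuplicates(["id"]))  # simplification: pick the latest _commit_version if needed

    # Static lookup of the customer dimension at processing time
    customers = spark.read.table("customer_raw").select(col("id").alias("cust_id"), "name")

    enriched = (changes.join(customers, changes["customer_id"] == customers["cust_id"], "left")
                .selectExpr("id AS contract_id", "status", "name AS customer_name", "_change_type"))

    silver = DeltaTable.forName(spark, "contracts_silver")
    (silver.alias("t")
     .merge(enriched.alias("s"), "t.contract_id = s.contract_id")
     .whenMatchedDelete(condition="s._change_type = 'delete'")
     .whenMatchedUpdate(set={"status": "s.status", "customer_name": "s.customer_name"})
     .whenNotMatchedInsert(values={"contract_id": "s.contract_id",
                                   "status": "s.status",
                                   "customer_name": "s.customer_name"})
     .execute())

(spark.readStream
 .option("readChangeFeed", "true")
 .table("contracts_raw")
 .writeStream
 .foreachBatch(upsert_contracts)
 .option("checkpointLocation", "/tmp/checkpoints/contracts_silver")  # adjust to your storage
 .start())

Looking up the customer name with a static read keeps the logic simple, at the cost of re-reading the dimension on every micro-batch.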

Refer to the following link for details on how APPLY CHANGES works in DLT: The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables | Databricks on AWS
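
For completeness, a minimal sketch of the APPLY CHANGES API described in that link (the target name and the sequencing column are illustrative, not from the original post):

import dlt
from pyspark.sql.functions import col

# Streaming target table that APPLY CHANGES will maintain
dlt.create_streaming_table("contracts_scd1")

dlt.apply_changes(
    target="contracts_scd1",
    source="contracts_raw",   # a streaming source defined elsewhere in the pipeline
    keys=["id"],              # primary key used to match rows
    sequence_by=col("date"),  # assumed ordering column; use a real change timestamp or version
    stored_as_scd_type=1      # keep only the latest version of each key (SCD type 1)
)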


Regards,
Hari Prasad



View solution in original post

2 REPLIES

garciargs
New Contributor III

Hi @hari-prasad ,

Thank you! Will give it a try.

Regards!
