Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT multiple source tables to a single silver table generating unexpected results

garciargs
New Contributor III

Hi,

I've been trying this all day. I'm building a POC of a pipeline that would be used in my everyday ETL.

I have two initial tables, vendas and produtos, which look like this:

vendas_raw

venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao
1 | 1 | 15/01/2025 | 2 | 5000 | 2025-01-01T12:00:00
2 | 2 | 16/01/2025 | 1 | 2500 | 2025-01-01T12:00:00
3 | 3 | 17/01/2025 | 1 | 3000 | 2025-01-01T12:00:00
4 | 4 | 18/01/2025 | 1 | 2000 | 2025-01-01T12:00:00
5 | 5 | 19/01/2025 | 3 | 4500 | 2025-01-01T12:00:00
6 | 6 | 20/01/2025 | 1 | 800 | 2025-01-01T12:00:00
7 | 7 | 21/01/2025 | 2 | 2000 | 2025-01-01T12:00:00
8 | 8 | 22/01/2025 | 1 | 1200 | 2025-01-01T12:00:00
9 | 9 | 23/01/2025 | 4 | 600 | 2025-01-01T12:00:00
10 | 10 | 24/01/2025 | 1 | 300 | 2025-01-01T12:00:00

produtos_raw

produto_id | nome_produto | categoria | dth_ingestao
1 | Notebook | Eletrônicos | 2025-01-01T12:00:00
2 | Smartphone | Eletrônicos | 2025-01-01T12:00:00
3 | Televisão | Eletrônicos | 2025-01-01T12:00:00
4 | Geladeira | Eletrodomésticos | 2025-01-01T12:00:00
5 | Fogão | Eletrodomésticos | 2025-01-01T12:00:00
6 | Micro-ondas | Eletrodomésticos | 2025-01-01T12:00:00
7 | Cadeira Gamer | Móveis | 2025-01-01T12:00:00
8 | Mesa de Escritório | Móveis | 2025-01-01T12:00:00
9 | Luminária | Decoração | 2025-01-01T12:00:00
10 | Ventilador | Eletrodomésticos | 2025-01-01T12:00:00

The final table would be a join of both, with all columns from vendas_raw and only nome_produto from produtos_raw.

My code is:

import dlt
from pyspark.sql.functions import col, current_timestamp, lit

spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

@dlt.table(
    table_properties={"quality": "bronze"}
)
def vendas_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .option("cloudFiles.schemaHints", "venda_id INT, produto_id INT, data_venda TIMESTAMP, quantidade INT, valor INT, dth_ingestao TIMESTAMP")
            .load("/Volumes/testedatabricks/default/datalake_desenvolvimento/DadosBrutos/teste_databricks/vendas/")
            .withColumnRenamed("dth_ingestao", "dth_ingestao_vendas")
            )  # Change to your cloud storage path

@dlt.table(
    table_properties={"quality": "bronze"}
)
def produtos_raw():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")  # Change to your file format
            .option("cloudFiles.schemaHints", "produto_id INT, nome_produto STRING, categoria STRING, dth_ingestao TIMESTAMP")
            .load("/Volumes/testedatabricks/default/datalake_desenvolvimento/DadosBrutos/teste_databricks/produtos/")
            .withColumnRenamed("dth_ingestao", "dth_ingestao_produtos")
    )  # Change to your cloud storage path

@dlt.view()
def vendas_view_silver():
    produtos_df = dlt.readStream("produtos_raw")
    vendas_df = dlt.readStream("vendas_raw")

    joined_df = (
      produtos_df.join(vendas_df, produtos_df["produto_id"] == vendas_df["produto_id"], "inner")
    )

    return joined_df.select(produtos_df["nome_produto"], vendas_df["*"]).withColumn("dth_ingestao", lit(current_timestamp()))


dlt.create_streaming_table("vendas_silver")
dlt.apply_changes(
    target="vendas_silver",
    source="vendas_view_silver",  # apply_changes takes the name of a DLT table/view, not a DataFrame
    keys=["venda_id"],
    sequence_by=col("dth_ingestao")
)

After the first run of the pipeline, the vendas_silver table looks like this (data_venda values replaced with "dt" for readability):

nome_produto | venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao
Notebook | 1 | 1 | dt | 2 | 5000 | 2025-01-22T02:00:58.673Z
Micro-ondas | 6 | 6 | dt | 1 | 800 | 2025-01-22T02:00:58.673Z
Televisão | 3 | 3 | dt | 1 | 3000 | 2025-01-22T02:00:58.673Z
Fogão | 5 | 5 | dt | 3 | 4500 | 2025-01-22T02:00:58.673Z
Luminária | 9 | 9 | dt | 4 | 600 | 2025-01-22T02:00:58.673Z
Geladeira | 4 | 4 | dt | 1 | 2000 | 2025-01-22T02:00:58.673Z
Mesa de Escritório | 8 | 8 | dt | 1 | 1200 | 2025-01-22T02:00:58.673Z
Cadeira Gamer | 7 | 7 | dt | 2 | 2000 | 2025-01-22T02:00:58.673Z
Ventilador | 10 | 10 | dt | 1 | 300 | 2025-01-22T02:00:58.673Z
Smartphone | 2 | 2 | dt | 1 | 2500 | 2025-01-22T02:00:58.673Z

 

The problem is: updates will arrive in batches, and I might receive new rows for vendas_raw and produtos_raw at the same time. But when I try to emulate this (by adding new CSV files to the folders), I get a strange output.

New produtos_raw

produto_id | nome_produto | categoria | dth_ingestao
1 | Laptop | Eletrônicos | 2025-01-03T12:00:00
2 | Smartphone | Smart | 2025-01-03T12:00:00

New vendas_raw

venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao
1 | 1 | 15/01/2025 | 2 | 4500 | 2025-01-03T12:00:00
11 | 1 | 25/01/2025 | 1 | 5000 | 2025-01-03T12:00:00

Note that in produtos_raw I am updating the nome_produto of produto_id 1 to Laptop (it was Notebook), and in vendas_raw I am updating the valor_total of venda_id 1 to 4500 (it was 5000) and inserting a new sale, venda_id 11, also for produto_id 1. Both updates are related through produto_id == 1.

The expected output would be for venda_id 1 in vendas_silver to have both the updated nome_produto and the updated valor_total.

As you can see in the resulting table (showing only the changed rows), that is not what happened: valor_total for venda_id = 1 should be 4500, and nome_produto for venda_id = 11 should be Laptop.

nome_produto | venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao
Notebook | 11 | 1 | dt | 1 | 5000 | 2025-01-22T02:06:20.823Z
Smartphone | 2 | 2 | dt | 1 | 2500 | 2025-01-22T02:06:20.823Z
Laptop | 1 | 1 | dt | 2 | 5000 | 2025-01-22T02:06:20.823Z

It seems the pipeline is joining each incoming batch only against the data that was already stored, instead of also picking up the most recent records from the other table.

Thank you in advance, and sorry for the long question.

2 REPLIES

NandiniN
Databricks Employee

When dealing with Change Data Capture (CDC) in Delta Live Tables, it's crucial to handle out-of-order data correctly. You can use the APPLY CHANGES API to manage this: it ensures that the most recent data wins by letting you specify a column in the source data that sequences the records.

import dlt
from pyspark.sql.functions import col, expr

dlt.apply_changes(
    target="vendas_silver",
    source="vendas",
    keys=["venda_id"],
    sequence_by=col("dth_ingestao"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "dth_ingestao"],
    stored_as_scd_type="1"
)

You are already doing this; now make sure the streaming target table is created and refreshed correctly so that the updates propagate downstream. In SQL, the equivalent would be:

CREATE OR REFRESH STREAMING TABLE vendas_silver;
APPLY CHANGES INTO live.vendas_silver
FROM stream(vendas_raw)
KEYS (venda_id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY dth_ingestao
COLUMNS * EXCEPT (operation, dth_ingestao)
STORED AS SCD TYPE 1;

 

Read more here - https://docs.databricks.com/en/delta-live-tables/cdc.html
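
For a pipeline like yours, here is a minimal Python sketch (untested) of pointing the apply_changes call at the joined view rather than at a single raw table, assuming the view keeps the name vendas_view_silver from your original code:

import dlt
from pyspark.sql.functions import col

# Sketch only: apply_changes takes the name of a table or view defined in the
# pipeline, so it can read the stream produced by the join view directly.
dlt.create_streaming_table("vendas_silver")

dlt.apply_changes(
    target="vendas_silver",
    source="vendas_view_silver",   # the view that joins vendas_raw and produtos_raw
    keys=["venda_id"],
    sequence_by=col("dth_ingestao"),
    stored_as_scd_type=1
)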

 

 

garciargs
New Contributor III

Hi @NandiniN, thank you for your reply.

In your example you're using vendas_raw in the FROM statement, but I need to join vendas_raw and produtos_raw. With the SQL code you provided I won't get the result I was expecting, which was vendas_silver enriched with the additional information from produtos_raw.

Is there something I'm missing?
