Hi,
I´ve been trying this all day long. I'm build a POC of a pipeline that would be used on my everyday ETL.
I have two initial tables, vendas and produtos, and they are as the following:
vendas_raw
venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao |
1 | 1 | 15/01/2025 | 2 | 5000 | 2025-01-01T12:00:00 |
2 | 2 | 16/01/2025 | 1 | 2500 | 2025-01-01T12:00:00 |
3 | 3 | 17/01/2025 | 1 | 3000 | 2025-01-01T12:00:00 |
4 | 4 | 18/01/2025 | 1 | 2000 | 2025-01-01T12:00:00 |
5 | 5 | 19/01/2025 | 3 | 4500 | 2025-01-01T12:00:00 |
6 | 6 | 20/01/2025 | 1 | 800 | 2025-01-01T12:00:00 |
7 | 7 | 21/01/2025 | 2 | 2000 | 2025-01-01T12:00:00 |
8 | 8 | 22/01/2025 | 1 | 1200 | 2025-01-01T12:00:00 |
9 | 9 | 23/01/2025 | 4 | 600 | 2025-01-01T12:00:00 |
10 | 10 | 24/01/2025 | 1 | 300 | 2025-01-01T12:00:00 |
produtos_raw
produto_id | nome_produto | categoria | dth_ingestao |
1 | Notebook | Eletrônicos | 2025-01-01T12:00:00 |
2 | Smartphone | Eletrônicos | 2025-01-01T12:00:00 |
3 | Televisão | Eletrônicos | 2025-01-01T12:00:00 |
4 | Geladeira | Eletrodomésticos | 2025-01-01T12:00:00 |
5 | Fogão | Eletrodomésticos | 2025-01-01T12:00:00 |
6 | Micro-ondas | Eletrodomésticos | 2025-01-01T12:00:00 |
7 | Cadeira Gamer | Móveis | 2025-01-01T12:00:00 |
8 | Mesa de Escritório | Móveis | 2025-01-01T12:00:00 |
9 | Luminária | Decoração | 2025-01-01T12:00:00 |
10 | Ventilador | Eletrodomésticos | 2025-01-01T12:00:00 |
The final table would be a join of both, with all columns from vendas_raw and only nome_produto from produtos_raw.
My code is:
import dlt
from pyspark.sql.functions import col, current_timestamp, lit
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
@dlt.table(
table_properties={"quality": "bronze"}
)
def vendas_raw():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv") # Change to your file format
.option("cloudFiles.schemaHints", "venda_id INT, produto_id INT, data_venda TIMESTAMP, quantidade INT, valor INT, dth_ingestao TIMESTAMP")
.load("/Volumes/testedatabricks/default/datalake_desenvolvimento/DadosBrutos/teste_databricks/vendas/")
.withColumnRenamed("dth_ingestao", "dth_ingestao_vendas")
) # Change to your cloud storage path
@dlt.table(
table_properties={"quality": "bronze"}
)
def produtos_raw():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv") # Change to your file format
.option("cloudFiles.schemaHints", "produto_id INT, nome_produto STRING, categoria STRING, dth_ingestao TIMESTAMP")
.load("/Volumes/testedatabricks/default/datalake_desenvolvimento/DadosBrutos/teste_databricks/produtos/")
.withColumnRenamed("dth_ingestao", "dth_ingestao_produtos")
) # Change to your cloud storage path
@dlt.view()
def vendas_view_silver():
produtos_df = dlt.readStream("produtos_raw")
vendas_df = dlt.readStream("vendas_raw")
joined_df = (
produtos_df.join(vendas_df, produtos_df["produto_id"] == vendas_df["produto_id"], "inner")
)
return joined_df.select(produtos_df["nome_produto"], vendas_df["*"]).withColumn("dth_ingestao", lit(current_timestamp()))
dlt.create_streaming_table("vendas_silver")
dlt.apply_changes(
target="vendas_silver",
source=joined_df,
keys=["venda_id"],
sequence_by=col("dth_ingestao")
)
After first run of the pipeline, the vendas_silver table looks like this (replaced values in data_venda for readability):
nome_produto | venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao |
Notebook | 1 | 1 | dt | 2 | 5000 | 2025-01-22T02:00:58.673Z |
Micro-ondas | 6 | 6 | dt | 1 | 800 | 2025-01-22T02:00:58.673Z |
Televisão | 3 | 3 | dt | 1 | 3000 | 2025-01-22T02:00:58.673Z |
Fogão | 5 | 5 | dt | 3 | 4500 | 2025-01-22T02:00:58.673Z |
Luminária | 9 | 9 | dt | 4 | 600 | 2025-01-22T02:00:58.673Z |
Geladeira | 4 | 4 | dt | 1 | 2000 | 2025-01-22T02:00:58.673Z |
Mesa de Escritório | 8 | 8 | dt | 1 | 1200 | 2025-01-22T02:00:58.673Z |
Cadeira Gamer | 7 | 7 | dt | 2 | 2000 | 2025-01-22T02:00:58.673Z |
Ventilador | 10 | 10 | dt | 1 | 300 | 2025-01-22T02:00:58.673Z |
Smartphone | 2 | 2 | dt | 1 | 2500 | 2025-01-22T02:00:58.673Z |
The problem is: updates will be in batch, and I might receive new batch lines simultaneously for vendas_raw and produtos_raw, but when I try to emulate it (adding new csv files in the folder) I have a strange output.
New produtos_raw
produto_id | nome_produto | categoria | dth_ingestao |
1 | Laptop | Eletrônicos | 2025-01-03T12:00:00 |
2 | Smartphone | Smart | 2025-01-03T12:00:00 |
New vendas_raw
venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao |
1 | 1 | 15/01/2025 | 2 | 4500 | 2025-01-03T12:00:00 |
11 | 1 | 25/01/2025 | 1 | 5000 | 2025-01-03T12:00:00 |
Note that, in produtos_raw I am updating the nome_produto of produto_id 1 to Laptop (was Notebook) and in vendas_raw, I am updating the valor_total of venda_id 1 to 4500 (was 5000). Both share a relationship on produto_id == 1.
Expected output would be for vendas_id 1 in vendas_silver to have the updated both nome_produto and valor_total.
As you can see in the resulting table (showing only updated rows), valor_total in venda_id = 1 should be 4500, and nome_produto in venda_id = 11 should be Laptop.
nome_produto | venda_id | produto_id | data_venda | quantidade | valor_total | dth_ingestao |
Notebook | 11 | 1 | dt | 1 | 5000 | 2025-01-22T02:06:20.823Z |
Smartphone | 2 | 2 | dt | 1 | 2500 | 2025-01-22T02:06:20.823Z |
Laptop | 1 | 1 | dt | 2 | 5000 | 2025-01-22T02:06:20.823Z |
It seens it is processing the streaming data only with the stored data, and not searching to use the most recent.
Thank you in advance, and sorry for long question.