08-19-2022 04:35 AM
Hello guys,
I'm trying to use upsert via delta lake following the documentation, but the command doesn't update or insert newlines.
scenario: my source table is separated in bronze layer and updates or inserts are in silver layer.
from delta.tables import *
deltaTableVendas = DeltaTable.forPath(spark, '/user/hive/warehouse/bronze.db/vendas')
deltaTableVendasUpdates = DeltaTable.forPath(spark, '/user/hive/warehouse/silver.db/vendas')
dfUpdates = deltaTableVendasUpdates.toDF()
deltaTableVendas.alias('vendas') \
.merge(
dfUpdates.alias('updates'),
'vendas.numero_transacao = updates.numero_transacao'
) \
.whenMatchedUpdate(set =
{
"numero_transacao": "updates.numero_transacao",
"numped": "updates.numped",
"codcli": "updates.codcli",
"codprod": "updates.codprod",
"data_venda": "updates.data_venda",
"quantidade": "updates.quantidade",
"valor": "updates.valor"
}
) \
.whenNotMatchedInsert(values =
{
"numero_transacao": "updates.numero_transacao",
"numped": "updates.numped",
"codcli": "updates.codcli",
"codprod": "updates.codprod",
"data_venda": "updates.data_venda",
"quantidade": "updates.quantidade",
"valor": "updates.valor"
}
) \
.execute()
08-22-2022 05:55 AM
Right now I am kinda confused what you try to do.
In your code, you are merging the bronze table with data from the silver table (so the bronze table is updated), but you post screenshots of the silver table, which does not change.
And that is normal because the data is updated in the bronze table in your code.
08-19-2022 09:35 AM
@Hubert Dudek , @Werner Stinckens
Do you have any idea what's going on in this case?
08-21-2022 11:13 PM
is that the actual location of the data in the delta lake table? seems like a weird place.
The forPath parameter expects the storage location where the data is stored. You point to the location where hive stores its metadata.
Normally this is something like "/mnt/datalake/bronze/deltatable" or something.
08-22-2022 05:41 AM
08-22-2022 05:55 AM
Right now I am kinda confused what you try to do.
In your code, you are merging the bronze table with data from the silver table (so the bronze table is updated), but you post screenshots of the silver table, which does not change.
And that is normal because the data is updated in the bronze table in your code.
08-22-2022 06:56 AM
@Werner Stinckens In fact, the information is inserted in the bronze table, in the silver layer it would receive the update or insertion of the bronze table.
08-22-2022 07:01 AM
ok so if the bronze table contains the new updated data which you want to propagate to the silver table, you should create a df with the updates from the bronze table (you use the silver table for that)
Next do a merge into silver table using updates (bronze).
That should work.
08-22-2022 07:08 AM
08-22-2022 11:55 AM
I managed to find the solution. In insert and update I was setting the target.
tanks @Werner Stinckens !
delta_df = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')
delta_df.alias('target').merge(
source = bronzedf.alias("source"),
condition = 'target.numero_transacao = source.numero_transacao'
) \
.whenMatchedUpdate(set =
{
"numero_transacao": "source.numero_transacao",
"numped": "source.numped",
"codcli": "source.codcli",
"codprod": "source.codprod",
"data_venda": "source.data_venda",
"quantidade": "source.quantidade",
"valor": "source.valor"
}
) \
.whenNotMatchedInsert(values =
{
"numero_transacao": "source.numero_transacao",
"numped": "source.numped",
"codcli": "source.codcli",
"codprod": "source.codprod",
"data_venda": "source.data_venda",
"quantidade": "source.quantidade",
"valor": "source.valor"
}
) \
.execute()
08-23-2022 12:29 AM
That is what I was trying to explain. Nice you got it working!
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group