08-19-2022 04:35 AM
Hello guys,
I'm trying to do an upsert with Delta Lake, following the documentation, but the command neither updates existing rows nor inserts new ones.
Scenario: my source table is in the bronze layer, and the updates and inserts are applied in the silver layer.
from delta.tables import *

deltaTableVendas = DeltaTable.forPath(spark, '/user/hive/warehouse/bronze.db/vendas')
deltaTableVendasUpdates = DeltaTable.forPath(spark, '/user/hive/warehouse/silver.db/vendas')
dfUpdates = deltaTableVendasUpdates.toDF()

deltaTableVendas.alias('vendas') \
    .merge(
        dfUpdates.alias('updates'),
        'vendas.numero_transacao = updates.numero_transacao'
    ) \
    .whenMatchedUpdate(set =
        {
            "numero_transacao": "updates.numero_transacao",
            "numped": "updates.numped",
            "codcli": "updates.codcli",
            "codprod": "updates.codprod",
            "data_venda": "updates.data_venda",
            "quantidade": "updates.quantidade",
            "valor": "updates.valor"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "numero_transacao": "updates.numero_transacao",
            "numped": "updates.numped",
            "codcli": "updates.codcli",
            "codprod": "updates.codprod",
            "data_venda": "updates.data_venda",
            "quantidade": "updates.quantidade",
            "valor": "updates.valor"
        }
    ) \
    .execute()
08-19-2022 09:35 AM
@Hubert Dudek , @Werner Stinckens
Do you have any idea what's going on in this case?
08-21-2022 11:13 PM
Is that the actual location of the data in the Delta Lake table? It seems like a weird place.
The forPath parameter expects the storage location where the data is stored. You are pointing to the location where Hive stores its metadata.
Normally this is something like "/mnt/datalake/bronze/deltatable".
08-22-2022 05:41 AM
Hello @Werner Stinckens, even after moving the table to the data lake it still doesn't update. As the images show, the update is not reflected in the data lake.
08-22-2022 05:55 AM
Right now I am a bit confused about what you are trying to do.
In your code, you are merging data from the silver table into the bronze table (so the bronze table is updated), but you posted screenshots of the silver table, which does not change.
And that is expected, because your code updates the data in the bronze table.
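To make the direction of a merge concrete, here is a minimal pure-Python sketch of upsert semantics. It does not use Spark or Delta Lake; `merge_upsert` and the sample rows are hypothetical stand-ins, and only the column names come from the thread. It shows that only the target side receives changes, so swapping target and source gives a different result.

```python
def merge_upsert(target, source, key):
    """Return a new list of rows: `target` upserted with `source`, matched on `key`."""
    # Start from copies of the target rows, indexed by the join key.
    merged = {row[key]: dict(row) for row in target}
    # Matched keys are overwritten by the source row; unmatched keys are inserted.
    for row in source:
        merged[row[key]] = dict(row)
    return list(merged.values())


# Hypothetical sample data (not from the original post).
bronze = [
    {"numero_transacao": 1, "valor": 10.0},
    {"numero_transacao": 2, "valor": 20.0},
]
silver = [{"numero_transacao": 1, "valor": 5.0}]

# Silver as target, bronze as source: silver receives the bronze changes.
new_silver = merge_upsert(target=silver, source=bronze, key="numero_transacao")

# Bronze as target, silver as source (as in the question): bronze is changed instead.
new_bronze = merge_upsert(target=bronze, source=silver, key="numero_transacao")
```

In the second call, the stale silver value overwrites the bronze row, and silver itself is left untouched, which matches the behaviour described above.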
08-22-2022 06:56 AM
@Werner Stinckens In fact, new data is inserted into the bronze table; the silver table should then receive the updates or insertions from the bronze table.
08-22-2022 07:01 AM
OK, so if the bronze table contains the new or updated data that you want to propagate to the silver table, you should create a DataFrame with the updates from the bronze table (your code currently uses the silver table for that).
Next, merge into the silver table using those updates (bronze) as the source.
That should work.
08-22-2022 07:08 AM
@Werner Stinckens But the upsert already references both tables, so I don't understand why I would have to set them again.
08-22-2022 11:55 AM
I managed to find the solution. In the merge I had set the wrong table as the target: the insert and update were being applied to bronze instead of silver.
Thanks @Werner Stinckens!
from delta.tables import DeltaTable

# bronzedf is a DataFrame holding the bronze rows; its definition was not shown in the post.
# The silver table is the merge target, so it is the table that gets modified.
delta_df = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')

delta_df.alias('target').merge(
    source = bronzedf.alias("source"),
    condition = 'target.numero_transacao = source.numero_transacao'
) \
    .whenMatchedUpdate(set =
        {
            "numero_transacao": "source.numero_transacao",
            "numped": "source.numped",
            "codcli": "source.codcli",
            "codprod": "source.codprod",
            "data_venda": "source.data_venda",
            "quantidade": "source.quantidade",
            "valor": "source.valor"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "numero_transacao": "source.numero_transacao",
            "numped": "source.numped",
            "codcli": "source.codcli",
            "codprod": "source.codprod",
            "data_venda": "source.data_venda",
            "quantidade": "source.quantidade",
            "valor": "source.valor"
        }
    ) \
    .execute()
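Since every column is copied from the source in both clauses, the same merge can probably be written more compactly with `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()`. The sketch below is not the poster's code: the helper name `upsert_into_silver` and its parameters are hypothetical, and it assumes the bronze DataFrame has the same column names as the silver table and that the `delta-spark` package is available on the cluster.

```python
def upsert_into_silver(spark, bronze_df, silver_path, key="numero_transacao"):
    """Upsert all columns of `bronze_df` (source) into the Delta table at `silver_path` (target)."""
    # Imported lazily so that defining the helper does not require delta-spark.
    from delta.tables import DeltaTable

    # The table loaded here is the merge target, i.e. the one that gets modified.
    silver = DeltaTable.forPath(spark, silver_path)

    (
        silver.alias("target")
        .merge(bronze_df.alias("source"), f"target.{key} = source.{key}")
        .whenMatchedUpdateAll()      # update every column of matched rows from the source
        .whenNotMatchedInsertAll()   # insert source rows that have no match in the target
        .execute()
    )
```

A call might look like `upsert_into_silver(spark, bronzedf, 'dbfs:/mnt/silver/vendas/')`. The explicit column maps in the post remain the safer choice when the source and target schemas differ.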
08-23-2022 12:29 AM
That is what I was trying to explain. Nice you got it working!