Delta table upsert - Databricks Community

weldermartins
Honored Contributor

Hello guys,

I'm trying to do an upsert with Delta Lake following the documentation, but the command doesn't update or insert new rows.

Scenario: my source table is in the bronze layer, and the updates or inserts are in the silver layer.

from delta.tables import *
 
# target: the bronze sales table; the silver sales table provides the updates
deltaTableVendas = DeltaTable.forPath(spark, '/user/hive/warehouse/bronze.db/vendas')
deltaTableVendasUpdates = DeltaTable.forPath(spark, '/user/hive/warehouse/silver.db/vendas')
 
dfUpdates = deltaTableVendasUpdates.toDF()
 
# merge the silver rows into the bronze table, matching on the transaction number
deltaTableVendas.alias('vendas') \
  .merge(
    dfUpdates.alias('updates'),
    'vendas.numero_transacao = updates.numero_transacao'
  ) \
  .whenMatchedUpdate(set =
    {
      "numero_transacao": "updates.numero_transacao",
      "numped": "updates.numped",
      "codcli": "updates.codcli",
      "codprod": "updates.codprod",
      "data_venda": "updates.data_venda",
      "quantidade": "updates.quantidade",
      "valor": "updates.valor"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "numero_transacao": "updates.numero_transacao",
      "numped": "updates.numped",
      "codcli": "updates.codcli",
      "codprod": "updates.codprod",
      "data_venda": "updates.data_venda",
      "quantidade": "updates.quantidade",
      "valor": "updates.valor"
    }
  ) \
  .execute()


9 Replies

weldermartins
Honored Contributor

@Hubert Dudek, @Werner Stinckens

Do you have any idea what's going on in this case?

-werners-
Esteemed Contributor III

Is that the actual location of the data in the Delta Lake table? It seems like a weird place.

The forPath parameter expects the storage location where the data is stored; you are pointing to the location where Hive stores its metadata.

Normally this is something like "/mnt/datalake/bronze/deltatable".
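
For illustration, a minimal sketch of that difference (the path and table name below are hypothetical):

from delta.tables import DeltaTable
 
# forPath takes the storage location of the Delta files themselves
deltaTableVendas = DeltaTable.forPath(spark, "/mnt/datalake/bronze/vendas")
 
# if the table is registered in the metastore, forName with the table name works too
deltaTableVendas = DeltaTable.forName(spark, "bronze.vendas")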

weldermartins
Honored Contributor

Hello @Werner Stinckens, even after changing the table location to the data lake it still doesn't update. As the images show, the update is not reflected in the data lake.

[screenshots of the upsert results]

-werners-
Esteemed Contributor III
(accepted solution)

Right now I am kind of confused about what you are trying to do.

In your code, you are merging the bronze table with data from the silver table (so the bronze table is updated), but you post screenshots of the silver table, which does not change.

And that is normal because the data is updated in the bronze table in your code.
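
To make that concrete: in the Delta Lake Python API, the DeltaTable on which .merge() is called is the target that gets rewritten, while the DataFrame passed to .merge() is only read. A condensed restatement of the code above, using the whenMatchedUpdateAll / whenNotMatchedInsertAll shortcuts (equivalent here because every column is copied from the source):

# deltaTableVendas (bronze) is the target: the MERGE writes into it.
# dfUpdates (built from the silver table) is only the source of rows.
deltaTableVendas.alias('vendas') \
  .merge(dfUpdates.alias('updates'),
         'vendas.numero_transacao = updates.numero_transacao') \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()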

weldermartins
Honored Contributor

@Werner Stinckens In fact, the information is inserted into the bronze table; the silver layer should then receive the updates or inserts from the bronze table.

-werners-
Esteemed Contributor III

OK, so if the bronze table contains the new updated data that you want to propagate to the silver table, you should create a df with the updates from the bronze table (right now you are using the silver table for that).

Next, do a merge into the silver table using the updates (bronze), as in the sketch below.

That should work.
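
A minimal sketch of that direction, assuming (hypothetically) that the bronze and silver data live under /mnt/datalake:

from delta.tables import DeltaTable
 
# source: the new/changed rows, read from the bronze layer
bronzedf = spark.read.format("delta").load("/mnt/datalake/bronze/vendas")
 
# target: the silver table that should receive the upsert
silverTable = DeltaTable.forPath(spark, "/mnt/datalake/silver/vendas")
 
silverTable.alias("target") \
  .merge(bronzedf.alias("source"),
         "target.numero_transacao = source.numero_transacao") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()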

weldermartins
Honored Contributor

@Werner Stinckens But the upsert already references both tables; I don't understand why I would have to set it up again.


weldermartins
Honored Contributor

I managed to find the solution: in the update and insert I had the wrong table as the target.

Thanks @Werner Stinckens!

from delta.tables import DeltaTable
 
# target: the silver sales table; bronzedf holds the bronze-layer sales data as a DataFrame (source)
delta_df = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')
 
delta_df.alias('target').merge(
    source = bronzedf.alias("source"),
    condition = 'target.numero_transacao = source.numero_transacao'
  ) \
  .whenMatchedUpdate(set =
    {
      "numero_transacao": "source.numero_transacao",
      "numped": "source.numped",
      "codcli": "source.codcli",
      "codprod": "source.codprod",
      "data_venda": "source.data_venda",
      "quantidade": "source.quantidade",
      "valor": "source.valor"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "numero_transacao": "source.numero_transacao",
      "numped": "source.numped",
      "codcli": "source.codcli",
      "codprod": "source.codprod",
      "data_venda": "source.data_venda",
      "quantidade": "source.quantidade",
      "valor": "source.valor"
    }
  ) \
  .execute()
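
As a quick sanity check, the silver table's Delta history should now show a MERGE as its most recent operation, for example:

# latest entry in the silver table's transaction log
delta_df.history(1).select("version", "timestamp", "operation").show(truncate=False)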

-werners-
Esteemed Contributor III

That is what I was trying to explain. Nice you got it working!
