Data Engineering

Delta table upsert - Databricks Community

weldermartins
Honored Contributor

Hello guys,

I'm trying to do an upsert via Delta Lake following the documentation, but the command doesn't update or insert any rows.

Scenario: my source table is in the bronze layer, and the updates or inserts go into the silver layer.

from delta.tables import *

# Merge target: the bronze table (this is the table the MERGE will modify).
deltaTableVendas = DeltaTable.forPath(spark, '/user/hive/warehouse/bronze.db/vendas')
# Merge source: the silver table, read below as a plain DataFrame.
deltaTableVendasUpdates = DeltaTable.forPath(spark, '/user/hive/warehouse/silver.db/vendas')

dfUpdates = deltaTableVendasUpdates.toDF()

deltaTableVendas.alias('vendas') \
  .merge(
    dfUpdates.alias('updates'),
    'vendas.numero_transacao = updates.numero_transacao'
  ) \
  .whenMatchedUpdate(set =
    {
      "numero_transacao": "updates.numero_transacao",
      "numped": "updates.numped",
      "codcli": "updates.codcli",
      "codprod": "updates.codprod",
      "data_venda": "updates.data_venda",
      "quantidade": "updates.quantidade",
      "valor": "updates.valor"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "numero_transacao": "updates.numero_transacao",
      "numped": "updates.numped",
      "codcli": "updates.codcli",
      "codprod": "updates.codprod",
      "data_venda": "updates.data_venda",
      "quantidade": "updates.quantidade",
      "valor": "updates.valor"
    }
  ) \
  .execute()
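
One way to check whether a MERGE actually changed anything is to look at the operation metrics in the table history; a minimal sketch, reusing deltaTableVendas from the snippet above:

# For a MERGE, operationMetrics records counters such as numTargetRowsUpdated
# and numTargetRowsInserted, so an all-zero entry means the command matched
# and inserted nothing.
deltaTableVendas.history(5) \
  .select("version", "operation", "operationMetrics") \
  .show(truncate=False)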


9 REPLIES

weldermartins
Honored Contributor

@Hubert Dudek, @Werner Stinckens

Do you have any idea what's going on in this case?

-werners-
Esteemed Contributor III

Is that the actual location of the data in the Delta Lake table? It seems like a weird place.

The forPath parameter expects the storage location where the data is stored. You are pointing to the location where Hive stores its metadata.

Normally this is something like "/mnt/datalake/bronze/deltatable".
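
For reference, a rough sketch of the two usual ways to get a handle on a Delta table; the path and table name below are illustrative, not taken from the thread:

from delta.tables import DeltaTable

# By storage path: point at the directory that actually holds the Delta files,
# e.g. a mounted data lake location (illustrative path).
vendas_by_path = DeltaTable.forPath(spark, '/mnt/datalake/bronze/vendas')

# By metastore name: let the metastore resolve the storage location for you
# (illustrative table name).
vendas_by_name = DeltaTable.forName(spark, 'bronze.vendas')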

Hello @Werner Stinckens, even after changing the table location to the data lake it still doesn't update. As shown in the images, the update is not reflected in the data lake.

(screenshots: the upsert result and the silver table)

-werners-
Esteemed Contributor III (Accepted Solution)

Right now I'm a bit confused about what you are trying to do.

In your code you are merging the bronze table with data from the silver table (so it is the bronze table that gets updated), but you posted screenshots of the silver table, which does not change.

That is expected, because in your code the data is updated in the bronze table.

@Werner Stinckens In fact, the information is being inserted into the bronze table; the silver layer is the one that should receive the updates or inserts from the bronze table.

-werners-
Esteemed Contributor III

OK, so if the bronze table contains the new/updated data that you want to propagate to the silver table, you should create a DataFrame with the updates from the bronze table (you currently use the silver table for that).

Next, do a merge into the silver table using the updates (bronze) as the source.

That should work.
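
Concretely, a sketch of that direction of merge, with paths taken from elsewhere in the thread (adjust them to wherever the tables actually live):

from delta.tables import DeltaTable

# Source: the bronze table, read as a plain DataFrame of updates.
bronzedf = spark.read.format("delta").load('/user/hive/warehouse/bronze.db/vendas')

# Target: the silver table, as a DeltaTable so merge() can modify it.
silver = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')

silver.alias("target") \
  .merge(bronzedf.alias("source"),
         "target.numero_transacao = source.numero_transacao") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

whenMatchedUpdateAll / whenNotMatchedInsertAll copy every column from the source, which saves listing each column by hand when the source and target share the same schema.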

@Werner Stinckens But the upsert already references both tables; I don't understand why I would have to set them again.


weldermartins
Honored Contributor

I managed to find the solution: I had the target wrong in the update and insert. The silver table has to be the merge target, with the bronze DataFrame as the source.

Thanks @Werner Stinckens!

from delta.tables import DeltaTable

# bronzedf: the bronze 'vendas' table loaded as a DataFrame (the read is not shown
# in the post); it is the source of the updates.

# Merge target: the silver table in the data lake.
delta_df = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')

delta_df.alias('target').merge(
    source = bronzedf.alias("source"),
    condition = 'target.numero_transacao = source.numero_transacao'
  ) \
  .whenMatchedUpdate(set =
    {
      "numero_transacao": "source.numero_transacao",
      "numped": "source.numped",
      "codcli": "source.codcli",
      "codprod": "source.codprod",
      "data_venda": "source.data_venda",
      "quantidade": "source.quantidade",
      "valor": "source.valor"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "numero_transacao": "source.numero_transacao",
      "numped": "source.numped",
      "codcli": "source.codcli",
      "codprod": "source.codprod",
      "data_venda": "source.data_venda",
      "quantidade": "source.quantidade",
      "valor": "source.valor"
    }
  ) \
  .execute()
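
To confirm the upsert landed in the silver table, a quick sketch reusing delta_df and the silver path from above:

# The last entry should be a MERGE with non-zero numTargetRowsUpdated /
# numTargetRowsInserted in operationMetrics.
delta_df.history(1).select("operation", "operationMetrics").show(truncate=False)

# And the data itself, read straight from the silver location.
spark.read.format("delta").load('dbfs:/mnt/silver/vendas/').show(10)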

-werners-
Esteemed Contributor III

That is what I was trying to explain. Nice you got it working!
