Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Trying to create an incremental pipeline, but it fails when I use outputMode "update"

BorislavBlagoev
Valued Contributor III
def upsertToDelta(microBatchOutputDF, batchId):
  microBatchOutputDF.createOrReplaceTempView("updates")

  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
 
stream_new_df = spark.readStream.format("delta").load(new_data_frame_path)
stream_old_df = spark.readStream.format("delta").load(old_data_frame_path)
 
stream_old_df.createOrReplaceTempView("old")
 
stream_new_df.writeStream.format("delta") \
            .option("checkpointLocation", "") \
            .option("mergeSchema", "true") \
            .option("path", "") \
            .foreachBatch(upsertToDelta) \
            .trigger(once=True) \
            .outputMode("update") \
            .table("")

I'm trying to execute this code but I get the following error:

Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode

1 ACCEPTED SOLUTION


Hubert-Dudek
Esteemed Contributor III

Your Delta table/file protocol version is too old. Please try to upgrade it as described here: https://docs.microsoft.com/en-us/azure/databricks/delta/versioning
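A minimal sketch of what that upgrade looks like, assuming a hypothetical table path and target protocol versions (check the versioning docs for the versions your runtime actually needs; the upgrade is irreversible):

```python
# Sketch: bump a Delta table's reader/writer protocol versions.
# The path placeholder and the version numbers (1, 4) are hypothetical
# examples, not values from the thread.
UPGRADE_SQL = (
    "ALTER TABLE delta.`{path}` "
    "SET TBLPROPERTIES ('delta.minReaderVersion' = '1', "
    "'delta.minWriterVersion' = '4')"
)

def upgrade_table_protocol(spark, path):
    # Equivalent delta-spark API call:
    #   from delta.tables import DeltaTable
    #   DeltaTable.forPath(spark, path).upgradeTableProtocol(1, 4)
    spark.sql(UPGRADE_SQL.format(path=path))
```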


9 REPLIES


Which is the latest version?

@Hubert Dudek I get the same error

AnalysisException: Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode

I tried both ways.

Hubert-Dudek
Esteemed Contributor III

Did it work? The Databricks Runtime may also be an older one (for example, the one used by Data Factory).

I think you can also refactor the code a bit: use .start() on the last line instead of .table(), and change upsertToDelta to use something like this (it is in Scala, but the logic is similar in Python): https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
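A Python sketch of that refactor, assuming hypothetical source and checkpoint paths and an `id` join key: the micro-batch is merged through the DeltaTable API inside foreachBatch, and the query finishes with .start() instead of .table():

```python
# Sketch of the suggested refactor; paths and the "id" key are placeholders.
MERGE_CONDITION = "u.id = o.id"

def make_upserter(old_delta_path):
    """Build a foreachBatch callback that merges each micro-batch into the
    Delta table at old_delta_path (a placeholder path)."""
    def upsert_to_delta(micro_batch_df, batch_id):
        # Imported here so the sketch can be read without delta-spark installed.
        from delta.tables import DeltaTable

        # DataFrame.sparkSession is available on recent Spark; on older
        # runtimes the thread's micro_batch_df._jdf.sparkSession() trick
        # serves the same purpose.
        target = DeltaTable.forPath(micro_batch_df.sparkSession, old_delta_path)
        (target.alias("o")
               .merge(micro_batch_df.alias("u"), MERGE_CONDITION)
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())
    return upsert_to_delta

def run_pipeline(spark, new_delta_path, old_delta_path, checkpoint_path):
    (spark.readStream.format("delta").load(new_delta_path)
          .writeStream
          .option("checkpointLocation", checkpoint_path)
          .foreachBatch(make_upserter(old_delta_path))
          .trigger(once=True)
          .start())
```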

@Hubert Dudek The runtime version is 9.1 LTS. And I want to use `.table()` because I want to have a table in my metastore/catalog.

@Hubert Dudek I also tried with the 10.2 runtime and with toTable(), but it's the same.

Hubert-Dudek
Esteemed Contributor III

To have the table in the metastore, just register your Delta location there using a separate SQL script (it is enough to do that one time):

%sql
CREATE TABLE IF NOT EXISTS your_db.your_table
(
  id LONG NOT NULL,
  ......
)
USING DELTA
PARTITIONED BY (partition_column)
LOCATION 'path_to_your_delta'

@Hubert Dudek It works like that. I have one more question: how can I include deletes in that query?

  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

Or, how can I add and delete rows through this pipeline?
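One common pattern for this (a sketch, not from the thread): carry a deletion flag column on the updates source — `_deleted` here is a hypothetical column name — and add a conditional WHEN MATCHED ... THEN DELETE branch to the MERGE:

```python
# Sketch: one MERGE that upserts and deletes in a single pass. "_deleted" is
# a hypothetical boolean flag column on the updates source, not a column from
# the original code.
MERGE_WITH_DELETES = """
  MERGE INTO old o
  USING updates u
  ON u.id = o.id
  WHEN MATCHED AND u._deleted = true THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED AND u._deleted = false THEN INSERT *
"""

def upsert_and_delete(micro_batch_df, batch_id):
    micro_batch_df.createOrReplaceTempView("updates")
    # On older runtimes use the thread's micro_batch_df._jdf.sparkSession()
    # workaround instead of the sparkSession property.
    micro_batch_df.sparkSession.sql(MERGE_WITH_DELETES)
```

Delta allows two WHEN MATCHED clauses as long as the first carries a condition, so rows flagged as deleted are removed and everything else is updated or inserted as before.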
