
Trying to create an incremental pipeline, but it fails when I try to use outputMode "update"

BorislavBlagoev
Valued Contributor III
def upsertToDelta(microBatchOutputDF, batchId): 
  
  microBatchOutputDF.createOrReplaceTempView("updates")
 
  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
 
stream_new_df = spark.readStream.format("delta").load(new_data_frame_path)
stream_old_df = spark.readStream.format("delta").load(old_data_frame_path)
 
stream_old_df.createOrReplaceTempView("old")
 
stream_new_df.writeStream.format("delta") \
            .option("checkpointLocation", "") \
            .option("mergeSchema", "true") \
            .option("path", "") \
            .foreachBatch(upsertToDelta) \
            .trigger(once=True) \
            .outputMode("update") \
            .table("")

I'm trying to execute this code but I get the following error:

Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode

1 ACCEPTED SOLUTION

Hubert-Dudek
Esteemed Contributor III

The Delta table/file version is too old. Please try to upgrade it as described here: https://docs.microsoft.com/en-us/azure/databricks/delta/versioning
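A minimal sketch of such an upgrade, following the pattern in the linked versioning docs; the table path and the exact version numbers below are illustrative placeholders, so check the docs for the protocol versions your runtime actually supports:

```sql
%sql
-- Raise the table's minimum reader/writer protocol versions
-- (path and version numbers are placeholders, not from the original post)
ALTER TABLE delta.`/path/to/your/table`
SET TBLPROPERTIES (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5'
)
```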


9 REPLIES

Hubert-Dudek
Esteemed Contributor III

The Delta table/file version is too old. Please try to upgrade it as described here: https://docs.microsoft.com/en-us/azure/databricks/delta/versioning

Which is the latest version?

@Hubert Dudek I get the same error:

AnalysisException: Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode

I tried both ways.

Hubert-Dudek
Esteemed Contributor III

Did it work? The Databricks Runtime can also be imported as an older one (like the one used by Data Factory).

I think you can also refactor the code a bit to use .start() in the last line instead of .table(), and change upsertToDelta to use something like this (it is in Scala, but the logic is similar in Python): https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
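A rough Python translation of that notebook's pattern might look like the sketch below. It uses DeltaTable.forPath and the merge builder from the Delta Lake Python API shipped with Databricks runtimes; the names `spark` and `old_data_frame_path` are assumed from the original post, and `id` is assumed to be the join key. Treat this as a sketch, not a drop-in fix:

```python
def upsert_to_delta(micro_batch_df, batch_id):
    # Sketch of the merge-in-streaming pattern from the linked notebook,
    # translated to Python. Assumes the target Delta table lives at
    # old_data_frame_path (from the post above) and that `spark` is the
    # notebook's SparkSession.
    from delta.tables import DeltaTable  # ships with Databricks runtimes

    target = DeltaTable.forPath(spark, old_data_frame_path)
    (target.alias("o")
           .merge(micro_batch_df.alias("u"), "u.id = o.id")
           .whenMatchedUpdateAll()       # UPDATE SET *
           .whenNotMatchedInsertAll()    # INSERT *
           .execute())

# Hypothetical usage, ending the query with .start() as suggested above:
# stream_new_df.writeStream.foreachBatch(upsert_to_delta).trigger(once=True).start()
```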

@Hubert Dudek The runtime version is 9.1 LTS. And I want to use `.table()` because I want to have a table in my metastore/catalog.

@Hubert Dudek I also tried with the 10.2 runtime and with toTable(), but it's the same.

Hubert-Dudek
Esteemed Contributor III

To have the table in the metastore, just register your Delta location there using a separate SQL script (it is enough to do that one time):

%sql
CREATE TABLE IF NOT EXISTS your_db.your_table
( 
 id BIGINT NOT NULL,
 ......
)
USING DELTA
PARTITIONED BY (partition_column)
LOCATION 'path_to_your_delta'

@Hubert Dudek It works like that. I have one more question: how can I include deletes in that query?

  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

Or, how can I add and delete rows from this pipeline?
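One common pattern is a sketch like the following, which keeps the original temp-view approach and adds a `WHEN MATCHED ... THEN DELETE` clause. It assumes the incoming data carries a soft-delete flag column, here called `is_deleted`, which is not part of the original schema:

```python
def upsert_with_deletes(micro_batch_df, batch_id):
    # Same temp-view pattern as the original upsertToDelta. The `is_deleted`
    # column is an assumed soft-delete marker in the source data: rows flagged
    # as deleted are removed from the target, everything else is upserted.
    micro_batch_df.createOrReplaceTempView("updates")
    micro_batch_df._jdf.sparkSession().sql("""
        MERGE INTO old o
        USING updates u
        ON u.id = o.id
        WHEN MATCHED AND u.is_deleted = true THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```

Note the clause order matters: the `DELETE` branch is listed before the plain `UPDATE` branch so that deleted rows are not updated instead.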
