01-19-2022 09:27 AM
def upsertToDelta(microBatchOutputDF, batchId):
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF._jdf.sparkSession().sql("""
        MERGE INTO old o
        USING updates u
        ON u.id = o.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
stream_new_df = spark.readStream.format("delta").load(new_data_frame_path)
stream_old_df = spark.readStream.format("delta").load(old_data_frame_path)
stream_old_df.createOrReplaceTempView("old")

stream_new_df.writeStream.format("delta") \
    .option("checkpointLocation", "") \
    .option("mergeSchema", "true") \
    .option("path", "") \
    .foreachBatch(upsertToDelta) \
    .trigger(once=True) \
    .outputMode("update") \
    .table("")
When I try to execute this code, I get the following error:
Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode
Accepted Solutions
01-19-2022 09:52 AM
Your Delta table/file version is too old. Please try to upgrade it as described here: https://docs.microsoft.com/en-us/azure/databricks/delta/versioning
01-19-2022 09:56 AM
Which is the latest version?
01-19-2022 10:20 AM
@Hubert Dudek I get the same error
AnalysisException: Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode
01-19-2022 10:32 AM
I tried both ways.
01-20-2022 02:38 AM
Did it work? The Databricks runtime may also be an older one (like the one used by Data Factory).
I think you can also refactor the code a bit to use .start() in the last line instead of .table(), and change upsertToDelta to use something like this (it's in Scala, but the logic is similar in Python): https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
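A minimal sketch of the refactor suggested above: keep the MERGE statement in a small helper, run it from foreachBatch, and end the stream with .start() instead of .table(). Names like new_data_frame_path and checkpoint_path are placeholders for your own paths, not values from the thread.

```python
def build_merge_sql(target: str, source: str, key: str) -> str:
    """Render the upsert MERGE statement used inside foreachBatch."""
    return (
        f"MERGE INTO {target} o\n"
        f"USING {source} u\n"
        f"ON u.{key} = o.{key}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )

def upsert_to_delta(micro_batch_df, batch_id):
    # Each micro-batch is exposed as a temp view, then merged into `old`.
    micro_batch_df.createOrReplaceTempView("updates")
    micro_batch_df._jdf.sparkSession().sql(build_merge_sql("old", "updates", "id"))

def start_stream(spark, new_data_frame_path, checkpoint_path):
    # foreachBatch does the actual writing, so no sink path, table, or
    # output mode is set here and the query simply ends with .start().
    return (spark.readStream.format("delta").load(new_data_frame_path)
            .writeStream
            .foreachBatch(upsert_to_delta)
            .option("checkpointLocation", checkpoint_path)
            .trigger(once=True)
            .start())
```

With this shape the Delta sink never sees the "update" output mode at all, since the merge happens inside foreachBatch.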
01-20-2022 02:42 AM
@Hubert Dudek The runtime version is 9.1 LTS. And I want to use `.table()` because I want to have a table in my metastore/catalog.
01-20-2022 03:06 AM
@Hubert Dudek I also tried with the 10.2 runtime and with toTable(), but it's the same.
01-20-2022 03:54 AM
To have the table in the metastore, just register your Delta location there using a separate SQL script (it is enough to do that one time):
%sql
CREATE TABLE IF NOT EXISTS your_db.your_table
(
    id LONG NOT NULL COMMENT '...',
    ......
)
USING DELTA
PARTITIONED BY (partition_column)
LOCATION 'path_to_your_delta'
01-20-2022 06:11 AM
@Hubert Dudek It works like that. I have one more question: how can I include deletes in that query?
microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
Or, how can I add and delete rows from this pipeline?
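One common pattern for this (a sketch, not an answer from the thread): carry a soft-delete flag in the source data and add a conditional DELETE clause to the MERGE, so one statement handles inserts, updates, and deletes. The `deleted` column name is an assumption about your schema.

```python
def build_merge_with_deletes(target: str, source: str, key: str) -> str:
    """Render a MERGE that upserts rows and deletes those flagged in the source.

    Assumes the source view has a boolean `deleted` column (hypothetical).
    The conditional WHEN MATCHED clause must come before the unconditional one.
    """
    return (
        f"MERGE INTO {target} o\n"
        f"USING {source} u\n"
        f"ON u.{key} = o.{key}\n"
        "WHEN MATCHED AND u.deleted = true THEN DELETE\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )
```

Calling build_merge_with_deletes("old", "updates", "id") inside the same foreachBatch function would then add and remove rows in one pass per micro-batch.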