01-19-2022 09:27 AM
def upsertToDelta(microBatchOutputDF, batchId):
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF._jdf.sparkSession().sql("""
        MERGE INTO old o
        USING updates u
        ON u.id = o.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
stream_new_df = spark.readStream.format("delta").load(new_data_frame_path)
stream_old_df = spark.readStream.format("delta").load(old_data_frame_path)
stream_old_df.createOrReplaceTempView("old")
stream_new_df.writeStream.format("delta") \
    .option("checkpointLocation", "") \
    .option("mergeSchema", "true") \
    .option("path", "") \
    .foreachBatch(upsertToDelta) \
    .trigger(once=True) \
    .outputMode("update") \
    .table("")
I'm trying to execute this code but I get the following error:
Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode
01-19-2022 09:52 AM
The Delta table/file version is too old. Please try to upgrade it as described here: https://docs.microsoft.com/en-us/azure/databricks/delta/versioning
01-19-2022 09:56 AM
Which is the latest version?
01-19-2022 10:20 AM
@Hubert Dudek I get the same error:
AnalysisException: Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode
01-19-2022 10:32 AM
I tried it both ways.
01-20-2022 02:38 AM
Did it work? The Databricks runtime may also be an older one (like the one used by Data Factory).
I think you can also refactor the code a bit to use .start() on the last line instead of .table(), and change upsertToDelta to use something like this (it is in Scala, but the logic is similar for Python): https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
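For reference, that refactor can be sketched in Python roughly as below. This is an untested sketch, not the notebook's exact code: the table/view names ("old", "updates"), the key column ("id"), and the function and parameter names are assumptions taken from the question. The key points are that foreachBatch performs the merge itself, so no outputMode("update") is set on the Delta sink, and the query ends with .start():

```python
# Hedged sketch of the foreachBatch + .start() refactor discussed above.
# Names, paths, and the key column are placeholders from the question.

def build_upsert_sql(target: str, source: str, key: str) -> str:
    # Same MERGE as in the question, parameterized for clarity.
    return (
        f"MERGE INTO {target} t USING {source} s ON s.{key} = t.{key} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )

def upsert_to_delta(micro_batch_df, batch_id):
    # Register the micro-batch and run the merge through its session.
    # (The thread uses the _jdf workaround; on newer runtimes the public
    # micro_batch_df.sparkSession property can be used instead.)
    micro_batch_df.createOrReplaceTempView("updates")
    micro_batch_df._jdf.sparkSession().sql(
        build_upsert_sql("old", "updates", "id")
    )

def start_upsert_stream(spark, source_path, checkpoint_path):
    # Because foreachBatch does the writing, the sink needs no
    # outputMode("update"), and the query is started with .start().
    return (
        spark.readStream.format("delta").load(source_path)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .foreachBatch(upsert_to_delta)
        .trigger(once=True)
        .start()
    )
```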
01-20-2022 02:42 AM
@Hubert Dudek The runtime version is 9.1 LTS. And I want to use .table() because I want to have the table in my metastore/catalog.
01-20-2022 03:06 AM
@Hubert Dudek I also tried with the 10.2 runtime and with toTable(), but it's the same.
01-20-2022 03:54 AM
To have the table in the metastore, just register your Delta location there using a separate SQL script (doing it once is enough):
%sql
CREATE TABLE IF NOT EXISTS your_db.your_table
(
  id LONG NOT NULL COMMENT '...',
  ......
)
USING DELTA
PARTITIONED BY (partition_column)
LOCATION 'path_to_your_delta'
01-20-2022 06:11 AM
@Hubert Dudek It works like that. I have one more question: how can I include a delete in that query?
microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO old o
    USING updates u
    ON u.id = o.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
Or, how can I add and delete rows from this pipeline?
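The thread does not answer this, but Delta's MERGE INTO supports a DELETE clause alongside UPDATE and INSERT. A hedged sketch, assuming the source rows carry a flag column (here hypothetically named `deleted`) marking rows to remove:

```sql
MERGE INTO old o
USING updates u
ON u.id = o.id
WHEN MATCHED AND u.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED AND u.deleted = false THEN INSERT *
```

Note that rows missing entirely from the source cannot be deleted this way; that would require a separate DELETE statement or a different comparison strategy.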