We are trying to write multiple sources to the same target table using DLT, but we are getting the error below.
Not sure what we are missing in the code.
File /databricks/spark/python/dlt/api.py:817, in apply_changes(target, source, keys, sequence_by, where, ignore_null_updates, apply_as_deletes, apply_as_truncates, column_list, except_column_list, stored_as_scd_type, track_history_column_list, track_history_except_column_list, flow_name)
    814     raise RuntimeError("Only SCD Type 1 and SCD Type 2 are supported for now.")
import dlt
from pyspark.sql import functions as F

dlt.create_streaming_table(name="unified_events_test_11")

dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_pv_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
)
dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_wc_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
)
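One variation we have also been looking at (not sure if this is the intended usage) is giving each apply_changes call a distinct flow_name, since that parameter shows up in the signature in the traceback above. A minimal sketch, assuming flow_name is meant to distinguish multiple CDC flows into the same target (the flow names here are just placeholders):

# Sketch only: assumes the flow_name parameter (visible in the
# signature above) lets multiple CDC flows share one target table.
dlt.apply_changes(
    flow_name="pv_events_flow",
    target="unified_events_test_11",
    source="unified_events_pv_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
)

dlt.apply_changes(
    flow_name="wc_events_flow",
    target="unified_events_test_11",
    source="unified_events_wc_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
)

Is that the right way to have two CDC flows feed one streaming table, or does each target only support a single apply_changes flow?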
Note: unified_events_pv_raw and unified_events_wc_raw are streaming tables; unified_events_wc_raw is defined like this:
@dlt.table(
    name="unified_events_wc_raw"
)
def unified_events_wc_raw():
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("sep", "||||")
        .schema(wallet_connect_schema)  # schema is defined elsewhere in the notebook
        .load("dbfs:/FileStore/wallet_connect_events")
    )
    return df
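If a single target cannot take two apply_changes flows, the fallback we are considering is to union the two raw feeds into one intermediate view and run apply_changes once against that. A rough sketch, assuming both sources share the same schema (the view name unified_events_all_raw is just a placeholder):

# Sketch of a possible fallback: union both raw feeds into one view and
# run a single apply_changes against it. Assumes unified_events_pv_raw
# and unified_events_wc_raw have identical schemas.
@dlt.view(name="unified_events_all_raw")
def unified_events_all_raw():
    pv = dlt.read_stream("unified_events_pv_raw")
    wc = dlt.read_stream("unified_events_wc_raw")
    return pv.unionByName(wc)

dlt.create_streaming_table(name="unified_events_test_11")

dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_all_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
)

Would that be the recommended pattern here, or is there a way to keep the two flows separate?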