Hi,
our Delta Live Tables (Lakeflow Declarative Pipelines) pipeline started failing after the Sep 30 / Oct 1 service upgrade with the following error:
AnalysisException: Cannot have multiple queries named `<table_name>_realtime_flow` for `<table_name>`.
Additional queries on that table must be named. Note that unnamed queries default
to the same name as the table.
We define multiple append flows dynamically in a loop, e.g.:
import dlt
from pyspark.sql.functions import col, md5, to_date, hour

for config in table_info:
    table_name = config["table_name"]
    path = f"{config['conn_details'].rstrip('/')}/{config['table_name']}/"
    load_type = config["load_type"]

    # create the streaming table (ignore the error if it already exists)
    try:
        dlt.create_streaming_table(name=table_name, comment=f"Raw {table_name} data from S3")
    except Exception:
        print("Table already exists")

    if load_type == "DLT":
        # continuous append flow reading new files via Auto Loader
        @dlt.append_flow(
            target=table_name,
            name=f"{table_name}_realtime_flow",
            comment=f"Raw streaming {table_name} data from S3")
        def _ingest_dynamic_table(table_path=path):
            return (
                spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .load(table_path)
            )

    if load_type == "Full_load":
        # one-time append flow for the historical backfill
        @dlt.append_flow(
            target=table_name,
            name=f"{table_name}_ingest_history_flow",
            once=True,
            comment=f"Raw historical {table_name} data from S3")
        def _ingest_historic_table(table_path=path):
            return (
                spark.read
                .format("json")
                .option("recursiveFileLookup", "true")
                .load(table_path)
                .withColumn("md5OfBody", md5(col("body")))
                .withColumn("ingest_date", to_date(col("ingest_ts")).cast("string"))
                .withColumn("hour", hour(col("ingest_ts")).cast("string"))
                .drop("ingest_ts")
            )
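For context, table_info is a list of dicts along these lines (the bucket and table names below are placeholders, not our real config):

table_info = [
    {"table_name": "orders",    "conn_details": "s3://my-bucket/raw/", "load_type": "DLT"},
    {"table_name": "customers", "conn_details": "s3://my-bucket/raw/", "load_type": "Full_load"},
]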
This pattern worked fine before, and it matches the official docs example with Kafka topics (second code snippet at https://docs.databricks.com/aws/en/dlt/flow-examples).
The only workarounds we have found so far are a full refresh or adding unique suffixes to the flow names (see the sketch below), but the latter breaks checkpoint resumption.
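To clarify what we mean by unique suffixes: something along these lines inside the same loop (the uuid-based suffix is just an illustration). It avoids the AnalysisException, but because the flow name changes on every update, each restart gets a new checkpoint and we lose resumption:

import uuid

run_suffix = uuid.uuid4().hex[:8]  # different on every pipeline update

@dlt.append_flow(
    target=table_name,
    # unique name avoids the duplicate-query error, but a new name means
    # a new checkpoint, so the stream cannot resume where it left off
    name=f"{table_name}_realtime_flow_{run_suffix}",
    comment=f"Raw streaming {table_name} data from S3")
def _ingest_dynamic_table(table_path=path):
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(table_path)
    )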
1. Has anyone else hit this since the last service upgrade? Our pipeline was stopped and restarted by the service upgrade on October 1st (yesterday), and we haven't been able to get it working since.
2. Is this a documented behaviour change (i.e. enforcement of unique flow names)? I can't find any documentation to support that.
3. What's the recommended restart-safe pattern if we want stable flow names (so we can resume from checkpoints) without having to do a full refresh every time the pipeline restarts?
Any advice would be greatly appreciated!
Data Plumber