Data Engineering

DLT Flow Redeclaration Error After Service Upgrade

DiskoSuperStar
New Contributor

Hi,

 

Our Delta Live Tables (Lakeflow Declarative Pipelines) pipeline started failing after the Sep 30 / Oct 1 service upgrade with the following error:

AnalysisException: Cannot have multiple queries named `<table_name>_realtime_flow` for `<table_name>`.
Additional queries on that table must be named. Note that unnamed queries default
to the same name as the table.

We define multiple append flows dynamically in a loop, e.g.:

 

import dlt
from pyspark.sql.functions import col, md5, to_date, hour

for config in table_info:
    table_name = config["table_name"]
    path = f"{config['conn_details'].rstrip('/')}/{config['table_name']}/"
    load_type = config["load_type"]

    # create the streaming table
    try:
        dlt.create_streaming_table(name=table_name, comment=f"Raw {table_name} data from S3")
    except Exception as e:
        print("Table already exists")

    if load_type == "DLT":
        # create regular streaming flow
        @dlt.append_flow(
            target=table_name,
            name=f"{table_name}_realtime_flow",
            comment=f"Raw streaming {table_name} data from S3")
        def _ingest_dynamic_table(table_path=path):
            return (
                spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .load(table_path)
            )

    if load_type == "Full_load":
        # create one time append flow
        @dlt.append_flow(
            target=table_name,
            name=f"{table_name}_ingest_history_flow",
            once=True,
            comment=f"Raw historical {table_name} data from S3"
        )
        def _ingest_historic_table(table_path=path):
            return (
                spark.read
                .format("json")
                .option("recursiveFileLookup", "true")
                .load(table_path)
                .withColumn("md5OfBody", md5(col("body")))
                .withColumn("ingest_date", to_date(col("ingest_ts")).cast("string"))
                .withColumn("hour", hour(col("ingest_ts")).cast("string"))
                .drop("ingest_ts")
            )

 

 

This pattern worked fine before (and matches the official docs example with Kafka topics).
https://docs.databricks.com/aws/en/dlt/flow-examples > second code snippet

The only workarounds right now are a full refresh or adding unique suffixes to the flow names (but that breaks checkpoint resumption).
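(For reference, the suffix workaround looks roughly like the sketch below; the run-specific suffix is just an illustration of why checkpoint resumption breaks, since every restart produces a new flow name and therefore a fresh checkpoint.)

import uuid

# Illustration only: a per-run suffix makes the flow name unique on every update,
# but each new name gets a brand-new checkpoint, so streaming state is not resumed.
run_suffix = uuid.uuid4().hex[:8]

@dlt.append_flow(
    target=table_name,
    name=f"{table_name}_realtime_flow_{run_suffix}",
    comment=f"Raw streaming {table_name} data from S3")
def _ingest_dynamic_table(table_path=path):
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(table_path)
    )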

 

1. Has anyone else hit this since the last service upgrade? My pipeline was stopped and restarted by the service upgrade on October 1st (yesterday), and I haven't been able to make it work since.

2. Is this a documented behaviour change (i.e. enforcement of unique flow names)? I can't seem to find any documentation to support that.

3. What's the recommended restart-safe pattern if we want stable flow names (to resume from checkpoints) without needing a full refresh every time we restart the pipeline?

Any advice would be greatly appreciated! 

Data Plumber
1 REPLY

saurabh18cs
Honored Contributor II

Hi @DiskoSuperStar, it seems you've run into a recently enforced change in Databricks DLT/Lakeflow:
Multiple flows (append or otherwise) targeting the same table must have unique names. Your code actually looks correct in that respect, so check whether your table_info list contains duplicate entries for the same table and flow type (see the sketch after the bullets below).

  • Assign unique, deterministic names to each flow per table and flow type.
  • Do not change the flow name logic between pipeline runs.
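
For example, a minimal pre-check along these lines (assuming table_info is a list of dicts with table_name and load_type keys, as in your snippet; the helper name is just for illustration) would fail fast before any flows are declared:

from collections import Counter

# Illustrative pre-check: derive the flow name each config would produce and
# fail fast if two configs would declare the same flow on the same table.
def expected_flow_name(config):
    suffix = "realtime_flow" if config["load_type"] == "DLT" else "ingest_history_flow"
    return f"{config['table_name']}_{suffix}"

flow_names = [expected_flow_name(c) for c in table_info]
duplicates = [name for name, count in Counter(flow_names).items() if count > 1]
if duplicates:
    raise ValueError(f"Duplicate flow declarations detected: {duplicates}")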

Can you also try resetting/refreshing the pipeline to clear stale state and checkpoints? (On the pipeline details page, look for "Reset" or "Full Refresh".)
