
Handling a single table in multiple DLT pipelines

ashraf1395
Valued Contributor

So this is the workflow I am trying to build.

There are 2 Databricks jobs, let's say A and B.
Job A has 3 tasks, and the 3rd task checks whether the target table is present or not:

if present, it updates the schema if there are any changes, or otherwise just refreshes the pipeline;
else, if the table does not exist, it creates an empty streaming table with a schema that is defined and captured programmatically.

I am able to do this. This particular task is a DLT pipeline.
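
For context, that conditional task could look roughly like the sketch below. This is only an illustration under assumptions: the catalog, table, and source names are placeholders, and the hard-coded schema stands in for the one captured programmatically in the real job.

import dlt
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

# Placeholder names and schema, for illustration only.
TARGET_FULL_NAME = "my_catalog.my_schema.target_table"
schema = StructType([
    StructField("loan_id", StringType()),
    StructField("loan_amt", DecimalType(18, 2)),
    StructField("customer_id", StringType()),
])

if spark.catalog.tableExists(TARGET_FULL_NAME):
    # Table exists: redeclare it in its owning pipeline so the next update
    # refreshes it and applies any compatible schema changes.
    @dlt.table(name="target_table", schema=schema)
    def target_table():
        return spark.readStream.table("my_catalog.my_schema.upstream_source")
else:
    # Table missing: declare an empty streaming table with the captured schema.
    dlt.create_streaming_table(name="target_table", schema=schema)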

Similarly, now for Job B: it has 4 tasks,

and the 1st task does some things and then performs the same steps as the 3rd task of Job A, i.e. creating an empty streaming table or refreshing the existing table with schema evolution.

The error that I was getting at first was:

A table can only be owned by one pipeline. Concurrent pipeline operations such as maintenance and full refresh will conflict with each other.

After doing some research in the Databricks docs, I found a possible solution: append_flow can be used to handle append operations on the same target table from different streaming sources. My situation was a little different, but I tried it anyway.
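
For reference, the documented append_flow pattern looks roughly like this sketch: one target streaming table owned by the pipeline, and one append flow per source. The table and source names here are made up.

import dlt

# One target streaming table owned by this pipeline.
dlt.create_streaming_table("unified_target")

# Each append flow reads a *different* streaming source and appends to the target.
@dlt.append_flow(target="unified_target")
def from_source_a():
    return spark.readStream.table("my_catalog.bronze.source_a_events")

@dlt.append_flow(target="unified_target")
def from_source_b():
    return spark.readStream.table("my_catalog.bronze.source_b_events")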

So in Job B, task 1:
if the table exists, instead of refreshing it (which causes the error), I used append_flow: I read the same table with readStream and returned it. But instead of appending the data to that target table, it created a new view.

import dlt

# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming"
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")

# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        source_data = spark.readStream.table(table_path)
        return source_data

I tried to find the reason behind this and learned that the sources need to be different. Since I am reading from the same table and returning that same data to the target, there is no different source, so it won't work. But even when I tried with a different source (I created a new dataframe and returned it), it still created a view instead of appending data to the streaming table.

import dlt
from decimal import Decimal

# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming"
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")

# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        initial_data = [("loan_1", Decimal("200.02"), "cust_1", "cust_nm", "unknown")]
        source_data = spark.createDataFrame(initial_data, schema)
        return source_data

Is there any way I can handle this scenario, using append_flow or anything else?
I have to do it in DLT pipelines. I tried merge as well, but that didn't work.

Later in this same Job B, I will have task 2 where I will be ingesting data into this same target table from Auto Loader,

and I am using append_flow there as well. I think it might work there because that source is completely different, but I still don't know.
I have seen this Databricks doc on backfilling, https://docs.databricks.com/en/delta-live-tables/flows.html#example-run-a-one-time-data-backfill. It is not the same situation, but maybe I am missing something.
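
Adapting the pattern from that doc to this target would look roughly like the sketch below; the paths, file format, and flow names are placeholders only.

import dlt

dlt.create_streaming_table("target_table")

# Ongoing ingestion with Auto Loader.
@dlt.append_flow(target="target_table")
def ingest_new_files():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/my_catalog/my_schema/landing/incoming/")
    )

# Separate append flow for the historical backfill directory.
@dlt.append_flow(target="target_table")
def backfill_history():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/my_catalog/my_schema/landing/historical/")
    )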

3 REPLIES

VZLA
Databricks Employee

Hi @ashraf1395,

Unfortunately, it's not possible to achieve this using append_flow or similar approaches across multiple pipelines. Delta Live Tables are designed so that each target table is managed and owned by a single pipeline. This means maintenance, full refreshes, and append operations need to happen within the pipeline that owns the table.

If you need to bring in multiple data sources, consider integrating them into one pipeline and using append_flow there. If you must work with multiple pipelines, make sure only one pipeline owns and manages the target table, and let the other pipelines produce upstream data that the owning pipeline can read and append.



ashraf1395
Valued Contributor

""If you must work with multiple pipelines, make sure only one pipeline owns and manages the target table, and let the other pipelines produce upstream data that the owning pipeline can read and append.""


Hi there @VZLA, can you give an example of it, or help me correctly understand it?

I can make sure that only one pipeline owns the target table. But I didn't understand the part about letting other pipelines produce upstream data. Do you mean the other pipelines produce the upstream data, which can then be appended back into the same target table, or into a new target?

If it's the same target table, can you give me a demo example of how to do it? It would be really helpful.

 

VZLA
Databricks Employee

Hi @ashraf1395 

Yes, your understanding is partially correct. Let me clarify:

  • Only one pipeline can own and manage a target table, including operations like schema evolution, maintenance, and refreshes.
  • When other pipelines are described as "producing upstream data", it means they can generate or prepare data that is consumed by the owning pipeline. These other pipelines do not directly append to the target table but instead write to intermediate or staging locations.

Now, addressing your specific question: Yes, this upstream data can be appended back to the same target table, but the append operation must happen through the owning pipeline. Other pipelines act as feeders.

Example:

Pipeline A (Owning the Table): Handles schema evolution, maintenance, and appends data from both its source and the staging table.

import dlt

@dlt.table(
    name="target_table",
    comment="This table is owned by Pipeline A."
)
def target_table():
    # Read data from its own source (a dataset defined in this pipeline)
    main_source = dlt.read_stream("source_stream")

    # Read the staging table produced by Pipeline B from the catalog,
    # also as a stream, so the two sources can be unioned
    staging_data = spark.readStream.table("staging_table")

    return main_source.unionByName(staging_data, allowMissingColumns=True)

Pipeline B (Generating Upstream Data): Writes data to a staging table that Pipeline A reads.

import dlt

@dlt.table(
    name="staging_table",
    comment="Intermediate data for Pipeline A."
)
def staging_table():
    source_data = spark.readStream.format("delta").load("path_to_source")
    return source_data

 

Workflow Summary

  • Pipeline A owns and maintains the target_table. It consolidates data from its primary source and the staging_table produced by Pipeline B.
  • Pipeline B processes its data and writes to staging_table. It does not directly interact with target_table, which avoids ownership conflicts.
