12-13-2024 02:28 AM
This is the workflow I am trying to build.
There are 2 Databricks jobs, let's say A and B.
Job A has 3 tasks, and the 3rd task checks whether the target table is present or not:
if it is present, it updates the table's schema if there are any changes, or otherwise simply refreshes the pipeline;
if the table does not exist, it creates an empty streaming table with a schema that is defined and captured programmatically.
I am able to do this. This particular task is a DLT pipeline; a rough sketch of the logic is below.
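For context, the task 3 logic looks roughly like this (a minimal sketch only; the table names, schema, and upstream source are placeholders for what my pipeline captures programmatically):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# Placeholders - in my pipeline these are captured programmatically
target_table_name = "target_table"
full_table_name = "catalog.schema.target_table"
schema = "loan_id STRING, amount DECIMAL(10,2), customer_id STRING, customer_nm STRING, status STRING"

if spark.catalog.tableExists(full_table_name):
    # Table already exists: (re)declare it so a pipeline refresh applies any
    # schema changes; the query reads from the usual upstream source.
    @dlt.table(name=target_table_name, schema=schema)
    def refresh_target_table():
        return spark.readStream.table("catalog.schema.upstream_source")  # placeholder source
else:
    # Table does not exist yet: create an empty streaming table with the
    # captured schema; data gets appended later.
    dlt.create_streaming_table(name=target_table_name, schema=schema)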
Now similarly for Job B, which has 4 tasks.
The 1st task does some things and then performs the same steps as the 3rd task of Job A, i.e. creating an empty streaming table or refreshing the existing table with schema evolution.
The error I was getting at first was:
A table can only be owned by one pipeline. Concurrent pipeline operations such as maintenance and full refresh will conflict with each other.
After doing some research in the Databricks docs, I found that append_flow can be used to handle append operations on the same table from different streaming sources. My situation is a little different, but I tried it anyway.
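For reference, the documented append_flow pattern is roughly: declare one streaming table, then feed it from several append flows, each reading a different source (names here are placeholders):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# One streaming table fed by multiple append flows with different sources
dlt.create_streaming_table("combined_events")

@dlt.append_flow(target="combined_events")
def events_from_source_a():
    return spark.readStream.table("catalog.schema.events_a")  # placeholder source

@dlt.append_flow(target="combined_events")
def events_from_source_b():
    return spark.readStream.table("catalog.schema.events_b")  # placeholder source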
So in Job B task 1, if the table exists, instead of refreshing it (which causes the error above), I used append_flow: I read the same table with readStream and returned it. But instead of appending the data to that target table, it created a new view.
# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming",
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")
# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        source_data = spark.readStream.table(table_path)
        return source_data
I tried to find the reason behind this and learned that the sources need to be different; since I am reading from the same table and returning it into the same target, there is no different source, so it can't work that way. But even when I tried with a different source (I created a new DataFrame and returned it), it still created a view instead of appending data to the streaming table:
from decimal import Decimal

# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming",
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")
# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        initial_data = [("loan_1", Decimal("200.02"), "cust_1", "cust_nm", "unknown")]
        source_data = spark.createDataFrame(initial_data, schema)
        return source_data
Is there any way I can handle this scenario using append_flow or anything else? I have to do it in DLT pipelines. I tried merge as well, but that didn't work.
Later in this same Job B I will have a task 2 where I will be ingesting data into this same target table from Auto Loader, and I am using append_flow there as well. I think it might work there because the source is completely different, but I'm still not sure; a rough sketch of what I have in mind for that task is below.
I also saw this Databricks doc on backfilling, https://docs.databricks.com/en/delta-live-tables/flows.html#example-run-a-one-time-data-backfill - it is not the same situation, but maybe I am missing something.
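This is roughly the task 2 idea (a minimal sketch; the target table name, landing path, and file format are placeholders for my actual setup):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# Job B task 2: append Auto Loader data into the same streaming table.
@dlt.append_flow(target="target_table")
def ingest_from_autoloader():
    return (
        spark.readStream
        .format("cloudFiles")  # Auto Loader
        .option("cloudFiles.format", "json")  # assumed file format
        .option("cloudFiles.schemaLocation", "/Volumes/catalog/schema/_autoloader_schema")  # placeholder
        .load("/Volumes/catalog/schema/landing/")  # placeholder landing path
    )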
12-13-2024 08:04 AM
Hi @ashraf1395 ,
Unfortunately, it's not possible to achieve this using append_flow or similar approaches across multiple pipelines. Delta Live Tables are designed so that each target table is managed and owned by a single pipeline. This means maintenance, full refreshes, and append operations need to happen within the pipeline that owns the table.
If you need to bring in multiple data sources, consider integrating them into one pipeline and using append_flow there. If you must work with multiple pipelines, make sure only one pipeline owns and manages the target table, and let the other pipelines produce upstream data that the owning pipeline can read and append.
12-13-2024 10:24 PM
""If you must work with multiple pipelines, make sure only one pipeline owns and manages the target table, and let the other pipelines produce upstream data that the owning pipeline can read and append.""
Hi there @VZLA , Can you give an example of it. Or help me correctly understand it.
I can make sure that only one tables owns the target table. But the part where letting other pipelines produce upstream data. I didn't understand. Do you mean other pipelines produce the upstream data which can be appended back in the same target table or new target.
If same target table can you give me a demo example of how to do it. It will be really helpful
12-16-2024 04:14 AM
Hi @ashraf1395
Yes, your understanding is partially correct. Let me clarify:
Now, addressing your specific question: Yes, this upstream data can be appended back to the same target table, but the append operation must happen through the owning pipeline. Other pipelines act as feeders.
Example:
Pipeline A (Owning the Table): Handles schema evolution, maintenance, and appends data from both its source and the staging table.
import dlt

@dlt.table(
    name="target_table",
    comment="This table is owned by Pipeline A."
)
def target_table():
    # Read data from its own source
    main_source = dlt.read_stream("source_stream")
    # Read the staging data produced by Pipeline B as a stream, so the
    # union of the two sources remains a streaming query
    staging_data = spark.readStream.table("staging_table")
    return main_source.unionByName(staging_data, allowMissingColumns=True)
Pipeline B (Generating Upstream Data): Writes data to a staging table that Pipeline A reads.
import dlt

@dlt.table(
    name="staging_table",
    comment="Intermediate data for Pipeline A."
)
def staging_table():
    source_data = spark.readStream.format("delta").load("path_to_source")
    return source_data
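As a variant of the Pipeline A example, the same feeder pattern can also be expressed with append_flow inside the owning pipeline, keeping each source as its own flow instead of a union (a sketch; in practice the staging table would be referenced by its fully qualified name):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# Pipeline A (owning pipeline): one streaming table, two append flows.
dlt.create_streaming_table("target_table")

@dlt.append_flow(target="target_table")
def from_main_source():
    # Pipeline A's own source
    return dlt.read_stream("source_stream")

@dlt.append_flow(target="target_table")
def from_pipeline_b_staging():
    # Staging table published by Pipeline B
    return spark.readStream.table("staging_table")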
Workflow Summary
Pipeline B writes its data to staging_table. Pipeline A owns target_table, reads both its own source and the staging table, and appends everything into target_table. All maintenance and full refreshes happen only in Pipeline A, so there is no ownership conflict.