This is the workflow I am trying to build.
There are 2 Databricks jobs, let's say A and B.
Job A has 3 tasks, and the 3rd task checks whether the target table is present or not:
if it is present, it updates the table's schema if there are any changes, or otherwise just refreshes the pipeline;
else, if the table does not exist, it creates an empty streaming table with a schema that is defined and captured programmatically.
I am able to do this. This particular task is a DLT pipeline.
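For context, the shape of that 3rd task is roughly the following (a simplified sketch, not my exact code; the schema, the table names, and the use of dlt.create_streaming_table here are placeholders for what the real task does):

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

spark = SparkSession.getActiveSession()  # spark is already available in a DLT notebook

# Placeholder schema -- in the real task this is captured programmatically.
schema = StructType([
    StructField("loan_id", StringType()),
    StructField("loan_amt", DecimalType(10, 2)),
    StructField("cust_id", StringType()),
])

table_path = "catalog.schema.target_table"  # placeholder fully qualified name
table_exists = spark.catalog.tableExists(table_path)
print(f"target exists: {table_exists}")  # in the real task this drives logging/branching

# Declaring the streaming table covers both branches: if it already exists the
# next pipeline update refreshes it (and applies schema changes), and if it
# does not exist it is created empty with the captured schema.
dlt.create_streaming_table(
    name="target_table",
    schema=schema,
)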
Now similarly for Job B: it has 4 tasks,
and the 1st task does some things and then performs the same steps as the 3rd task of Job A, i.e. creating an empty streaming table or refreshing the existing table with schema evolution.
The error I was getting at first was:
A table can only be owned by one pipeline. Concurrent pipeline operations such as maintenance and full refresh will conflict with each other.
After doing some research in the Databricks docs, I found that append_flow can be used to handle append operations on the same table from different streaming sources. My situation is a little different, but I tried it anyway.
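For reference, the documented pattern I was trying to adapt looks roughly like this (a sketch based on that doc; the table and source names are made up):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()  # spark is already available in a DLT notebook

# Target streaming table, declared once; the flows below append into it.
dlt.create_streaming_table(name="combined_events")

# Each append_flow reads a *different* streaming source and appends into the
# same target table.
@dlt.append_flow(target="combined_events")
def events_from_source_a():
    return spark.readStream.table("catalog.schema.events_source_a")

@dlt.append_flow(target="combined_events")
def events_from_source_b():
    return spark.readStream.table("catalog.schema.events_source_b")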
So in Job B task 1, if the table exists, instead of refreshing it (which causes the error), I used append_flow: I read the same table with readStream and returned it, but instead of appending the data to that target table, it created a new view.
# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming"
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")
# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        source_data = spark.readStream.table(table_path)
        return source_data
I tried to find the reason behind this and learned that the sources need to be different; but since I am reading from the same table and returning that same data into the target, there is no different source, so that won't be possible. However, even when I tried with a different source (I created a new dataframe and returned it), it still creates a view instead of appending data to the streaming table:
# If the table exists and is not empty, use it as a streaming source
if table_exists and not table_is_empty:
    dataframe = spark.readStream.table(table_path)
    create_dlt_table(
        dataframe=dataframe,
        table_name=target_table_name,
        schema=schema,
        table_type="streaming"
    )
    log_event(logger, f"Streaming table {target_table_name} initialized with existing data.", "INFO")
# If the table doesn't exist or is empty, create an empty streaming table
else:
    @dlt.append_flow(target=target_table_name)
    def append_to_target_table():
        initial_data = [("loan_1", Decimal("200.02"), "cust_1", "cust_nm", "unknown")]
        source_data = spark.createDataFrame(initial_data, schema)
        return source_data
Is there any way I can handle this scenario using append_flow or anything else?
I have to do it in DLT pipelines. I tried merge as well, but that didn't work.
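For reference, the only merge-style API I'm aware of in DLT is dlt.apply_changes; below is a minimal sketch of that pattern with placeholder table, key, and sequence columns, in case it's relevant.

import dlt
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.getActiveSession()  # spark is already available in a DLT notebook

# Source view with the incoming changes (placeholder source table).
@dlt.view
def target_table_updates():
    return spark.readStream.table("catalog.schema.updates_source")

# Target streaming table that apply_changes maintains.
dlt.create_streaming_table(name="target_table")

# Merge-style upsert: rows are matched on the key and applied in sequence order.
dlt.apply_changes(
    target="target_table",
    source="target_table_updates",
    keys=["loan_id"],                 # placeholder key column
    sequence_by=F.col("updated_at"),  # placeholder ordering column
)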
Later in this same Job B I will have task 2, where I will be ingesting data into this same target table from Auto Loader,
and I am using append_flow there as well. I think it might work there because we have a completely different source, but I still don't know.
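That task follows the standard Auto Loader append_flow shape; here is a sketch with a placeholder format and path (not my exact code):

import dlt
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()  # spark is already available in a DLT notebook

# The target streaming table must be declared somewhere in the pipeline.
dlt.create_streaming_table(name="target_table")

# Auto Loader source appended into the same target table through its own flow.
@dlt.append_flow(target="target_table")
def ingest_from_autoloader():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")       # placeholder format
        .load("/Volumes/catalog/schema/landing/")  # placeholder path
    )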
I also see this Databricks doc on backfilling, https://docs.databricks.com/en/delta-live-tables/flows.html#example-run-a-one-time-data-backfill - it is not the same scenario, but maybe I am missing something.