cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Question: Delta Live Table, multiple streaming sources to the single target

bzh
New Contributor

We are trying to writing multiple sources to the same target table using DLT, but getting the below errors. 

Not sure what we are missing here in the code....

File /databricks/spark/python/dlt/api.py:817, in apply_changes(target, source, keys, sequence_by, where, ignore_null_updates, apply_as_deletes, apply_as_truncates, column_list, except_column_list, stored_as_scd_type, track_history_column_list, track_history_except_column_list, flow_name) 814 raise RuntimeError("Only SCD Type 1 and SCD Type 2 are supported for now.")

 

dlt.create_streaming_table(name='unified_events_test_11')

dlt.apply_changes(
target="unified_events_test_11",
source="unified_events_pv_raw",
keys=["event_id"],
sequence_by=F.col("cdcTimestamp"),
apply_as_deletes=F.expr("operation = 'D'"),
except_column_list=["operation", "cdcTimestamp"],
stored_as_scd_type=1
)
 
dlt.apply_changes(
target="unified_events_test_11",
source="unified_events_wc_raw",
keys=["event_id"],
sequence_by=F.col("cdcTimestamp"),
apply_as_deletes=F.expr("operation = 'D'"),
except_column_list=["operation", "cdcTimestamp"],
stored_as_scd_type=1
)
 
Note: unified_events_pv_raw and unified_events_wc_raw are streaming table
 
@dlt.table(
name="unified_events_wc_raw"
)
def unified_events_wc_raw():
df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("sep", "||||")
.schema(wallet_connect_schema)
.load("dbfs:/FileStore/wallet_connect_events")
)
3 REPLIES 3

JunYang
Databricks Employee
Databricks Employee

Hi bzh, I understand that you are writing multiple sources to the same target table on DLT and you are following the SCD type 1 process as documented (https://docs.databricks.com/en/delta-live-tables/cdc.html#process-scd-type-1-updates). However, the way how you declared two dlt.apply_changes from two different sources triggered this error. Firstly, you can narrow down the issue to troubleshoot by removing the 1st or the 2nd apply_changes declaration, so that you can test it with only one declaration and see if the same error persists. If that doesn't throw you an error, then try to change your code to declare one apply_changes function with two sources. The apply_changes function allows you to specify multiple sources by providing a list of tables or views as the source parameter. Please refer to this sample code:

import dlt

@dlt.table
def source1():
   return dlt.read("table1")

@dlt.table
def source2():
   return dlt.read("table2")

@dlt.table
def target():
   return dlt.apply_changes(
       target="target_table",
       source=["source1", "source2"],
       keys=["key_column"]
   )

Hope this helps, and happy coding on DLT, bzh!

jose_gonzalez
Databricks Employee
Databricks Employee

@bzh 
Just a friendly follow-up. Did any of the responses help you to resolve your question? if it did, please mark it as best. Otherwise, please let us know if you still need help.

nag_kanchan
New Contributor III

The solution did not work for me. It was throwing an error stating: raise Py4JError( py4j.protocol.Py4JError: An error occurred while calling o434.readStream. Trace: py4j.Py4JException: Method readStream([class java. util.ArrayList]) does not exist.

An alternate solution which worked for me was:

import dlt

@dlt.table
def final_table():
table1 = spark.table(
"table_1")
table2 = spark.table(
"table_2")
return table1.union(table2)

@dlt.table
def target():
   return dlt.apply_changes(
       target="target_table",
       source="final_table",
       keys=["key_column"],
sequence_by="abc",
stored_as_scd_type="1"
   )

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group