Question: Delta Live Tables, multiple streaming sources to a single target

bzh
New Contributor

We are trying to write multiple sources to the same target table using DLT, but are getting the error below.

Not sure what we are missing here in the code.

File /databricks/spark/python/dlt/api.py:817, in apply_changes(target, source, keys, sequence_by, where, ignore_null_updates, apply_as_deletes, apply_as_truncates, column_list, except_column_list, stored_as_scd_type, track_history_column_list, track_history_except_column_list, flow_name)
    814 raise RuntimeError("Only SCD Type 1 and SCD Type 2 are supported for now.")

 

import dlt
from pyspark.sql import functions as F

dlt.create_streaming_table(name='unified_events_test_11')

# First CDC flow into the target
dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_pv_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
    stored_as_scd_type=1
)

# Second CDC flow into the same target
dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_wc_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
    stored_as_scd_type=1
)
 
Note: unified_events_pv_raw and unified_events_wc_raw are streaming tables.
 
@dlt.table(
    name="unified_events_wc_raw"
)
def unified_events_wc_raw():
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("sep", "||||")
        .schema(wallet_connect_schema)
        .load("dbfs:/FileStore/wallet_connect_events")
    )
    return df
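For context, the other source is presumably declared the same way; the sketch below is only an illustration, and the schema variable (page_view_schema) and load path (dbfs:/FileStore/page_view_events) are placeholders, not code from the original pipeline:

@dlt.table(
    name="unified_events_pv_raw"
)
def unified_events_pv_raw():
    # Placeholder path and schema; mirrors the wallet_connect declaration above.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("sep", "||||")
        .schema(page_view_schema)
        .load("dbfs:/FileStore/page_view_events")
    )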
3 REPLIES

JunYang
New Contributor III

Hi bzh, I understand that you are writing multiple sources to the same target table in DLT and are following the SCD type 1 process as documented (https://docs.databricks.com/en/delta-live-tables/cdc.html#process-scd-type-1-updates). However, the way you declared two dlt.apply_changes calls from two different sources triggered this error.

First, you can narrow the issue down by removing either the first or the second apply_changes declaration, so that you can test with only one declaration and see whether the same error persists. If that doesn't throw an error, then try changing your code to declare one apply_changes function with two sources. The apply_changes function allows you to specify multiple sources by providing a list of tables or views as the source parameter. Please refer to this sample code:

import dlt

@dlt.table
def source1():
    return dlt.read("table1")

@dlt.table
def source2():
    return dlt.read("table2")

@dlt.table
def target():
    return dlt.apply_changes(
        target="target_table",
        source=["source1", "source2"],
        keys=["key_column"]
    )
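For the first troubleshooting step (testing with only one declaration), a minimal sketch is simply the original pipeline with one of the two apply_changes calls removed:

import dlt
from pyspark.sql import functions as F

dlt.create_streaming_table(name="unified_events_test_11")

# Keep only the first CDC flow; if this runs cleanly, the error is tied to
# declaring a second apply_changes against the same target.
dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_pv_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
    stored_as_scd_type=1
)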

Hope this helps, and happy coding on DLT, bzh!

jose_gonzalez
Moderator

@bzh 
Just a friendly follow-up. Did any of the responses help you to resolve your question? If so, please mark it as the best answer. Otherwise, please let us know if you still need help.

nag_kanchan
New Contributor III

The solution did not work for me. It threw an error: py4j.protocol.Py4JError: An error occurred while calling o434.readStream. Trace: py4j.Py4JException: Method readStream([class java.util.ArrayList]) does not exist.

An alternate solution that worked for me was:

import dlt

@dlt.table
def final_table():
    table1 = spark.table("table_1")
    table2 = spark.table("table_2")
    return table1.union(table2)

@dlt.table
def target():
    return dlt.apply_changes(
        target="target_table",
        source="final_table",
        keys=["key_column"],
        sequence_by="abc",
        stored_as_scd_type="1"
    )
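Applied to the original pipeline, the same pattern would look roughly like the sketch below, which combines the question's create_streaming_table/apply_changes code with the union staging table above. The intermediate table name unified_events_combined_raw is made up here, and the sketch assumes both raw tables expose the same columns (unionByName is used to guard against column-order differences):

import dlt
from pyspark.sql import functions as F

# Staging table that unions both raw streams into a single CDC source.
@dlt.table(name="unified_events_combined_raw")
def unified_events_combined_raw():
    pv = dlt.read_stream("unified_events_pv_raw")
    wc = dlt.read_stream("unified_events_wc_raw")
    return pv.unionByName(wc)

dlt.create_streaming_table(name="unified_events_test_11")

# One apply_changes call, fed by the combined source.
dlt.apply_changes(
    target="unified_events_test_11",
    source="unified_events_combined_raw",
    keys=["event_id"],
    sequence_by=F.col("cdcTimestamp"),
    apply_as_deletes=F.expr("operation = 'D'"),
    except_column_list=["operation", "cdcTimestamp"],
    stored_as_scd_type=1
)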

 
