Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

unionByName several streaming DataFrames from different sources

cdn_yyz_yul
Contributor

Is the following type of union safe with Spark Structured Streaming?

Unioning multiple streaming DataFrames, each from a different source.

Is there a better solution?

For example:

df1 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table1")
df2 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table2")
df3 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table3")

df1a = df1.select(....).transform(....)
df2a = df2.select(....).transform(....)
df3a = df3.select(....).transform(....)

df = df1a.unionByName(df2a).unionByName(df3a).dropDuplicates(.....)

df.writeStream.format("delta").outputMode("append").option(
    "checkpointLocation", my_checkpoint_path
).trigger(availableNow=True).table(f"{silver_catalog}.{silver_schema}.my_silver_table")

 

3 REPLIES

stbjelcevic
Databricks Employee

Hi @cdn_yyz_yul ,

Yes, this looks like a standard and safe approach - is there something in particular you were hoping to improve on? Or were you mainly looking for validation?

cdn_yyz_yul
Contributor

Thanks @stbjelcevic ,

I am looking for a solution .... 

=== Let's say I already have:

df1 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table1")
df2 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table2")

df1a = df1.select(....).transform(....)
df2a = df2.select(....).transform(....)

df = df1a.unionByName(df2a)

df.writeStream.format("delta").outputMode("append").option(
    "checkpointLocation", my_checkpoint_path
).trigger(availableNow=True).table(f"{silver_catalog}.{silver_schema}.my_silver_table")

=== Now, table3 is created by Auto Loader, and I read it as df3.
After some transformation, df3a has the same columns as the existing f"{silver_catalog}.{silver_schema}.my_silver_table".

df3 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table3")
df3a = df3.select(....).transform(....)

What would be the recommended way to append the content of df3a to the existing f"{silver_catalog}.{silver_schema}.my_silver_table"?

stbjelcevic
Databricks Employee

Ok, I see - so with the updated scenario above, the easiest option would be to add a second streaming writer for df3a to the same existing f"{silver_catalog}.{silver_schema}.my_silver_table". I would only stick with this easiest option if the table is append-only and you are not trying to handle duplicates here.
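
A minimal sketch of that option, assuming df3a already matches the silver table's schema; the checkpoint path below is a placeholder and must be different from the checkpoint used by your existing union query:

# Hypothetical second writer: appends df3a to the same silver table.
# Assumes df3a's schema matches the table; my_checkpoint_path_df3a is a
# placeholder and must be a separate checkpoint from the existing query's.
(
    df3a.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", my_checkpoint_path_df3a)
        .trigger(availableNow=True)
        .table(f"{silver_catalog}.{silver_schema}.my_silver_table")
)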

If this could introduce duplicates, then the approach you posted first is more recommended, where you handle the duplicates by unioning all 3 DataFrames. One thing I didn't consider in my first response is that you can use "withWatermark" + "dropDuplicatesWithinWatermark" instead of just dropDuplicates.
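
For reference, a rough sketch of that variant, assuming the unioned stream has a timestamp column and a key column to deduplicate on; "event_ts", "event_id", and the 10-minute watermark are placeholders, and dropDuplicatesWithinWatermark requires Spark 3.5+:

# Hypothetical sketch: union all three streams, then deduplicate within a watermark.
# "event_ts" (timestamp column), "event_id" (dedup key), and the 10-minute delay
# are placeholders; adjust them to your actual columns and expected lateness.
df = (
    df1a.unionByName(df2a)
        .unionByName(df3a)
        .withWatermark("event_ts", "10 minutes")
        .dropDuplicatesWithinWatermark(["event_id"])
)

(
    df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", my_checkpoint_path)
        .trigger(availableNow=True)
        .table(f"{silver_catalog}.{silver_schema}.my_silver_table")
)

Unlike plain dropDuplicates, the watermarked version lets Spark expire old deduplication state instead of keeping it forever.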