
How to readStream from multiple sources?

dbuser1234
New Contributor

Hi 

I am trying to readStream from two sources and join them into a target table. How can I do this in PySpark?

For example:

t1 and t2 are my bronze tables. I want to readStream from t1 and t2 and merge the changes into t3 (my silver table).

1 ACCEPTED SOLUTION


Kaniz
Community Manager

Hi @dbuser1234, certainly! To read streaming data from two sources, join them, and merge the changes into a target table in PySpark, you can follow these steps:

 

Read Stream Data from Sources (t1 and t2):

  • Use spark.readStream to read data from both t1 and t2. Make sure you specify the appropriate format (e.g., Delta or Parquet) and any other relevant options (schema, path, and so on).
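A minimal sketch of this step, assuming t1 and t2 are Delta tables registered in the metastore (adapt the names and formats to your setup):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read each bronze source as a stream; t1 and t2 are assumed Delta tables.
t1_df = spark.readStream.table("t1")
t2_df = spark.readStream.table("t2")
```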

Join the DataFrames:

  • Join the DataFrames t1 and t2 based on the desired join conditions. You can use various join types (inner, outer, left, right) depending on your requirements.
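For example, a stream-stream inner join on a shared key (the id and event_time column names here are assumptions; watermarks on both sides let Spark bound the join state):

```python
# Watermark each stream so old join state can be dropped; adjust the
# column name and delay to your data.
t1_wm = t1_df.withWatermark("event_time", "10 minutes")
t2_wm = t2_df.withWatermark("event_time", "10 minutes")

joined_df = t1_wm.join(t2_wm, on="id", how="inner")
```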

Write Stream Data to the Target Table (t3):

  • Use writeStream to write the joined DataFrame (joined_df) to your target table (t3). Specify the output mode (e.g., append, complete, update) based on your use case.
  • If you want to merge changes (upsert) into the target table, use a custom function with foreachBatch that runs a Delta MERGE on each micro-batch, as sketched below.
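A sketch of the foreachBatch upsert, assuming t3 already exists as a Delta table and id is the merge key (both are assumptions to adapt):

```python
from delta.tables import DeltaTable

def upsert_to_t3(batch_df, batch_id):
    # MERGE each micro-batch into the silver table:
    # update rows that match on the key, insert the rest.
    target = DeltaTable.forName(spark, "t3")
    (target.alias("t")
           .merge(batch_df.alias("s"), "t.id = s.id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

query = (joined_df.writeStream
         .foreachBatch(upsert_to_t3)
         .option("checkpointLocation", "/tmp/checkpoints/t3")  # placeholder path
         .start())
```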

Run the Stream Job:

  • Start the stream with start(), then block on it by invoking awaitTermination() on the query.
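For example:

```python
# Block the driver until the streaming query stops or fails.
query.awaitTermination()
```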

Remember to adjust the table names, column names, join conditions, and other specifics to your actual use case. The sketches above demonstrate a basic approach to achieving your goal.

 

Good luck with your streaming workflow! 🚀


