Issue with merge command between streaming dataframe and delta table

Venu_DE1
New Contributor

Hi,

We are trying to build upsert logic for a Delta table. For that, we are writing a MERGE command between a streaming DataFrame and a Delta table DataFrame. Please find the code below:

    merge_sql = f"""
        -- Merge command goes here
    """

    spark.sql(merge_sql).writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "bbbbb") \
        .option("mergeSchema", "true") \
        .option("path", "aaaaaaa") \
        .toTable(f"{target_catalog}.{target_schema}.silver_{sl_tablename}")
 
 
We are getting the error below. Please help us understand how to resolve it.

    Error during merge operation on 'tablename': Queries with streaming sources must be executed with writeStream.start();
 
1 REPLY

Kaniz
Community Manager

Hi @Venu_DE1, The error message you’re encountering indicates that you’re trying to execute a query with streaming sources, but you’re missing the necessary .start() method for your streaming DataFrame.

 

Let’s address this issue step by step:

 

Streaming DataFrames and .writeStream:

  • When working with streaming DataFrames, you need to use .writeStream instead of .write.
  • The .writeStream method is specifically designed for streaming data and allows you to define the output sink (e.g., Delta table, file, etc.).

Using .writeStream.start():

  • To start the streaming query, you should call .start() after defining your write stream.
  • This method initiates the streaming process and ensures that the data is processed continuously (see the minimal sketch right after this list).
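
For illustration only, here is a minimal, generic sketch of the write vs. writeStream pattern. It assumes an active SparkSession named spark (as in a Databricks notebook); the built-in rate source, output path, and checkpoint location are placeholders, not values from your job:

    # Illustrative sketch only: a generic streaming write, unrelated to the merge logic in question.
    # The "rate" source simply generates test rows; both paths below are placeholders.
    stream_df = (
        spark.readStream
            .format("rate")
            .option("rowsPerSecond", 5)
            .load()
    )

    query = (
        stream_df.writeStream                                         # .writeStream, not .write, for a streaming DataFrame
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", "/tmp/example_checkpoint")  # placeholder checkpoint path
            .start("/tmp/example_delta_path")                         # .start() launches the streaming query
    )

    query.awaitTermination()  # block until the query is stopped or fails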

Updating Your Code:

  • Modify your code as follows to address the error:

    merge_sql = f"""
        -- Your merge command here
    """

    delta_stream = spark.sql(merge_sql).writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "bbbbb") \
        .option("mergeSchema", "true") \
        .option("path", "aaaaaaa") \
        .start()  # Add .start() to initiate the streaming process

    delta_stream.awaitTermination()  # Wait for the streaming query to finish

Explanation:

  • I’ve replaced the .toTable() method with .start() to correctly start the streaming process.
  • The .awaitTermination() call ensures that the query runs until manually stopped.

Remember to replace the placeholders ("bbbbb", "aaaaaaa", etc.) with the actual paths and table names specific to your use case.

 

Once you make these changes, your streaming merge operation should work as expected.
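
As a side note, and purely as an assumption about the use case rather than part of the fix above: when the MERGE itself has to consume a streaming DataFrame, a pattern commonly used with Delta Lake is foreachBatch, which runs the merge once per micro-batch. A minimal sketch follows; the source table, target table, join key (id), and checkpoint path are all placeholders, and spark is assumed to be an active SparkSession as in a Databricks notebook:

    from delta.tables import DeltaTable

    def upsert_to_silver(microbatch_df, batch_id):
        # Placeholder target table and join key; adjust both to the real schema.
        target = DeltaTable.forName(spark, "catalog.schema.silver_example")
        (
            target.alias("t")
                .merge(microbatch_df.alias("s"), "t.id = s.id")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
        )

    # Placeholder source: any streaming DataFrame, e.g. a bronze Delta table read as a stream.
    bronze_stream = spark.readStream.table("catalog.schema.bronze_example")

    (
        bronze_stream.writeStream
            .foreachBatch(upsert_to_silver)                           # run the merge on each micro-batch
            .option("checkpointLocation", "/tmp/example_checkpoint")  # placeholder checkpoint path
            .outputMode("update")
            .start()
    )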

 

If you encounter any further issues, feel free to ask! 😊
