
Queries with streaming sources must be executed with writeStream.start();

Bency
New Contributor III

When I try to perform some transformations on streaming data, I get the error "Queries with streaming sources must be executed with writeStream.start();".

My aim is to do a lookup for every column in each row of the streaming data.

streaming_table = spark.readStream.format("delta").table("tableName")

df = streaming_table.transform(decode_func("a config dic", "query to get the lookup value from another table"))

Here decode_func is a function that contains my custom logic.

1 REPLY

Noopur_Nigam
Valued Contributor II

Hi @Bency Mathew, you can use foreachBatch to perform the custom logic on each micro-batch. Please refer to the document below:

https://docs.databricks.com/structured-streaming/foreach.html#perform-streaming-writes-to-arbitrary-...
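
For illustration, here is a minimal sketch of the foreachBatch approach. This error usually appears when a batch-only operation (for example collect() or show()) is called on a streaming DataFrame, which may be what happens inside decode_func. The sketch assumes the lookup can be expressed as a left join against a lookup table; the table names, the key column, and the checkpoint path are hypothetical placeholders, not details from the original post.

```python
# Sketch only: table names, the join key, and the checkpoint path are
# hypothetical placeholders, not taken from the original post.

def make_batch_decoder(spark, lookup_table, key_column, target_table):
    """Build a foreachBatch handler that decodes one micro-batch at a time."""

    def decode_batch(batch_df, batch_id):
        # Inside foreachBatch, batch_df is a regular (non-streaming) DataFrame,
        # so batch-only operations are allowed here.
        lookup_df = spark.read.table(lookup_table)
        # Assumption: the lookup is a left join on a single key column.
        decoded = batch_df.join(lookup_df, on=key_column, how="left")
        decoded.write.format("delta").mode("append").saveAsTable(target_table)

    return decode_batch


def start_decode_stream(spark, source_table, lookup_table, key_column,
                        target_table, checkpoint_path):
    """Define and start the streaming query; nothing runs before start()."""
    streaming_table = spark.readStream.format("delta").table(source_table)
    handler = make_batch_decoder(spark, lookup_table, key_column, target_table)
    return (
        streaming_table.writeStream
        .foreachBatch(handler)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

In a notebook this could be started with something like start_decode_stream(spark, "tableName", "lookup_table", "code", "decoded_table", "/tmp/checkpoints/decode"), where every argument except "tableName" is made up for the example. The custom logic from decode_func would go inside decode_batch, where it receives a plain DataFrame for each micro-batch.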
