
Queries with streaming sources must be executed with writeStream.start();

Bency
New Contributor III

When I try to apply some transformations to streaming data, I get the error "Queries with streaming sources must be executed with writeStream.start();".

My aim is to do a lookup for every column in each row of the streaming data.

streaming_table = spark.readStream.format("delta").table("tableName")

df = streaming_table.transform(decode_func("a config dic", "query to get the lookup value from another table"))

   

Here decode_func is a function that contains my custom logic.

1 REPLY

Noopur_Nigam
Valued Contributor II

Hi @Bency Mathew, you can use foreachBatch to perform the custom logic on each micro-batch. Please refer to the document below:

https://docs.databricks.com/structured-streaming/foreach.html#perform-streaming-writes-to-arbitrary-...
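For context, this error usually appears when a batch-only operation (for example collect(), show(), or count()) is called on a streaming DataFrame, which is probably what happens somewhere inside decode_func. Below is a minimal sketch of the foreachBatch approach, not a drop-in fix. The names lookup_table, decoded_table, the join column "code", and the checkpoint path are all hypothetical placeholders, and the simple left join stands in for whatever decode_func actually does.

```python
# Sketch only: table names, the join column, and the checkpoint path are
# hypothetical placeholders, not taken from the original question.

def decode_batch(batch_df, batch_id, lookup_df, key_col="code"):
    """Run the lookup on one micro-batch.

    Inside foreachBatch, batch_df is a regular (non-streaming) DataFrame,
    so joins, actions, and other batch-only operations are allowed here.
    """
    # Left join keeps every incoming row, even when no lookup value exists.
    decoded = batch_df.join(lookup_df, on=key_col, how="left")
    decoded.write.format("delta").mode("append").saveAsTable("decoded_table")


def start_decode_stream(spark):
    """Wire the batch function into a streaming query and start it."""
    # Static lookup table, read once as a normal batch DataFrame (assumed name).
    lookup_df = spark.read.table("lookup_table")
    streaming_df = spark.readStream.format("delta").table("tableName")
    return (
        streaming_df.writeStream
        .foreachBatch(lambda df, batch_id: decode_batch(df, batch_id, lookup_df))
        .option("checkpointLocation", "/tmp/checkpoints/decode")  # assumed path
        .start()
    )
```

In a notebook, you would call start_decode_stream(spark) and keep the returned query handle so you can stop it later. The important part is that the query is started through writeStream.start(), and all per-row lookup logic runs inside the function passed to foreachBatch.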
