Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Notebook Error - Queries with streaming sources must be executed with writeStream.start();

JJ_LVS1
New Contributor III

I'm trying to parse incoming stream files in DLT which have variable length records. I'm getting the error:

Queries with streaming sources must be executed with writeStream.start();

Notebook code

import dlt
from pyspark.sql import functions as F

@dlt.table(
    comment="xAudit Parsed"
)
def b_table_parsed():
    df = dlt.readStream("dlt_able_raw_view")

    # Add one column per element of the widest split_col array
    for i in range(df.select(F.max(F.size("split_col"))).collect()[0][0]):
        df = df.withColumn("col" + str(i), df["split_col"][i])

    df = df.drop("value", "split_col")

    return df

This all works fine on an interactive cluster against the actual source text files or a Delta table, but when I put it in DLT and the source is files streamed from Auto Loader, it fails. I assume it's stream related.

I saw a different post that suggested foreach, but that used writeStream, and I'm not sure whether I can use it to return a DataFrame from a DLT table. I'm very new to Python, streaming, and DLT, so I would appreciate a detailed solution.
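For context, the error message points at the `collect()` call in the loop: `collect()` is a batch action, and Spark does not allow it on a streaming DataFrame. One possible way to avoid it, sketched below, is to give up the data-dependent column count and expand to a fixed upper bound instead. The bound `MAX_COLS`, the helper names, and the assumption that ANSI mode is off (so an out-of-range array index yields NULL rather than an error) are all illustrative and not from the original post.

```python
MAX_COLS = 50  # hypothetical upper bound on fields per record; adjust to the data


def column_names(width, prefix="col"):
    """Return positional column names: col0, col1, ..., col{width-1}."""
    return [f"{prefix}{i}" for i in range(width)]


def expand_fixed_width(df, max_cols=MAX_COLS):
    """Expand the array column `split_col` into a fixed number of columns.

    No action (collect, count, ...) is called, so this also works on a
    streaming DataFrame. Rows with fewer than `max_cols` elements get NULL
    in the extra columns (assuming ANSI mode is off).
    """
    # Imported lazily so column_names() can be used without Spark installed.
    from pyspark.sql import functions as F

    cols = [
        F.col("split_col")[i].alias(name)
        for i, name in enumerate(column_names(max_cols))
    ]
    return df.select("*", *cols).drop("value", "split_col")


# Possible use inside a DLT pipeline (not executed here, because the `dlt`
# module only exists inside a pipeline):
#
# @dlt.table(comment="xAudit Parsed")
# def b_table_parsed():
#     return expand_fixed_width(dlt.readStream("dlt_able_raw_view"))
```

The trade-off is a fixed schema: records wider than the bound lose their trailing fields, so the bound has to be chosen generously.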

1 ACCEPTED SOLUTION


Murthy1
Contributor II

Hello @Jason Johnson,

It seems you want to apply transformations to the incoming micro-batches. In that case, I guess you cannot achieve it through Delta Live Tables. You have to use foreachBatch together with writeStream to achieve this.
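A minimal sketch of what that suggestion could look like, run as a regular Structured Streaming job outside DLT. Inside `foreachBatch` each micro-batch arrives as a static DataFrame, so the `collect()` from the question is allowed there. The target table name, the checkpoint path parameter, the helper names, and the use of Delta's `mergeSchema` option to absorb a varying column count are assumptions for illustration, not part of the original answer.

```python
def column_names(width, prefix="col"):
    """Return positional column names: col0, col1, ..., col{width-1}."""
    return [f"{prefix}{i}" for i in range(width)]


def explode_split_col(batch_df):
    """Expand the array column `split_col` into col0..colN for one micro-batch."""
    # Imported lazily so column_names() can be used without Spark installed.
    from pyspark.sql import functions as F

    # batch_df is a static DataFrame here, so an action like collect() is fine.
    # An empty batch gives None, which is treated as zero columns.
    width = batch_df.select(F.max(F.size("split_col"))).collect()[0][0] or 0

    cols = [
        F.col("split_col")[i].alias(name)
        for i, name in enumerate(column_names(width))
    ]
    return batch_df.select("*", *cols).drop("value", "split_col")


def write_batch(batch_df, batch_id, target_table="b_table_parsed"):
    """foreachBatch callback: parse one micro-batch and append it to a Delta table.

    `target_table` is a hypothetical name. mergeSchema lets later batches
    add columns when they contain wider records.
    """
    (
        explode_split_col(batch_df)
        .write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )


def start_stream(raw_stream_df, checkpoint_path):
    """Start the streaming query; `checkpoint_path` is supplied by the caller."""
    return (
        raw_stream_df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Here `raw_stream_df` would be the streaming DataFrame that already contains `split_col`, for example the Auto Loader read followed by the split step.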


2 REPLIES


Anonymous
Not applicable

Hi @Jason Johnson,

Hope all is well! Just checking in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
