I'm trying to parse incoming streamed files in DLT that contain variable-length records. I'm getting the error:
Queries with streaming sources must be executed with writeStream.start();
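Notebook code:
import dlt
from pyspark.sql import functions as F

@dlt.table(
    comment="xAudit Parsed"
)
def b_table_parsed():
    df = dlt.readStream("dlt_able_raw_view")
    # Find the largest number of fields in any record. collect() is an action,
    # and actions are what fail when df is a streaming DataFrame.
    for i in range(df.select(F.max(F.size("split_col"))).collect()[0][0]):
        # Add one column per field: col0, col1, ...
        df = df.withColumn("col" + str(i), df["split_col"][i])
    df = df.drop("value", "split_col")
    return df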
This all works fine on the interactive cluster, whether the source is the actual text files or a Delta table. But when I put it in DLT and the source is files streamed in by Auto Loader, it fails with the error above. I assume it's related to streaming.
I saw a different post suggesting foreach, but that used writeStream, and I'm not sure whether I can use it to return a DLT table. I'm very new to Python, streaming, and DLT, so I'd appreciate a detailed solution if anyone has one.