Hi guys!
I am having an issue passing a streaming flow between layers of my DLT pipeline.
The first layer, "ETD_Bz", passes through fine, but "ETD_Flattened_Bz" then fails with "pyspark.errors.exceptions.captured.AnalysisException: Queries with streaming sources must be executed with writeStream.start();".
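From what I can tell, the exception isn't really about the DLT layering itself but about touching .rdd on a streaming DataFrame at all; I believe the same error shows up in a plain notebook with something like this (just a sketch of my understanding, not verified inside DLT):

# I think this minimal snippet raises the same AnalysisException, because
# accessing .rdd forces execution of a plan that has a streaming source:
stream_df = (spark.readStream
    .format("delta")
    .table("default.tbl_raw_etd_data")
)
# expected to fail with: Queries with streaming sources must be executed with writeStream.start()
json_strings = stream_df.rdd.map(lambda row: row.JsonString)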
Code:
import dlt
from pyspark.sql.functions import col, from_json

@dlt.table(
    name="ETD_Bz",
    temporary=False)
def Bronze():
    return (spark.readStream
        .format("delta")
        .option("skipChangeCommits", "true")
        .table("default.tbl_raw_etd_data")
    )
# function that flattens the JSON payload
def process_raw_data(raw_tbl_name):
    df = (spark.readStream  # <- starts working once I change readStream to read, but then it obviously stops processing incrementally
        .format("delta")
        .option("mergeSchema", "true")
        .table(raw_tbl_name)
    )
    json_schema = spark.read.json(df.rdd.map(lambda row: row.JsonString)).schema  # <- fails here
    kafka_df = df.withColumn("JsonStruct", from_json(col("JsonString"), json_schema))
    fj = FlattenJson()  # my own helper class for flattening nested JSON columns
    kafka_flattened_json = fj.flatten_json(kafka_df)
    return kafka_flattened_json
# failing layer
@dlt.table(
    name="ETD_Flattened_Bz",
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
    temporary=False)
def Bronze_Flattend():
    return process_raw_data("live.ETD_Bz")
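In case it helps frame the question: the only direction I could think of so far is to infer the JSON schema from a one-off batch read and keep readStream purely for the actual data flow, roughly as sketched below. This is untested, and I don't even know whether a batch spark.read.table of "live.ETD_Bz" is allowed inside a DLT pipeline, so I'd much rather hear what the proper pattern is:

# Untested sketch: infer the schema from a static batch read so .rdd is never
# called on a streaming DataFrame, then apply that schema to the streaming read.
def process_raw_data_static_schema(raw_tbl_name):
    static_df = spark.read.table(raw_tbl_name)  # batch read, only used for schema inference
    json_schema = spark.read.json(static_df.rdd.map(lambda row: row.JsonString)).schema

    df = (spark.readStream
        .format("delta")
        .option("mergeSchema", "true")
        .table(raw_tbl_name)
    )
    return df.withColumn("JsonStruct", from_json(col("JsonString"), json_schema))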
Any help appreciated! Thank you very much in advance.