09-07-2023 06:45 PM
I tried to implement a pivot inside a @dlt.view:
import dlt
from pyspark.sql.functions import explode, max

@dlt.view(
    name="silver_dynquestion_vw"
)
def silver_dynquestion_vw():
    df = dlt.read_stream("bronze_events_py")
    df = (
        df
        .select(
            explode("dynamicQuestions.answers").alias("answers")
            , "answers.question.key"
            , "answers.answer.value"
            , "eventCreated"
        )
    )
    df = (
        df
        .groupBy(
            "eventCreated"
        )
        .pivot("key")
        .agg(max("value"))
        .select("*")
    )
    return df

But it is giving me this error:
pyspark.errors.exceptions.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
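From what I can tell, calling pivot("key") without an explicit list of values makes Spark run an eager query to collect the distinct keys, and that is not allowed on a streaming DataFrame. One workaround I could sketch, assuming the set of question keys is known ahead of time, is to replace the pivot with one conditional aggregation per key. The key names and the helper name below are hypothetical, and I have not verified that this runs inside a DLT pipeline:

```python
# Hypothetical list of question keys; replace with the real keys in the data.
QUESTION_KEYS = ["favorite_color", "team_size"]


def pivot_known_keys(df, keys=QUESTION_KEYS):
    """Emulate groupBy("eventCreated").pivot("key").agg(max("value"))
    for a fixed list of keys, without calling pivot().

    `df` is expected to have the columns eventCreated, key and value.
    """
    # Imported here so the module loads even where pyspark is not installed.
    from pyspark.sql import functions as F

    # One conditional max per key: rows with a different key contribute NULL,
    # which max() ignores, so each output column holds that key's value.
    aggregations = [
        F.max(F.when(F.col("key") == key, F.col("value"))).alias(key)
        for key in keys
    ]
    return df.groupBy("eventCreated").agg(*aggregations)


# Inside the DLT view, the pivot step would then become (untested):
#     return pivot_known_keys(df)
```

This avoids the eager distinct-value lookup, but it is still a streaming aggregation, so it may need a watermark or a non-streaming read depending on how the pipeline is set up.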
What is a better way to do this?
Thanks again!