So I tried to implement a pivot inside a @dlt.view:

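import dlt
from pyspark.sql import functions as F


@dlt.view(name="silver_dynquestion_vw")
def silver_dynquestion_vw():
    # Streaming read of the bronze table
    df = dlt.read_stream("bronze_events_py")

    # Explode the answers array and pull out the question key and answer value
    df = df.select(
        F.explode("dynamicQuestions.answers").alias("answers"),
        "answers.question.key",
        "answers.answer.value",
        "eventCreated",
    )

    # Pivot the question keys into columns, one row per eventCreated
    df = (
        df
        .groupBy("eventCreated")
        .pivot("key")
        .agg(F.max("value"))
    )

    return df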

But it gives me this error:

pyspark.errors.exceptions.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

What is a better way to do this?

Thanks again!