APPLY_CHANGES with json data

Gilg
Contributor II

Hi Team,

I am building a DLT pipeline and planning to use APPLY_CHANGES from Bronze to Silver. In the bronze table, one column holds a JSON value. This value contains questions and answers as key-value pairs, and its shape can change depending on which questions were asked.
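
For reference, the Bronze-to-Silver wiring I have in mind looks roughly like this. This is only a minimal sketch: the target table name "silver_events" and the key column "eventId" are placeholders, not my real schema.

import dlt
from pyspark.sql.functions import col

# Target table for APPLY_CHANGES; name is a placeholder.
dlt.create_streaming_table("silver_events")

dlt.apply_changes(
    target="silver_events",
    source="bronze_events_py",
    keys=["eventId"],              # placeholder primary key
    sequence_by=col("eventCreated"),
)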

Question 1: Can APPLY_CHANGES detect what has changed in this column?

Question 2: I know doing a pivot with APPLY_CHANGES is not supported (correct me if I'm wrong). Is there a way in DLT to transpose/pivot columns using the column that holds the JSON key-value pairs?

Example:

"dynamicQuestions": {
            "questionSetId""181d2382-fffe-11ed-be67-1b62e2674093",
            "answers": [
                {
                    "question": {
                        "id""07ec1e7a-3306-11ed-af52-ff5ea6716e4a",
                        "key""nationality"
                    },
                    "answer": {
                        "value""NZL"
                    }
                },
                .
                .
                .
                .
          ]
}
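
In case it matters: if this column arrives in bronze as a raw JSON string, I parse it into a nested struct with from_json and an explicit schema along these lines. This is a sketch, and the raw column name "dynamicQuestions_raw" is a placeholder.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Schema mirroring the sample payload above.
dynamic_questions_schema = StructType([
    StructField("questionSetId", StringType()),
    StructField("answers", ArrayType(StructType([
        StructField("question", StructType([
            StructField("id", StringType()),
            StructField("key", StringType()),
        ])),
        StructField("answer", StructType([
            StructField("value", StringType()),
        ])),
    ]))),
])

# Parse the raw string column into the nested struct.
df = df.withColumn(
    "dynamicQuestions",
    F.from_json("dynamicQuestions_raw", dynamic_questions_schema),
)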

Cheers,

G

Thanks for the answers!

As an additional question, can APPLY_CHANGES handle schema evolution from its source bronze table?


So I tried to implement the pivot inside a @dlt.view.

import dlt
from pyspark.sql.functions import explode, max

@dlt.view(
    name="silver_dynquestion_vw"
)
def silver_dynquestion_vw():
    df = dlt.read_stream("bronze_events_py")

    # Flatten the answers array into one row per question/answer pair.
    df = (
        df
        .select(
            explode("dynamicQuestions.answers").alias("answers"),
            "answers.question.key",
            "answers.answer.value",
            "eventCreated",
        )
    )

    # Pivot each question key into its own column; this is the
    # step that fails on a streaming source.
    df = (
        df
        .groupBy("eventCreated")
        .pivot("key")
        .agg(max("value"))
    )

    return df

But it is giving me this error:

pyspark.errors.exceptions.AnalysisException: Queries with streaming sources must be executed with writeStream.start();


What is a better way to do this?
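
For what it's worth, one streaming-safe direction I have been sketching avoids the aggregation entirely: turn the answers array into a per-row map, then select known question keys as columns. This assumes Spark 3.1+ for the Python transform API, only "nationality" is shown, and the key list has to be fixed up front, unlike a true pivot.

import dlt
from pyspark.sql import functions as F

@dlt.view(name="silver_dynquestion_map_vw")
def silver_dynquestion_map_vw():
    df = dlt.read_stream("bronze_events_py")

    # map_from_entries expects an array of two-field structs,
    # which transform builds from each answer element.
    df = df.withColumn(
        "answers_map",
        F.map_from_entries(
            F.transform(
                "dynamicQuestions.answers",
                lambda a: F.struct(
                    a["question"]["key"].alias("key"),
                    a["answer"]["value"].alias("value"),
                ),
            )
        ),
    )

    # Per-row lookups instead of groupBy/pivot, so the stream
    # never needs a blocking aggregation.
    return df.select(
        "eventCreated",
        df["answers_map"]["nationality"].alias("nationality"),
    )

Is something like this reasonable, or is there a cleaner pattern?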

Thanks again!