Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

APPLY_CHANGES with json data

Gilg
Contributor II

Hi Team,

I am building a DLT pipeline and planning to use APPLY_CHANGES from Bronze to Silver. In the bronze table, one column holds a JSON value. This value contains questions and answers as key-value pairs, and its shape can change depending on which questions were asked.

Question 1: Can APPLY_CHANGES detect what has changed in this column?

Question 2: I know doing a pivot with APPLY_CHANGES is not supported; correct me if I'm wrong. Is there a way in DLT to transpose/pivot the columns using the column that holds JSON key-value pairs?

Example:

"dynamicQuestions": {
            "questionSetId""181d2382-fffe-11ed-be67-1b62e2674093",
            "answers": [
                {
                    "question": {
                        "id""07ec1e7a-3306-11ed-af52-ff5ea6716e4a",
                        "key""nationality"
                    },
                    "answer": {
                        "value""NZL"
                    }
                },
                .
                .
                .
                .
          ]
}
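To make the desired transpose concrete: the goal is one column per question `key`, with the answer `value` as the cell. Here is a minimal plain-Python sketch of that reshaping on a payload shaped like the sample above (the second answer entry and its IDs are hypothetical, added only to show more than one key):

```python
import json

# Payload shaped like the post's sample; IDs shortened, second entry hypothetical.
payload = json.loads("""
{
  "dynamicQuestions": {
    "questionSetId": "181d2382-fffe-11ed-be67-1b62e2674093",
    "answers": [
      {"question": {"id": "07ec1e7a", "key": "nationality"},
       "answer": {"value": "NZL"}},
      {"question": {"id": "08aa2b1c", "key": "residency"},
       "answer": {"value": "AUS"}}
    ]
  }
}
""")

# Transpose: each question key becomes a column name, the answer value its cell.
row = {
    a["question"]["key"]: a["answer"]["value"]
    for a in payload["dynamicQuestions"]["answers"]
}
print(row)  # {'nationality': 'NZL', 'residency': 'AUS'}
```

In Spark, the equivalent reshaping is an `explode` of the `answers` array followed by a pivot on `key`, which is what the attempt below tries.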

Cheers,

G

2 REPLIES

Thanks for the answers!

As an additional question, can APPLY_CHANGES handle schema evolution from its source bronze table?

 

So I tried to implement a pivot inside @dlt.view. 

import dlt
from pyspark.sql.functions import explode, max as max_

@dlt.view(
    name="silver_dynquestion_vw"
)
def silver_dynquestion_vw():
    df = dlt.read_stream("bronze_events_py")

    # An explode() alias can't be referenced in the same select,
    # so flatten in two steps.
    df = df.select(
        explode("dynamicQuestions.answers").alias("answers"),
        "eventCreated",
    )
    df = df.select(
        "answers.question.key",
        "answers.answer.value",
        "eventCreated",
    )

    df = (
        df
        .groupBy("eventCreated")
        .pivot("key")
        .agg(max_("value"))
    )

    return df

But it is giving me this error.

pyspark.errors.exceptions.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

 

What is a better way to do this?

Thanks again!
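The error occurs because `pivot` is not supported on streaming DataFrames. One workaround, sketched below under the assumption that fully recomputing the view on each pipeline update is acceptable, is to read the bronze table as a batch source with `dlt.read()` instead of `dlt.read_stream()`, so the pivot runs in batch mode (table and column names are taken from the post above):

```python
import dlt
from pyspark.sql.functions import explode, max as max_

# Sketch only: pivot is not supported on streaming DataFrames, so read the
# bronze table in batch mode; the view is then recomputed on each update
# rather than processed incrementally.
@dlt.view(name="silver_dynquestion_vw")
def silver_dynquestion_vw():
    df = dlt.read("bronze_events_py")  # batch read, not read_stream

    # Flatten in two steps: an explode() alias can't be referenced
    # in the same select.
    exploded = df.select(
        explode("dynamicQuestions.answers").alias("answers"),
        "eventCreated",
    )
    flat = exploded.select(
        "answers.question.key",
        "answers.answer.value",
        "eventCreated",
    )

    # One column per question key, the (max) answer value as the cell.
    return (
        flat.groupBy("eventCreated")
        .pivot("key")
        .agg(max_("value"))
    )
```

This trades incremental processing for pivot support; if the bronze table is large, an alternative is to keep the view unpivoted (one row per key/value) and pivot downstream.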
