APPLY_CHANGES with json data

Gilg
Contributor II

Hi Team,

I am building a DLT pipeline and planning to use APPLY_CHANGES from Bronze to Silver. In the bronze table, one column holds a JSON value. This value contains questions and answers as key-value pairs, and it can change depending on which questions have been asked.

Question 1: Can APPLY_CHANGES detect what has changed in this column?

Question 2: I understand that doing a pivot with APPLY_CHANGES is not supported; correct me if I'm wrong. Is there a way in DLT to transpose/pivot the data using the column that holds the JSON key-value pairs?

Example:

"dynamicQuestions": {
            "questionSetId""181d2382-fffe-11ed-be67-1b62e2674093",
            "answers": [
                {
                    "question": {
                        "id""07ec1e7a-3306-11ed-af52-ff5ea6716e4a",
                        "key""nationality"
                    },
                    "answer": {
                        "value""NZL"
                    }
                },
                .
                .
                .
                .
          ]
}

Cheers,

G

4 REPLIES

Kaniz
Community Manager

Hi @Gilg

Question 1:
APPLY_CHANGES in Delta Live Tables is designed to handle changes in the data, but it does not inherently understand the structure of JSON. It treats a JSON column as a single value: it can detect that the column as a whole has changed, but it does not parse the JSON to identify what precisely changed within it.
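For illustration, a minimal APPLY_CHANGES flow might look like the sketch below. The table name bronze_events and the key column event_id are assumptions for illustration only; the JSON column simply rides along and participates in change detection as a whole value.

import dlt
from pyspark.sql.functions import col

# Target table for the CDC output (name is hypothetical).
dlt.create_streaming_table("silver_events")

dlt.apply_changes(
    target="silver_events",
    source="bronze_events",           # assumed bronze table name
    keys=["event_id"],                # assumed primary key column
    sequence_by=col("eventCreated"),  # ordering column from the sample payload
)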

Question 2:
You are correct that APPLY_CHANGES does not support pivot operations. However, you can transform your data, including pivoting columns, before using APPLY_CHANGES. This involves parsing the JSON and creating new columns from the key-value pairs, which can be done with Spark's built-in functions for working with JSON data.
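As a rough sketch, assuming the bronze column is a JSON string named dynamicQuestions matching the sample payload above, the JSON can be parsed and each question key turned into its own column before the data reaches APPLY_CHANGES. The helper name flatten_questions is hypothetical; note that pivot only works on batch DataFrames, so this step would need to run as a batch (materialized) computation rather than a stream.

from pyspark.sql.functions import from_json, explode, col, max
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Schema matching the sample payload; adjust to the real structure.
schema = StructType([
    StructField("questionSetId", StringType()),
    StructField("answers", ArrayType(StructType([
        StructField("question", StructType([
            StructField("id", StringType()),
            StructField("key", StringType()),
        ])),
        StructField("answer", StructType([
            StructField("value", StringType()),
        ])),
    ]))),
])

def flatten_questions(df):
    # Parse the JSON string and explode the answers array
    # into one row per question/answer pair.
    parsed = df.withColumn("dq", from_json(col("dynamicQuestions"), schema))
    exploded = (
        parsed
        .select("eventCreated", explode("dq.answers").alias("a"))
        .select(
            "eventCreated",
            col("a.question.key").alias("key"),
            col("a.answer.value").alias("value"),
        )
    )
    # Pivot each question key into its own column (batch only).
    return exploded.groupBy("eventCreated").pivot("key").agg(max("value"))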

Gilg
Contributor II

Thanks for the answers!

As an additional question, can APPLY_CHANGES handle schema evolution from its source bronze table?

Kaniz
Community Manager

Hi @Gilg, the APPLY_CHANGES command does not directly handle schema evolution. However, the underlying MERGE INTO operation, which APPLY_CHANGES uses, supports automatic schema evolution. This allows users to resolve schema mismatches between the target and source tables in a merge operation.

Schema evolution handles two cases:
1. A column in the source table is absent from the target table. In this case, the new column is added to the target schema, and its values are inserted or updated using the source values.
2. A column in the target table is not present in the source table. In this case, the target schema is left unchanged; the values in the additional target column are either left unchanged (for UPDATE) or set to NULL (for INSERT).

To use schema evolution, you must set the Spark session configuration spark.databricks.delta.schema.autoMerge.enabled to true before you run the merge command.

However, it's important to note that in DBR 7.3 LTS, merge supports schema evolution of only top-level columns and not of nested columns.
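For instance, in a notebook this would look something like the following minimal sketch; in a DLT pipeline the same key can be set in the pipeline's Spark configuration instead.

# Enable automatic schema evolution for MERGE before the merge runs.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")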

 

Gilg
Contributor II

 

So I tried to implement a pivot inside @dlt.view. 

import dlt
from pyspark.sql.functions import explode, max

@dlt.view(
    name="silver_dynquestion_vw"
)
def silver_dynquestion_vw():
    df = dlt.read_stream("bronze_events_py")

    # Explode the answers array into one row per question/answer pair.
    df = (
        df
        .select(
            explode("dynamicQuestions.answers").alias("answers"),
            "answers.question.key",
            "answers.answer.value",
            "eventCreated",
        )
    )

    # Pivot each question key into its own column.
    df = (
        df
        .groupBy("eventCreated")
        .pivot("key")
        .agg(max("value"))
    )

    return df

But it is giving me this error.

pyspark.errors.exceptions.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

 

What is a better way to do this?

Thanks again!
