Data Engineering
DLT pipeline evolution schema error

TiagoMag
New Contributor III

Hello everyone, 

I am currently working on my first DLT pipeline, and I have stumbled on a problem that I am struggling to solve.

I am working on several tables that have a column called "my_column". It holds an array of JSON objects, each with two keys: "score" and "score_name".

I want to create one column per score, with each column named after the value of the "score_name" key.

Example:

my_column = df.select("my_column").rdd.flatMap(lambda x: x).collect()

where:

my_column = [[Row(score_name="name1", score=2), Row(score_name="name2", score=10)],
             [Row(score_name="name1", score=-1), Row(score_name="name2", score=11)],
             ...,
            ]

I want:

name1 = df.select("name1").rdd.flatMap(lambda x: x).collect()
name2 = df.select("name2").rdd.flatMap(lambda x: x).collect()

where:

name1 = [2, -1, ...]
name2 = [10, 11, ...]

End of example.
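
To make the target shape concrete, here is a minimal plain-Python sketch of the same reshaping, with no Spark involved. The helper name `pivot_scores` and the use of dicts in place of Row objects are assumptions made for illustration. Summing scores that share a name within one row is also an assumption about how duplicates should be handled.

```python
def pivot_scores(rows):
    """Turn each row's list of {score_name, score} entries into a name -> score mapping."""
    pivoted = []
    for entries in rows:
        columns = {}
        for entry in entries:
            name = entry["score_name"]
            # Assumption: scores sharing a name within one row are summed.
            columns[name] = columns.get(name, 0) + entry["score"]
        pivoted.append(columns)
    return pivoted


# Same sample data as in the example, with dicts standing in for Row objects.
my_column = [
    [{"score_name": "name1", "score": 2}, {"score_name": "name2", "score": 10}],
    [{"score_name": "name1", "score": -1}, {"score_name": "name2", "score": 11}],
]

pivoted = pivot_scores(my_column)
name1 = [row.get("name1") for row in pivoted]
name2 = [row.get("name2") for row in pivoted]
print(name1)  # [2, -1]
print(name2)  # [10, 11]
```

Note that the set of output columns here depends entirely on which "score_name" values appear in the data.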

The schema of the "my_column" column is:

ArrayType(
        StructType([
            StructField("score_name", StringType()),
            StructField("score", IntegerType())
        ])
)

To do that, I use the following:

import dlt
from pyspark.sql.functions import col, explode
from pyspark.sql.types import IntegerType

@dlt.table
def gold_table1():
    table = dlt.read("my_silver_table1")
    # One row per (uniqueId, score entry)
    table_to_join = table.select(col("uniqueId"), explode(col("my_column")).alias("explosion"))
    table_to_join = table_to_join.select("uniqueId", "explosion.score_name", "explosion.score")
    table_to_join = table_to_join.withColumn("score", col("score").cast(IntegerType()))
    # One output column per distinct score_name value
    table_to_join = table_to_join.groupBy("uniqueId").pivot("score_name").sum("score")
    return table.join(table_to_join, on="uniqueId")
 
This join works well when I test it in a regular Spark notebook. In a DLT pipeline, however, I receive the following error message:

"""To overwrite your schema or change partitioning, please set: '.option("overwriteSchema", "true")'."""

From the error message, I believe the problem is either that schema changes are not allowed, or that I am not applying the schema change correctly.

If you know the solution or have any advice on how to solve this issue I would love to hear it. Thanks !
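
One possible factor, offered as an assumption rather than something confirmed in this thread: `pivot("score_name")` without an explicit value list derives its output columns from the data, so the table's schema can differ from one pipeline update to the next. If the score names are known in advance, the column set can be fixed with conditional aggregation instead. The sketch below builds those aggregate expressions as Spark SQL strings. The helper name `score_sum_exprs` and the restriction to simple identifier names are assumptions for illustration.

```python
import re

# Assumption: score names are simple identifiers, so each one is safe to use
# both as a string literal and as a column alias.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def score_sum_exprs(score_names):
    """Return one Spark SQL aggregate expression per expected score name."""
    exprs = []
    for name in score_names:
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsupported score name: {name!r}")
        exprs.append(f"sum(CASE WHEN score_name = '{name}' THEN score END) AS {name}")
    return exprs


exprs = score_sum_exprs(["name1", "name2"])
for e in exprs:
    print(e)

# Inside the pipeline (not executed here), these could replace the pivot step:
#   from pyspark.sql import functions as F
#   table_to_join = table_to_join.groupBy("uniqueId").agg(*[F.expr(e) for e in exprs])
```

With a fixed list of names the output schema no longer depends on the rows that happen to be present. The trade-off is that any new score name must be added to the list by hand.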

1 ACCEPTED SOLUTION

Accepted Solutions

TiagoMag
New Contributor III

Thank you for your answer! I found a way to complete the pipeline; I had to use spark_conf = {"spark.databricks.delta.schema.autoMerge.enabled": "true"}, inside the decorator dlt.table of my table.

I still don't know exactly why the schema change here creates an error. When I create new columns through other pathways, such as with the method from_json, I don't get similar errors.

I will also try to follow your advice and see if it works or if there is a difference!
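
For readers who want to try the same setting, here is a sketch of how it might be passed to the decorator. `spark_conf` is an argument of `@dlt.table`, and the table body is copied from the question. The snippet only runs inside a DLT pipeline, where the `dlt` module is available. Whether this setting is the right fix for a given pipeline is not established in this thread.

```python
import dlt
from pyspark.sql.functions import col, explode
from pyspark.sql.types import IntegerType


@dlt.table(
    # Per-table Spark setting quoted in the post above.
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
)
def gold_table1():
    table = dlt.read("my_silver_table1")
    scores = (
        table.select(col("uniqueId"), explode(col("my_column")).alias("explosion"))
        .select("uniqueId", "explosion.score_name", "explosion.score")
        .withColumn("score", col("score").cast(IntegerType()))
        .groupBy("uniqueId")
        .pivot("score_name")
        .sum("score")
    )
    return table.join(scores, on="uniqueId")
```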


2 REPLIES

Kaniz_Fatma
Community Manager

Hi @TiagoMag, it seems you've encountered an issue while working on your first Delta Live Tables (DLT) pipeline. Let's dive into the problem and find a solution!

The error message you’re encountering states: “Detected a data update (for example part-00004-7397764a-48a1-432c-a104-03192f199def-c000.snappy.parquet) in the source table at version 3. This is currently not supported. If you’d like to ignore updates, set the option ‘ignoreChanges’ to ‘true’. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.”

This issue seems to be related to data updates in your source table.

Here are a few steps you can take to troubleshoot and potentially resolve the problem:

  1. Check Data Updates: Verify if there are any unexpected data updates in your source table. Ensure that the data remains consistent during pipeline execution.

  2. Ignore Changes Option: As suggested in the error message, consider setting the ignoreChanges option to true in your pipeline configuration. This will allow you to ignore updates and maintain stability.

  3. Checkpoint Directory: If you want the data update to be reflected, restart your query with a fresh checkpoint directory. This ensures that the pipeline processes the data from scratch.

  4. Resource Quotas: Sometimes pipeline failures can be related to resource limitations. Check whether your Databricks cluster has sufficient resources (cores, memory). If not, consider increasing the resource quota.

Remember that DLT pipelines can behave differently based on various factors, so it’s essential to thoroughly test your pipeline in different environments.

Happy troubleshooting, and may your DLT pipeline flow smoothly! 🚀
