
DLT pipeline schema evolution error

TiagoMag
New Contributor III

Hello everyone, 

I am currently working on my first DLT pipeline, and I stumbled upon a problem which I am struggling to solve.

I am working on several tables that have a column called "my_column" containing an array of structs with two fields: "score_name" and "score".

I want to create a column for each score, with the name of each new column being the value of the "score_name" field.

Example:

my_column = df.select("my_column").rdd.flatMap(lambda x: x).collect()

where:

my_column = [[Row(score_name="name1", score=2), Row(score_name="name2", score=10)],
             [Row(score_name="name1", score=-1), Row(score_name="name2", score=11)],
             ...]

I want:

name1 = df.select("name1").rdd.flatMap(lambda x: x).collect()
name2 = df.select("name2").rdd.flatMap(lambda x: x).collect()

where:

name1 = [2, -1, ...]
name2 = [10, 11, ...]

End of example.

The schema of the "my_column" column is:

ArrayType(
        StructType([
            StructField("score_name", StringType()),
            StructField("score", IntegerType())
        ])
)
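
For anyone who wants to reproduce this outside DLT, here is a minimal sketch that builds a DataFrame with this schema in a plain notebook (the values come from my example above; uniqueId being a string is just an assumption for the sketch):

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# uniqueId is assumed to be a string here; only my_column's schema
# is taken from the actual tables.
schema = StructType([
    StructField("uniqueId", StringType()),
    StructField("my_column", ArrayType(StructType([
        StructField("score_name", StringType()),
        StructField("score", IntegerType())
    ])))
])

df = spark.createDataFrame([
    ("id1", [Row(score_name="name1", score=2), Row(score_name="name2", score=10)]),
    ("id2", [Row(score_name="name1", score=-1), Row(score_name="name2", score=11)]),
], schema)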

So what I do to achieve that is the following:

import dlt
from pyspark.sql.functions import col, explode
from pyspark.sql.types import IntegerType

@dlt.table
def gold_table1():
    table = dlt.read("my_silver_table1")
    # One row per (score_name, score) struct in the array
    table_to_join = table.select(col("uniqueId"), explode(col("my_column")).alias("explosion"))
    table_to_join = table_to_join.select("uniqueId", "explosion.score_name", "explosion.score")
    table_to_join = table_to_join.withColumn("score", col("score").cast(IntegerType()))
    # Pivot: one column per distinct score_name value
    table_to_join = table_to_join.groupBy("uniqueId").pivot("score_name").sum("score")
    return table.join(table_to_join, on="uniqueId")
 
This join process works well when tested in a regular Spark notebook environment; however, when I run it in the context of a DLT pipeline, I receive the following error message:

"""To overwrite your schema or change partitioning, please set: '.option("overwriteSchema", "true")'."""

From the error message, I believe it has something to do with authorizing the schema change, or with not handling the schema change in the correct way (presumably the pivot derives its column names from the data, so the schema of the resulting table can differ from the one already recorded for it).

If you know the solution or have any advice on how to solve this issue, I would love to hear it. Thanks!

1 ACCEPTED SOLUTION

TiagoMag
New Contributor III

Thank you for your answer! I found a way to complete the pipeline: I had to set spark_conf = {"spark.databricks.delta.schema.autoMerge.enabled": "true"} inside the @dlt.table decorator of my table.

I still don't know exactly why the schema change creates an error here. When I create new columns through other pathways, such as with the from_json method, I don't get similar errors.

I will also try to follow your advice and see if it works or if there is a difference!
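
For reference, here is a minimal sketch of what that looks like (the table body is abbreviated; it is the same explode/pivot/join logic as in my original post):

import dlt

# Enabling Delta schema auto-merge for this table lets the new pivoted
# columns (one per distinct score_name value) be merged into the
# existing table schema instead of raising a schema-change error.
@dlt.table(
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"}
)
def gold_table1():
    table = dlt.read("my_silver_table1")
    # ... same explode / pivot / join logic as above ...
    return table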


