Hello everyone,
I am currently working on my first dlt pipeline, and I have run into a problem that I am struggling to solve.
I am working with several tables that each have a column called "my_column" containing an array of JSON objects with two keys: "score_name" and "score".
I want to create one column per score, with each new column named after the value of the "score_name" key.
Example:
my_column = df.select("my_column").rdd.flatMap(lambda x: x).collect()
where:
my_column = [[Row(score_name="name1", score=2), Row(score_name="name2", score=10)],
             [Row(score_name="name1", score=-1), Row(score_name="name2", score=11)],
             ...,
            ]
I want:
name1 = df.select("name1").rdd.flatMap(lambda x: x).collect()
name2 = df.select("name2").rdd.flatMap(lambda x: x).collect()
where the schema of the "my_column" column is:
ArrayType(
    StructType([
        StructField("score_name", StringType()),
        StructField("score", IntegerType())
    ])
)
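For anyone who wants to reproduce the setup, here is roughly how I build a toy DataFrame with the same shape in a notebook (the "uniqueId" values "id1"/"id2" are just placeholders I made up, the scores are the ones from the example above):

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("uniqueId", StringType()),
    StructField("my_column", ArrayType(StructType([
        StructField("score_name", StringType()),
        StructField("score", IntegerType())
    ])))
])

# Toy rows matching the example above; "id1"/"id2" are made-up identifiers
data = [
    ("id1", [("name1", 2), ("name2", 10)]),
    ("id2", [("name1", -1), ("name2", 11)]),
]
df = spark.createDataFrame(data, schema)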
So here is what I do for that:
import dlt
from pyspark.sql.functions import col, explode
from pyspark.sql.types import IntegerType

@dlt.table
def gold_table1():
    table = dlt.read("my_silver_table1")
    # Explode the array so each (score_name, score) struct becomes its own row
    table_to_join = table.select(col("uniqueId"), explode(col("my_column")).alias("explosion"))
    table_to_join = table_to_join.select("uniqueId", "explosion.score_name", "explosion.score")
    table_to_join = table_to_join.withColumn("score", col("score").cast(IntegerType()))
    # Pivot so each distinct score_name becomes its own column
    table_to_join = table_to_join.groupBy("uniqueId").pivot("score_name").sum("score")
    return table.join(table_to_join, on="uniqueId")
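For context, on the toy data above the intermediate table_to_join looks roughly like this after the pivot (the "id1"/"id2" values are still just placeholders):

table_to_join.show()
# +--------+-----+-----+
# |uniqueId|name1|name2|
# +--------+-----+-----+
# |     id1|    2|   10|
# |     id2|   -1|   11|
# +--------+-----+-----+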
This join works well when tested in a regular Spark notebook environment, but when I use it in the context of a dlt pipeline, I get the following error message:
"""To overwrite your schema or change partitioning, please set: '.option("overwriteSchema", "true")'."""
From the error message, I believe it has something to do with authorizing a schema change, or with not handling the schema change in the correct way.
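One thing I was considering (not tested yet, so this is just a guess on my side) is that pivot() derives the output columns from whatever score_name values happen to be in the data, so the schema of the gold table could change between runs; maybe passing the list of score names explicitly would keep the schema fixed, roughly like this (the hard-coded list of names is an assumption):

table_to_join = (
    table_to_join
    .groupBy("uniqueId")
    # Explicit value list so the pivoted columns (and thus the schema) never change
    .pivot("score_name", ["name1", "name2"])
    .sum("score")
)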
If you know the solution or have any advice on how to solve this issue, I would love to hear it. Thanks!