DLT Pipeline and Pivot tables
09-03-2025 05:10 AM
TLDR:
Can DLT determine a dynamic schema - one which is generated from the results of a pivot?
Issue
I know you can't use Spark's `.pivot` in a DLT pipeline, and that if you wish to pivot data you need to do it outside the DLT-decorated functions. I have created a function that pivots a DataFrame (read from a DLT table), turning values into columns. But as I am not the "owner" of this data, I cannot know the full set of values and so cannot create a schema....
.... and without knowing the schema, I don't think DLT is able to know or derive the columns returned from the pivot function, so my data is not added to the output. A common thread (though not explicitly stated) in other pivot questions and solutions is that a schema is always provided.
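To make the point about data-dependent schemas concrete, here is a minimal pure-Python sketch (no Spark involved) of the same values-to-columns pivot. The function `pivot_rows` and the sample column names `id`, `metric` and `value` are hypothetical; it mimics the `FIRST(..., TRUE)` aggregation by keeping the first non-null value per key. The point is only that the output column list is derived from the data itself, so two batches from the same source can produce two different schemas.

```python
def pivot_rows(rows, pivot_column, data_column, keys):
    """Pure-Python pivot (hypothetical helper): one output record per key tuple,
    one column per distinct value found in pivot_column."""
    # The output columns are whatever distinct values happen to be in the data.
    pivot_values = sorted({row[pivot_column] for row in rows})
    columns = list(keys) + pivot_values
    out = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        record = out.setdefault(
            key, {**dict(zip(keys, key)), **{v: None for v in pivot_values}}
        )
        # Keep the first non-null value per (key, pivot value), like FIRST(..., TRUE).
        if record[row[pivot_column]] is None:
            record[row[pivot_column]] = row[data_column]
    return columns, list(out.values())


# Hypothetical sample data: the second batch contains a pivot value the first did not.
batch_1 = [
    {"id": 1, "metric": "height", "value": 180},
    {"id": 1, "metric": "weight", "value": 75},
]
batch_2 = batch_1 + [{"id": 2, "metric": "age", "value": 40}]

columns_1, _ = pivot_rows(batch_1, "metric", "value", ["id"])
columns_2, records_2 = pivot_rows(batch_2, "metric", "value", ["id"])
```

Here `columns_1` is `["id", "height", "weight"]` while `columns_2` is `["id", "age", "height", "weight"]`: nothing about the schema can be known before the data has been read.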
This has all been tested in regular Python notebooks and works; only when it runs within a DLT context does it fail.
It's entirely possible that a separate DLT issue, the one that affected the pivot function and caused it to be generated apparently without columns, is also causing a problem. Could it be that the SQL is generated by DLT during the initialisation phase?
As my data is sourced from a DLT table and needs to produce a DLT table, how can I create the necessary schema (if that is indeed the right inference on my part)?
Pivot function:
```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import concat_ws

# ----------------------------------------------------------------------------------------------------
# Pivot a table without using "pivot"
# ----------------------------------------------------------------------------------------------------
def dlt_pivot_table(df_unpivoted: DataFrame, keys: list, pivot_column: str, data_column: str) -> DataFrame:
    """
    Pivots a given DataFrame using the specified keys, turning each value in
    pivot_column into a column whose values are taken from data_column.

    Parameters
        df_unpivoted : DataFrame
            The DataFrame to pivot
        keys : list
            A list containing all the keys to pivot on
        pivot_column : str
            The name of the column to be pivoted
        data_column : str
            The name of the column that contains the data to be pivoted

    Returns
        DataFrame
            A DataFrame pivoted by the columns in keys
    """
    # create a pivot key in the dataset
    df_work = df_unpivoted.withColumn("pivot_key", concat_ws("_", *keys))
    # df_back_up will hold the pivot key and original keys
    df_back_up = df_work
    # drop the original keys (we're pivoting on the pivot key generated from the given keys)
    df_work = df_work.drop(*keys)
    # create a temporary view that we can select from
    df_work.createOrReplaceTempView("data_to_pivot")
    # derive the values that will be used to create the new columns
    pivot_cols = [row[pivot_column] for row in df_work.select(pivot_column).distinct().collect()]
    # create the clauses that will cause the pivot and convert to sql like string
    # NOTE: DLT seems to generate a blank select statement - so we append the key after....
    # (backticks allow pivot values that are not valid bare identifiers, e.g. values with spaces)
    select_clauses = [f"FIRST(CASE WHEN {pivot_column} = '{name}' THEN {data_column} END, TRUE) AS `{name}`" for name in pivot_cols]
    select_clauses.append("pivot_key")
    select_clause = ",\n".join(select_clauses)
    # create the query embedding the clauses
    query = f"""
        SELECT {select_clause}
        FROM data_to_pivot
        GROUP BY pivot_key
    """
    # Execute the query (util_spark is the SparkSession, defined elsewhere)
    df_pivot = util_spark.sql(query)
    # df_pivot contains the pivoted data, but it does not have the original key values (only the pivot_key).
    # So we rejoin the original keys to the pivoted data by joining the table to df_back_up. As df_back_up
    # is not pivoted, the key list must be made 'distinct', else the join will have multiple rows per key
    col_list = [*keys]
    col_list.append("pivot_key")
    df_keys = df_back_up.select(*col_list).distinct()
    df_pivot = df_keys.join(df_pivot, on="pivot_key", how="left")
    # drop the pivot key from the pivoted dataframe
    df_pivot = df_pivot.drop("pivot_key")
    return df_pivot
```
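A sketch of how the query string in `dlt_pivot_table` could be built more defensively, as plain Python that can be checked without a cluster. The helpers `quote_identifier`, `quote_literal` and `build_pivot_query` are hypothetical, and the literal escaping assumes Spark SQL's default backslash escaping inside string literals. Raising on an empty value list is a design choice: it makes the "no columns" case fail loudly instead of silently producing a query that only selects `pivot_key`.

```python
def quote_identifier(name) -> str:
    """Backtick-quote a Spark SQL identifier, doubling any embedded backticks."""
    return "`" + str(name).replace("`", "``") + "`"


def quote_literal(value) -> str:
    """Single-quote a Spark SQL string literal.
    Assumption: Spark's default backslash escaping is in effect."""
    return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_pivot_query(pivot_values, pivot_column, data_column,
                      view="data_to_pivot", key_column="pivot_key") -> str:
    """Build the FIRST(CASE WHEN ...) pivot query used by dlt_pivot_table (hypothetical helper)."""
    if not pivot_values:
        # Fail fast: an empty list would otherwise yield a keys-only result.
        raise ValueError("no pivot values found; refusing to build a query without pivoted columns")
    clauses = [
        f"FIRST(CASE WHEN {quote_identifier(pivot_column)} = {quote_literal(v)} "
        f"THEN {quote_identifier(data_column)} END, TRUE) AS {quote_identifier(v)}"
        for v in pivot_values
    ]
    clauses.append(quote_identifier(key_column))
    select_clause = ",\n    ".join(clauses)
    return (
        f"SELECT\n    {select_clause}\n"
        f"FROM {view}\n"
        f"GROUP BY {quote_identifier(key_column)}"
    )


query = build_pivot_query(["unit price", "qty"], "metric", "value")
```

Inside `dlt_pivot_table`, the result of this helper would replace the hand-built `query` string; the rest of the function (temporary view, rejoin on `pivot_key`) stays the same.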
09-04-2025 05:43 AM
Hi @stucas,
Adding the following configuration to the DLT pipeline will allow new pivoted columns to be merged into the target table automatically:
spark.databricks.delta.schema.autoMerge.enabled = true
However, DLT still requires a defined schema at initialization, so this won’t fully solve the problem of handling a completely dynamic pivot schema. It can help if your pivoted values evolve gradually, but DLT doesn’t natively support schemas that are unknown upfront.
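As a rough mental model of the additive behaviour described above, schema evolution can be sketched in plain Python: existing target columns are kept, and columns the target has not seen before are appended in arrival order. `merge_schema` is a hypothetical toy, not Delta's implementation; it ignores data types, case sensitivity and nested fields.

```python
def merge_schema(target_columns, incoming_columns):
    """Toy model of additive schema evolution (hypothetical helper):
    keep every existing target column, append unseen incoming columns."""
    merged = list(target_columns)
    for col in incoming_columns:
        if col not in merged:
            merged.append(col)
    return merged


# A later batch brings a new pivot value ("age") and omits an old one ("weight").
evolved = merge_schema(["id", "height", "weight"], ["id", "age", "height"])

# A batch that carries only the key columns has nothing to contribute.
keys_only = merge_schema(["id"], ["id"])
```

`evolved` becomes `["id", "height", "weight", "age"]`, while `keys_only` stays `["id"]`: merging can only add columns that actually appear in the incoming data.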
09-04-2025 10:27 PM
Thank you for the reply. I have tried this (it was suggested in earlier solutions), but the result may well be a side effect of the function above.
On pipeline initialisation, however, it failed with an invalid SQL error because {select_clause} was empty. I believe this is the root cause: no schema is defined at this point in the process, so DLT just assumes an empty string.
When autoMerge was added, the job worked, but none of the columns from the select statement were added.
For a beginner this is all very strange, but I assume it is linked to the way DLT relies on Spark's lazy evaluation (hence certain functions that require the full data to be loaded are prohibited, e.g. collect() and pivot())?
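The hypothesis in the last two paragraphs can be modelled in plain Python. In this toy (a hypothetical `define_pivot`, not DLT's actual internals), the eager step that mirrors `collect()` in `dlt_pivot_table` runs once when the plan is defined, and the returned function runs later against the real rows. Assuming the source looks empty at definition time, the frozen column list contains only the keys, which matches the observed behaviour: the job succeeds, but no pivoted columns appear.

```python
def define_pivot(rows_at_definition_time, pivot_column):
    """Toy two-phase model (hypothetical): the distinct-value lookup, like collect(),
    runs once at definition time; the returned function executes later."""
    # Assumption for illustration: whatever is visible here freezes the output columns.
    frozen_values = sorted({r[pivot_column] for r in rows_at_definition_time})

    def execute(rows, keys, data_column):
        out = {}
        for r in rows:
            key = tuple(r[k] for k in keys)
            record = out.setdefault(
                key, {**dict(zip(keys, key)), **{v: None for v in frozen_values}}
            )
            value = r[pivot_column]
            # Values not seen at definition time have no column to land in.
            if value in frozen_values and record[value] is None:
                record[value] = r[data_column]
        return list(keys) + frozen_values, list(out.values())

    return execute


rows = [
    {"id": 1, "metric": "height", "value": 180},
    {"id": 1, "metric": "weight", "value": 75},
]

# Plan defined while the source appears empty: only the key survives.
empty_columns, empty_result = define_pivot([], "metric")(rows, ["id"], "value")

# Plan defined with the data visible: the pivoted columns appear.
full_columns, full_result = define_pivot(rows, "metric")(rows, ["id"], "value")
```

With the empty definition, `empty_columns` is `["id"]` and each record holds only its key; with the data visible, `full_columns` is `["id", "height", "weight"]`.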