Data Engineering

DLT Pipeline and Pivot tables

stucas
New Contributor II

TLDR:
Can DLT determine a dynamic schema - one which is generated from the results of a pivot?

Issue
I know you can't use Spark's `.pivot` in a DLT pipeline, and that if you wish to pivot data you need to do so outside of the DLT-decorated functions. I have created a function that pivots a DataFrame (from a DLT table), turning values into columns - but as I am not the "owner" of this data I cannot know the full set of values and thus cannot create a schema....
        .... and without knowing the schema, I don't think DLT is able to derive the columns returned from the pivot function, so my data is not added to the output. A common thread (though not explicitly stated) in other pivot questions/solutions is that schemas are always provided.

This has all been tested in regular Python notebooks and works; it only fails when used within a DLT context.

It's entirely possible that another issue in DLT, one that affects the pivot function and causes it to apparently be generated without columns, is also at play. Could it be that the SQL is generated by DLT in the initialisation phase?

As my data is sourced from a DLT table and needs to create a DLT table... how can I create the necessary schema (if that is indeed the right inference on my part)?

Pivot function:

from pyspark.sql import DataFrame
from pyspark.sql.functions import concat_ws

# ----------------------------------------------------------------------------------------------------
# Pivot a table without using "pivot"
# ----------------------------------------------------------------------------------------------------
def dlt_pivot_table(df_unpivoted: DataFrame, keys: list, pivot_column: str, data_column: str) -> DataFrame:
    """
    Pivots the given DataFrame on the specified keys, turning the distinct values in
    pivot_column into columns and filling them with the corresponding
    values from data_column.

    Parameters
        df_unpivoted : DataFrame
            The DataFrame to pivot

        keys: list
            A list containing all the keys to pivot on

        pivot_column : str
            The name of the column to be pivoted

        data_column: str
            The name of the column that contains the data to be pivoted

    Returns
        DataFrame
            A DataFrame pivoted by the columns in keys
    """

    # create a pivot key in the dataset
    df_work = df_unpivoted.withColumn("pivot_key", concat_ws("_", *keys))

    # df_back_up will hold the pivot key and original keys
    df_back_up = df_work

    # drop the original keys (we're pivoting on the pivot key generated from the given keys)
    df_work = df_work.drop(*keys)

    # create a temporary view that we can select from
    df_work.createOrReplaceTempView("data_to_pivot")

    # derive the values that will be used to create the new columns
    pivot_cols = [row[pivot_column] for row in df_work.select(pivot_column).distinct().collect()]

    # create the clauses that will cause the pivot and convert them to a SQL-like string
    # NOTE: DLT seems to generate a blank select statement - so we append the key after....
    select_clauses = [f"FIRST(CASE WHEN {pivot_column} = '{name}' THEN {data_column} END, TRUE) AS {name}" for name in pivot_cols]
    select_clauses.append("pivot_key")
    select_clause = ",\n".join(select_clauses)

    # create the query embedding the clauses
    query = f"""
            SELECT
                {select_clause}
            FROM
                data_to_pivot
            GROUP BY
                pivot_key
            """
    # Execute the query
    df_pivot = util_spark.sql(query)

    # df_pivot contains the pivoted data, but it does not have the original key values (only the pivot_key).
    # So we rejoin the original keys to the pivoted data by joining it to df_back_up. As df_back_up
    # is not pivoted, the key list must be made 'distinct', otherwise the join will produce multiple rows per key.
    col_list = [*keys]
    col_list.append("pivot_key")
    df_keys = df_back_up.select(*col_list).distinct()

    df_pivot = df_keys.join(df_pivot, on="pivot_key", how="left")

    # drop the pivot key from the pivoted dataframe
    df_pivot = df_pivot.drop("pivot_key")

    return df_pivot
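
For context, here is roughly how a helper like this gets wired into the pipeline - a minimal sketch with hypothetical table, key and column names (note that no schema is declared on the table, so DLT has to derive the pivoted columns itself):

import dlt

# sketch only: the source table, keys and column names below are hypothetical
@dlt.table(name="metrics_pivoted")  # no schema declared - DLT must derive it
def metrics_pivoted():
    df_unpivoted = dlt.read("metrics_unpivoted")  # upstream DLT table
    return dlt_pivot_table(
        df_unpivoted,
        keys=["site_id", "reading_date"],
        pivot_column="metric_name",
        data_column="metric_value",
    )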
 
2 REPLIES

SP_6721
Contributor III

Hi @stucas ,

Adding the following configuration to the DLT pipeline will allow new pivoted columns to be merged into the target table automatically: spark.databricks.delta.schema.autoMerge.enabled = true.
However, DLT still requires a defined schema at initialization, so this won't fully solve the issue of handling a completely dynamic pivot schema. It can help if your pivoted values evolve gradually, but DLT doesn't natively support unknown schemas upfront.
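
For example (a sketch - exactly where it goes depends on how the pipeline is set up), the flag can be added to the pipeline settings or set on the Spark session:

# Option 1: add it under "configuration" in the DLT pipeline settings (UI or JSON):
#   "configuration": {
#     "spark.databricks.delta.schema.autoMerge.enabled": "true"
#   }

# Option 2: set it on the Spark session from the pipeline notebook:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")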

stucas
New Contributor II

Thank you for the reply - I have tried this (it was suggested in earlier solutions), but the behaviour may well be a side effect of the function above.

query = f"""
            SELECT pivot_key,
                {select_clause}
            FROM
                data_to_pivot
            GROUP BY
                pivot_key
            """

However, on pipeline initialisation it failed with an invalid SQL error because {select_clause} was empty. I believe this is the root cause: there is no schema defined at this point in the process, so DLT just assumes an empty string.
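
A guard inside dlt_pivot_table would at least keep the generated SQL valid when no distinct pivot values are visible - a sketch only, assuming a keys-only fallback is acceptable:

    # sketch: placed just after pivot_cols is derived
    if not pivot_cols:
        # no distinct pivot values visible (e.g. during graph initialisation):
        # return the de-duplicated keys so the output is at least well-formed
        return df_back_up.select(*keys).distinct()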

When autoMerge was added, the job worked, but no columns from the select statement were added.

For a beginner this is all very strange, but I assume it's linked to the way DLT relies on Spark's lazy evaluation (hence certain functions that require fully loading the data, e.g. collect() and pivot(), are prohibited)?
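
If the full set of pivoted values can be agreed with the data owner up front, one option is to declare the schema explicitly on the DLT table so it is known at initialisation - a minimal sketch with hypothetical column names:

import dlt

@dlt.table(
    name="metrics_pivoted",
    # declaring the schema up front (hypothetical columns here) means the pivoted
    # columns are known to DLT at graph initialisation rather than derived at runtime
    schema="site_id STRING, reading_date DATE, temperature DOUBLE, humidity DOUBLE",
)
def metrics_pivoted():
    return dlt_pivot_table(
        dlt.read("metrics_unpivoted"),
        keys=["site_id", "reading_date"],
        pivot_column="metric_name",
        data_column="metric_value",
    )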
