Data Engineering

DLT Pipeline and Pivot tables

stucas
New Contributor

TLDR:
Can DLT determine a dynamic schema, i.e. one generated from the results of a pivot?

Issue
I know you can't use Spark's `.pivot` in a DLT pipeline, and that if you wish to pivot data you need to do so outside of the DLT-decorated functions. I have written a function that pivots a DataFrame (read from a DLT table), turning values into columns. But as I am not the "owner" of this data, I cannot know the full set of values in advance and thus cannot declare a schema up front. Without knowing the schema, I don't think DLT is able to derive the columns returned from the pivot function, and my data is not added to the output. A common thread (though not explicitly stated) in other pivot questions/solutions is that schemas are always provided.

This has all been tested in regular Python notebooks and works; it only fails when used within a DLT context.

It's entirely possible that another issue in DLT affecting the pivot function, causing it to apparently be generated without columns, is also in play. Could it be that the SQL is generated by DLT in the initialisation phase?

As my data is sourced from a DLT table and needs to end up in a DLT table: how can I create the necessary schema (if that is indeed the right inference on my part)?
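
For context, the pivot function below is called from a @dlt.table-decorated function, roughly like this (a simplified sketch; the table and column names here are placeholders, not my real ones):

import dlt
from pyspark.sql import DataFrame

@dlt.table(name="measurements_pivoted")
def measurements_pivoted() -> DataFrame:
    # read the upstream DLT table and pivot it via the helper below;
    # this is where the output appears to come back without columns
    df_unpivoted = dlt.read("measurements_unpivoted")
    return dlt_pivot_table(
        df_unpivoted,
        keys=["device_id", "event_date"],
        pivot_column="metric_name",
        data_column="metric_value",
    )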

Pivot function:

# ----------------------------------------------------------------------------------------------------
# Pivot a table without using "pivot"
# ----------------------------------------------------------------------------------------------------
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import concat_ws

# util_spark is our SparkSession handle (shown here so the snippet is self-contained)
util_spark = SparkSession.getActiveSession()


def dlt_pivot_table(df_unpivoted: DataFrame, keys: list, pivot_column: str, data_column: str) -> DataFrame:
    """
    Pivots a given DataFrame on the specified keys, turning the distinct values
    in pivot_column into columns filled with the corresponding values
    from data_column.

    Parameters
        df_unpivoted : DataFrame
            The DataFrame to pivot

        keys: list
            A list containing all the keys to pivot on

        pivot_column : str
            The name of the column to be pivoted

        data_column: str
            The name of the column that contains the data to be pivoted

    Returns
        DataFrame
            A DataFrame pivoted by the columns in keys
    """

    # create a pivot key in the dataset
    df_work = df_unpivoted.withColumn("pivot_key", concat_ws("_", *keys))

    # df_back_up will hold the pivot key and original keys
    df_back_up = df_work

    # drop the original keys (we're pivoting on the pivot key generated from the given keys)
    df_work = df_work.drop(*keys)

    # create a temporary view that we can select from
    df_work.createOrReplaceTempView("data_to_pivot")

    # derive the values that will be used to create the new columns
    pivot_cols = [row[pivot_column] for row in df_work.select(pivot_column).distinct().collect()]

    # create the clauses that will perform the pivot and join them into a single SQL string
    # NOTE: DLT seems to generate a blank select statement - so we append the key after....
    # the alias is backtick-quoted in case a pivoted value is not a valid column name
    select_clauses = [
        f"FIRST(CASE WHEN {pivot_column} = '{name}' THEN {data_column} END, TRUE) AS `{name}`"
        for name in pivot_cols
    ]
    select_clauses.append("pivot_key")
    select_clause = ",\n".join(select_clauses)

    # create the query embedding the clauses
    query = f"""
            SELECT
                {select_clause}
            FROM
                data_to_pivot
            GROUP BY
                pivot_key
            """
    # Execute the query
    df_pivot = util_spark.sql(query)

    # df_pivot contains the pivoted data, but it does not have the original key values (only the pivot_key).
    # So we rejoin the original keys to the pivoted data by joining against df_back_up. As df_back_up
    # is not pivoted, the key list must be made distinct, else the join will yield multiple rows per key.
    col_list = [*keys, "pivot_key"]
    df_keys = df_back_up.select(*col_list).distinct()

    df_pivot = df_keys.join(df_pivot, on="pivot_key", how="left")

    # drop the pivot key from the pivoted dataframe
    df_pivot = df_pivot.drop("pivot_key")

    return df_pivot
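
And a quick sanity check of the kind that works fine in a plain notebook (dummy data, made-up column names):

rows = [
    ("dev1", "2024-01-01", "temp", 21.5),
    ("dev1", "2024-01-01", "humidity", 40.0),
    ("dev2", "2024-01-01", "temp", 19.0),
]
df = util_spark.createDataFrame(rows, ["device_id", "event_date", "metric_name", "metric_value"])

df_pivoted = dlt_pivot_table(
    df,
    keys=["device_id", "event_date"],
    pivot_column="metric_name",
    data_column="metric_value",
)
df_pivoted.show()  # columns: device_id, event_date, plus one column per distinct metric_name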
 