# ----------------------------------------------------------------------------------------------------
# Pivot a table without using "pivot"
# ----------------------------------------------------------------------------------------------------
def dlt_pivot_table(df_unpivoted: DataFrame, keys: list, pivot_column: str, data_column: str) -> DataFrame:
"""
Pivots a given DataFrame using specified keys, pivoting the values in the column
pivot_column as a column and the its values
from data_column.
Parameters
df_unpivoted : DataFrame
The DataFrame to pivot
keys: list
A list containing all the keys to pivot on
pivot_column : str
The name of the column to be pivoted
data_column: str
The name of the column that contains the data to be pivoted
Returns
DataFrame
A DataFrame pivoted by the columns in keys
"""
# create a pivot key in the datasert
df_work = df_unpivoted.withColumn("pivot_key", concat_ws("_", *keys))
# df_back_up will hold the pivot key and original keys
df_back_up = df_work
# drop the original keys (we're pivoting on the pivot key generated from the given keys)
df_work = df_work.drop(*keys)
# create a temporary view that we can select from
df_work.createOrReplaceTempView("data_to_pivot")
# derive the values that will be used to create the new columns
pivot_cols = [row[pivot_column] for row in df_work.select(pivot_column).distinct().collect()]
# create the clauses that will cause the pivot and convert to sql like string
# NOTE: DLT seems to generate a blank select statement - so we append the key after....
select_clauses = [f"FIRST(CASE WHEN {pivot_column} = '{name}' THEN {data_column} END, TRUE) AS {name}" for name in pivot_cols]
select_clauses.append(f"pivot_key")
select_clause = ",\n".join(select_clauses)
# create the query embedding the clauses
query = f"""
SELECT
{select_clause}
FROM
data_to_pivot
GROUP BY
pivot_key
"""
# Execute the query
df_pivot = util_spark.sql(query)
# df_pivot contains the pivoted data, it does not though have the original key values (only the pivot_key)
# So we rejoin the original keys to the pivoted data, by joining the table to the df_back_up. As df_back_up
# is not pivoted, the key list, must be made 'distinct' else the join will have multiple rows per key
col_list = [*keys]
col_list.append("pivot_key")
df_keys = df_back_up.select(*col_list).distinct()
df_pivot = df_keys.join(df_pivot, on="pivot_key", how="left")
# drop the pivot key from the pivoted dataframe
df_pivot = df_pivot.drop("pivot_key")
return df_pivot