Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables and Pivoting

YS1
Contributor

Hello,

I'm trying to create a DLT pipeline where I read data as a streaming dataset from a Kafka source, save it to a table, and then filter, transform, and pivot the data. However, I've hit an issue: DLT doesn't support pivot, and foreachBatch doesn't work either, since a DLT table function must return a DataFrame rather than a started streaming query.

What is the best way to handle this?

Here's the code I'm trying to run:

import dlt
from pyspark.sql.functions import col, explode, first, from_json

def parse_event(df, batch_id, event_id, target):
    # Filter one event type, parse the JSON payload into a map, explode it
    # into key/value rows, then pivot the keys into columns.
    result_df = (df.filter(col("event_id") == event_id)
                   .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                   .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
                   .groupBy("time", "user", "event_id").pivot("key").agg(first("value")))
    (result_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(target))

@dlt.table(
    name="bronze_event_xxxx",
    table_properties={"quality": "bronze",
                      "pipelines.reset.allowed": "true"},
    temporary=False)
def create_bronze_table():
    # This starts a foreachBatch query, but a DLT table function must
    # return a DataFrame, not the StreamingQuery handle that start() returns.
    query = (dlt.read_stream("bronze_event")
             .writeStream
             .foreachBatch(lambda df, epoch_id: parse_event(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
             .outputMode("append")
             .start())

    return query

2 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @YS1 ,

As a workaround, you can rewrite the pivot as SQL with CASE statements.

This pivot:

# Sample data to demonstrate the pivot
data = [
    ("ProductA", "North", 100),
    ("ProductA", "South", 150),
    ("ProductA", "East", 200),
    ("ProductA", "West", 250),
    ("ProductB", "North", 300),
    ("ProductB", "South", 350),
    ("ProductB", "East", 400),
    ("ProductB", "West", 450)
]
columns = ["product", "region", "sales"]

df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("sales_data")

sql_query = """
SELECT * FROM (
    SELECT product, region, sales
    FROM sales_data
) PIVOT (
    SUM(sales) FOR region IN ('North', 'South', 'East', 'West')
)
"""

pivot_df_sql = spark.sql(sql_query)
display(pivot_df_sql)

is equivalent to this SQL:

sql_query = """
SELECT
    product,
    SUM(CASE WHEN region = 'North' THEN sales ELSE 0 END) AS North,
    SUM(CASE WHEN region = 'South' THEN sales ELSE 0 END) AS South,
    SUM(CASE WHEN region = 'East' THEN sales ELSE 0 END) AS East,
    SUM(CASE WHEN region = 'West' THEN sales ELSE 0 END) AS West
FROM sales_data
GROUP BY product
"""

display(spark.sql(sql_query))
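
The same idea carries over to the streaming case in the original question by replacing pivot() with conditional aggregation in PySpark, as long as the set of keys is known up front. Below is a minimal sketch, assuming a hypothetical fixed key list (EXPECTED_KEYS) and that the time column is a timestamp that can carry a watermark:

import dlt
from pyspark.sql.functions import col, explode, first, from_json, when

# Assumption: the keys that can appear in the payload are known in advance;
# like PIVOT, conditional aggregation needs its output columns fixed up front.
EXPECTED_KEYS = ["key_a", "key_b", "key_c"]  # hypothetical placeholders

@dlt.table(name="silver_event_xxxx")
def create_silver_table():
    exploded = (dlt.read_stream("bronze_event")
                .filter(col("event_id") == "xxxx")
                .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                .select("time", "user", "event_id",
                        explode(col("parsed_data")).alias("key", "value")))
    # Assumption: "time" is a timestamp; a streaming aggregation that appends
    # results needs a watermark to bound its state.
    return (exploded
            .withWatermark("time", "10 minutes")
            .groupBy("time", "user", "event_id")
            .agg(*[first(when(col("key") == k, col("value")), ignorenulls=True).alias(k)
                   for k in EXPECTED_KEYS]))

Since the output columns are declared explicitly, this should run as an ordinary streaming table in the pipeline, without starting a foreachBatch query by hand.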

YS1
Contributor

@szymon_dybczak Thanks for the workaround. That is helpful.
