Delta Live Tables and Pivoting
07-18-2024 10:51 AM
Hello,
I'm trying to create a DLT pipeline that reads data as a streaming dataset from a Kafka source, saves it to a table, and then filters, transforms, and pivots the data. However, I've hit an issue: DLT doesn't support pivot, and with foreachBatch there is no streaming DataFrame left to return from the DLT table function (start() returns a StreamingQuery).
What is the best way to handle this?
Here's the code I'm trying to run:
import dlt
from pyspark.sql.functions import col, explode, first, from_json

def parse_event(df, batch_id, event_id, target):
    # Filter one event type, parse the JSON payload, and pivot key/value pairs into columns.
    result_df = (df.filter(col("event_id") == event_id)
                 .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                 .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
                 .groupBy("time", "user", "event_id").pivot("key").agg(first("value"))
                 )
    # Write each micro-batch to the target Delta table.
    (result_df.write
     .format("delta")
     .mode("append")
     .saveAsTable(target))

@dlt.table(
    name="bronze_event_xxxx",
    table_properties={"quality": "bronze",
                      "pipelines.reset.allowed": "true"},
    temporary=False)
def create_bronze_table():
    # This is where DLT fails: start() returns a StreamingQuery,
    # but the @dlt.table function must return a DataFrame.
    df = (dlt.read_stream("bronze_event")
          .writeStream
          .foreachBatch(lambda df, epoch_id: parse_event(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
          .outputMode("append")
          .start()
          )
    return df
07-18-2024 10:59 PM
Hi @YS1 ,
As a workaround, you can rewrite the pivot as SQL with CASE statements. The PIVOT query below:
# Sample data: sales per product and region.
data = [
    ("ProductA", "North", 100),
    ("ProductA", "South", 150),
    ("ProductA", "East", 200),
    ("ProductA", "West", 250),
    ("ProductB", "North", 300),
    ("ProductB", "South", 350),
    ("ProductB", "East", 400),
    ("ProductB", "West", 450)
]
columns = ["product", "region", "sales"]
df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("sales_data")

# Pivot with the SQL PIVOT clause: one sales column per region.
sql_query = """
SELECT * FROM (
    SELECT product, region, sales
    FROM sales_data
) PIVOT (
    SUM(sales) FOR region IN ('North', 'South', 'East', 'West')
)
"""
pivot_df_sql = spark.sql(sql_query)
display(pivot_df_sql)
is equivalent to this SQL with CASE statements:
# The same pivot expressed as conditional aggregation, which works
# even where PIVOT (or DataFrame .pivot()) is not supported.
sql_query = """
SELECT
    product,
    SUM(CASE WHEN region = 'North' THEN sales ELSE 0 END) AS North,
    SUM(CASE WHEN region = 'South' THEN sales ELSE 0 END) AS South,
    SUM(CASE WHEN region = 'East' THEN sales ELSE 0 END) AS East,
    SUM(CASE WHEN region = 'West' THEN sales ELSE 0 END) AS West
FROM sales_data
GROUP BY product
"""
display(spark.sql(sql_query))
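Applying the same idea back to the DLT pipeline: the CASE trick can be written with when() in the DataFrame API, so the table function returns a streaming DataFrame directly and foreachBatch isn't needed. Here's a minimal sketch, assuming the set of event keys is known up front; KNOWN_KEYS, the event_id value, and the table/column names are placeholders based on the original post's schema:

import dlt
from pyspark.sql.functions import col, explode, first, from_json, when

# Hypothetical key set: a streaming query cannot scan the data to discover
# pivot columns, so the keys must be fixed up front.
KNOWN_KEYS = ["key_a", "key_b", "key_c"]

@dlt.table(name="silver_event_xxxx")
def create_silver_table():
    exploded = (
        dlt.read_stream("bronze_event")
        .filter(col("event_id") == "xxxx")
        .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
        .select("time", "user", "event_id",
                explode(col("parsed_data")).alias("key", "value"))
    )
    # Conditional aggregation instead of pivot(): one column per known key.
    agg_exprs = [
        first(when(col("key") == k, col("value")), ignorenulls=True).alias(k)
        for k in KNOWN_KEYS
    ]
    # Note: a streaming aggregation may require a watermark; alternatively,
    # read with dlt.read() to compute this as a batch (materialized) table.
    return exploded.groupBy("time", "user", "event_id").agg(*agg_exprs)

The trade-off versus pivot() is that new keys don't appear automatically; they have to be added to KNOWN_KEYS.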
07-26-2024 09:51 AM
@szymon_dybczak Thanks for the workaround. That is helpful.

