Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables and Pivoting

YS1
Contributor

Hello,

I'm trying to create a DLT pipeline where I read data as a streaming dataset from a Kafka source, save it in a table, and then filter, transform, and pivot the data. However, I've run into an issue: DLT doesn't support pivoting, and foreachBatch doesn't help either, because start() returns a StreamingQuery handle rather than the DataFrame a @dlt.table function must return.

What is the best way to handle this?

Here's the code I'm trying to run:

import dlt
from pyspark.sql.functions import col, explode, first, from_json

def parse_event(df, batch_id, event_id, target):
    # Filter the micro-batch to one event type, explode the JSON map payload
    # into key/value rows, then pivot the keys into columns.
    result_df = (df.filter(col("event_id") == event_id)
                 .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                 .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
                 .groupBy("time", "user", "event_id").pivot("key").agg(first("value")))
    (result_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(target))

@dlt.table(
    name="bronze_event_xxxx",
    table_properties={"quality": "bronze",
                      "pipelines.reset.allowed": "true"},
    temporary=False)
def create_bronze_table():
    # This is where it breaks: start() returns a StreamingQuery,
    # not the DataFrame that a @dlt.table function must return.
    df = (dlt.read_stream("bronze_event")
          .writeStream
          .foreachBatch(lambda df, epoch_id: parse_event(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
          .outputMode("append")
          .start())

    return df
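For context, the bronze_event table is populated roughly along these lines (simplified; the broker address, topic, and column mapping below are placeholders, and event_id extraction is omitted):

@dlt.table(name="bronze_event")
def create_bronze_event():
    # Placeholder Kafka config; the real broker/topic/schema differ.
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "events")
            .load()
            .selectExpr("timestamp AS time",
                        "CAST(key AS STRING) AS user",
                        "CAST(value AS STRING) AS message_content"))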

2 REPLIES

szymon_dybczak
Contributor

Hi @YS1,

As a workaround, you can rewrite the pivot in SQL using CASE statements.

The following pivot:

# Sample data to demonstrate the pivot
data = [
    ("ProductA", "North", 100),
    ("ProductA", "South", 150),
    ("ProductA", "East", 200),
    ("ProductA", "West", 250),
    ("ProductB", "North", 300),
    ("ProductB", "South", 350),
    ("ProductB", "East", 400),
    ("ProductB", "West", 450)
]
columns = ["product", "region", "sales"]

df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("sales_data")

# Pivot the region values into columns using SQL PIVOT syntax
sql_query = """
SELECT * FROM (
    SELECT product, region, sales
    FROM sales_data
) PIVOT (
    SUM(sales) FOR region IN ('North', 'South', 'East', 'West')
)
"""

pivot_df_sql = spark.sql(sql_query)
display(pivot_df_sql)

is equivalent to this SQL:

sql_query = """
SELECT
    product,
    SUM(CASE WHEN region = 'North' THEN sales ELSE 0 END) AS North,
    SUM(CASE WHEN region = 'South' THEN sales ELSE 0 END) AS South,
    SUM(CASE WHEN region = 'East' THEN sales ELSE 0 END) AS East,
    SUM(CASE WHEN region = 'West' THEN sales ELSE 0 END) AS West
FROM sales_data
GROUP BY product
"""

display(spark.sql(sql_query))
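Because the CASE version is an ordinary aggregation, it also works inside a DLT dataset function. A minimal sketch, assuming sales_data is defined as a dataset in the same pipeline (names follow the example above; adapt them to your schema):

import dlt
from pyspark.sql import functions as F

@dlt.table(name="sales_pivoted")
def sales_pivoted():
    # Build one SUM(CASE WHEN ...)-style aggregate per region,
    # mirroring the SQL above. "sales_data" is assumed to be a
    # dataset defined elsewhere in this pipeline.
    regions = ["North", "South", "East", "West"]
    aggs = [F.sum(F.when(F.col("region") == r, F.col("sales")).otherwise(0)).alias(r)
            for r in regions]
    return dlt.read("sales_data").groupBy("product").agg(*aggs)

Note that the output columns have to be known up front; with a dynamic set of keys, as in your original map payload, you'd first need to collect the distinct keys, which isn't possible on a streaming source.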

@szymon_dybczak Thanks for the workaround. That is helpful.
