Data Engineering

Delta Live Tables and Pivoting

YS1
Contributor

Hello,

I'm trying to create a DLT pipeline that reads a streaming dataset from a Kafka source, saves it to a table, and then filters, transforms, and pivots the data. However, I've run into an issue: DLT doesn't support pivoting, and foreachBatch starts a streaming query rather than producing a DataFrame I can return from the DLT table function.

What is the best way to handle this?

Here's the code I'm trying to run:

from pyspark.sql.functions import col, explode, first, from_json

def parse_event(df, batch_id, event_id, target):
    # Filter to a single event type, parse the JSON payload into a map,
    # and pivot each map key into its own column.
    result_df = (df.filter(col("event_id") == event_id)
                   .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                   .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
                   .groupBy("time", "user", "event_id")
                   .pivot("key")
                   .agg(first("value")))
    (result_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(target))

@dlt.table(
  name="bronze_event_xxxx",
  table_properties={"quality": "bronze",
                    "pipelines.reset.allowed": "true"},
  temporary=False)
def create_bronze_table():
    # This is the part that fails: foreachBatch starts a streaming query,
    # but a DLT table function is expected to return a DataFrame.
    query = (dlt.read_stream("bronze_event")
             .writeStream
             .foreachBatch(lambda df, epoch_id: parse_event(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
             .outputMode("append")
             .start())

    return query
2 REPLIES

Slash
Contributor

Hi @YS1 ,

As a workaround, you can rewrite the pivot in SQL using CASE statements.

The pivot below:

data = [
    ("ProductA", "North", 100),
    ("ProductA", "South", 150),
    ("ProductA", "East", 200),
    ("ProductA", "West", 250),
    ("ProductB", "North", 300),
    ("ProductB", "South", 350),
    ("ProductB", "East", 400),
    ("ProductB", "West", 450)
]

columns = ["product", "region", "sales"]

df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("sales_data")

sql_query = """
SELECT * FROM (
    SELECT product, region, sales
    FROM sales_data
) PIVOT (
    SUM(sales) FOR region IN ('North', 'South', 'East', 'West')
)
"""

pivot_df_sql = spark.sql(sql_query)
display(pivot_df_sql)

is equivalent to this SQL:

sql_query = """
SELECT
    product,
    SUM(CASE WHEN region = 'North' THEN sales ELSE 0 END) AS North,
    SUM(CASE WHEN region = 'South' THEN sales ELSE 0 END) AS South,
    SUM(CASE WHEN region = 'East' THEN sales ELSE 0 END) AS East,
    SUM(CASE WHEN region = 'West' THEN sales ELSE 0 END) AS West
FROM sales_data
GROUP BY product
"""

display(spark.sql(sql_query))
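
Building on that, here's a minimal sketch of how the same fixed-column approach might look back in the DLT pipeline from the question. It assumes the set of payload keys is known up front (the EXPECTED_KEYS list and the silver_event_xxxx table name are placeholders I made up), and it uses a regular dlt.read() materialized view so the aggregation is recomputed on each update rather than attempted on the stream. Untested, so treat it as a starting point:

import dlt
from pyspark.sql.functions import col, explode, first, from_json, when

# Hypothetical fixed set of keys expected in the parsed payload.
EXPECTED_KEYS = ["key_a", "key_b", "key_c"]

@dlt.table(name="silver_event_xxxx")
def create_silver_table():
    exploded = (dlt.read("bronze_event")  # batch read, so no streaming-aggregation limits
                .filter(col("event_id") == "xxxx")
                .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
                .select("time", "user", "event_id",
                        explode(col("parsed_data")).alias("key", "value")))
    # Conditional aggregation instead of pivot(): one first(...) per known key,
    # so the output schema is static and DLT can resolve it when building the graph.
    agg_exprs = [first(when(col("key") == k, col("value")), ignorenulls=True).alias(k)
                 for k in EXPECTED_KEYS]
    return exploded.groupBy("time", "user", "event_id").agg(*agg_exprs)

Because every output column is declared explicitly, the schema no longer depends on the data, which is the part pivot() can't guarantee inside a pipeline.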

@Slash Thanks for the workaround. That is helpful.
