<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Delta Live Tables and Pivoting in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/79293#M35724</link>
    <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I'm trying to create a DLT pipeline where I read data as a streaming dataset from a Kafka source, save it in a table, and then filter, transform, and pivot the data. However, I've encountered an issue: DLT doesn't support pivoting, and using foreachBatch&amp;nbsp;doesn't allow me to return a streaming query to the DLT function.&lt;/P&gt;&lt;P&gt;What is the best way to handle this?&lt;/P&gt;&lt;P&gt;Here's the code I'm trying to run:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def parse_event(df, batch_id, event_id, target):
    result_df = (df.filter(col("event_id") == event_id)
          .withColumn("parsed_data", from_json(col("message_content"), "map&amp;lt;string,string&amp;gt;"))
          .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
          .groupby("time", "user", "event_id").pivot("key").agg(first("value"))
    )
    (result_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(target))

@dlt.table(
  name="bronze_event_xxxx",
  table_properties={"quality": "bronze",
                    "pipelines.reset.allowed": "true"},
  temporary=False)
    
def create_bronze_table():
    
    df = (dlt.read_stream("bronze_event")
          .writeStream
         .foreachBatch(lambda df, epoch_id: parse_eventlog(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
         .outputMode("append")
         .start()
         )

    return (df)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Thu, 18 Jul 2024 17:51:42 GMT</pubDate>
    <dc:creator>YS1</dc:creator>
    <dc:date>2024-07-18T17:51:42Z</dc:date>
    <item>
      <title>Delta Live Tables and Pivoting</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/79293#M35724</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I'm trying to create a DLT pipeline where I read data as a streaming dataset from a Kafka source, save it in a table, and then filter, transform, and pivot the data. However, I've encountered an issue: DLT doesn't support pivoting, and using foreachBatch&amp;nbsp;doesn't allow me to return a streaming query to the DLT function.&lt;/P&gt;&lt;P&gt;What is the best way to handle this?&lt;/P&gt;&lt;P&gt;Here's the code I'm trying to run:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def parse_event(df, batch_id, event_id, target):
    result_df = (df.filter(col("event_id") == event_id)
          .withColumn("parsed_data", from_json(col("message_content"), "map&amp;lt;string,string&amp;gt;"))
          .select("time", "user", "event_id", explode(col("parsed_data")).alias("key", "value"))
          .groupby("time", "user", "event_id").pivot("key").agg(first("value"))
    )
    (result_df.write
              .format("delta")
              .mode("append")
              .saveAsTable(target))

@dlt.table(
  name="bronze_event_xxxx",
  table_properties={"quality": "bronze",
                    "pipelines.reset.allowed": "true"},
  temporary=False)
    
def create_bronze_table():
    
    df = (dlt.read_stream("bronze_event")
          .writeStream
         .foreachBatch(lambda df, epoch_id: parse_eventlog(df, epoch_id, "xxxx", "dev.dev.bronze_event_xxxx"))
         .outputMode("append")
         .start()
         )

    return (df)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 18 Jul 2024 17:51:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/79293#M35724</guid>
      <dc:creator>YS1</dc:creator>
      <dc:date>2024-07-18T17:51:42Z</dc:date>
    </item>
    <item>
      <title>Re: Delta Live Tables and Pivoting</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/79329#M35737</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/83285"&gt;@YS1&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;As a workaround you can rewrite pivot to sql with case statements.&lt;BR /&gt;&lt;BR /&gt;Below Pivot:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;data = [
    ("ProductA", "North", 100),
    ("ProductA", "South", 150),
    ("ProductA", "East", 200),
    ("ProductA", "West", 250),
    ("ProductB", "North", 300),
    ("ProductB", "South", 350),
    ("ProductB", "East", 400),
    ("ProductB", "West", 450)
]

columns = ["product", "region", "sales"]


df = spark.createDataFrame(data, columns)


df.createOrReplaceTempView("sales_data")


sql_query = """
SELECT * FROM (
    SELECT product, region, sales
    FROM sales_data
) PIVOT (
    SUM(sales) FOR region IN ('North', 'South', 'East', 'West')
)
"""


pivot_df_sql = spark.sql(sql_query)

display(pivot_df_sql)&lt;/LI-CODE&gt;&lt;P&gt;is equivalent to this sql:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;sql_query = """
SELECT
    product,
    SUM(CASE WHEN region = 'North' THEN sales ELSE 0 END) AS North,
    SUM(CASE WHEN region = 'South' THEN sales ELSE 0 END) AS South,
    SUM(CASE WHEN region = 'East' THEN sales ELSE 0 END) AS East,
    SUM(CASE WHEN region = 'West' THEN sales ELSE 0 END) AS West
FROM sales_data
GROUP BY product
"""

display(spark.sql(sql_query))&lt;/LI-CODE&gt;</description>
      <pubDate>Fri, 19 Jul 2024 05:59:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/79329#M35737</guid>
      <dc:creator>szymon_dybczak</dc:creator>
      <dc:date>2024-07-19T05:59:45Z</dc:date>
    </item>
    <item>
      <title>Re: Delta Live Tables and Pivoting</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/80748#M36147</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/110502"&gt;@szymon_dybczak&lt;/a&gt;&amp;nbsp;Thanks for the&amp;nbsp;&lt;SPAN&gt;workaround. That is helpful.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jul 2024 16:51:23 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-tables-and-pivoting/m-p/80748#M36147</guid>
      <dc:creator>YS1</dc:creator>
      <dc:date>2024-07-26T16:51:23Z</dc:date>
    </item>
  </channel>
</rss>

