Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables error pivot

mangel
New Contributor III

I'm facing an error in Delta Live Tables when I try to pivot a table. The error is the following:

[screenshot of the error message]

And the code to replicate the error is the following:

import pandas as pd
import pyspark.sql.functions as F

# Build a small sample table to reproduce the error
pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('test_table')

import dlt

# Expose the source table as a DLT view
@dlt.view
def test_table():
    return spark.read.table('test_table')

# Pivot on column C, keeping the first D value per (A, B) group
@dlt.table
def test_table_pivoted():
    return (
        spark.table('LIVE.test_table')
        .groupBy('A', 'B')
        .pivot('C')
        .agg(F.first('D'))
    )
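For reference, here is a plain-Python sketch (no Spark, for illustration only) of what `.groupBy('A', 'B').pivot('C').agg(F.first('D'))` computes on the sample data above: one row per (A, B) group, one column per distinct value of C, filled with the first D seen for that combination.

```python
# Same sample data as above, as (A, B, C, D) tuples
rows = [
    ("foo", "one", "small", 1), ("foo", "one", "large", 2),
    ("foo", "one", "large", 2), ("foo", "two", "small", 3),
    ("foo", "two", "small", 3), ("bar", "one", "large", 4),
    ("bar", "one", "small", 5), ("bar", "two", "small", 6),
    ("bar", "two", "large", 7),
]

pivoted = {}
for a, b, c, d in rows:
    # setdefault keeps the first D seen per (A, B, C), mirroring F.first('D')
    pivoted.setdefault((a, b), {}).setdefault(c, d)

print(pivoted[("foo", "one")])  # {'small': 1, 'large': 2}
```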

Does anybody know why I cannot pivot a table in Delta Live Tables pipelines?


6 REPLIES 6

ccary
New Contributor III

Can you try passing in the column names as a second argument to the pivot function?

.pivot('C', ["small", "large"])

mangel
New Contributor III

Hi, this would only make the query run faster; thanks for the suggestion. I'll post the solution I found below.

mangel
New Contributor III

The solution seems to be to add the following configuration to the Delta Live Tables pipeline:

spark.databricks.delta.schema.autoMerge.enabled: true

It allows "schema evolution" in the pipeline and solves the problem.
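For anyone setting this through the pipeline's JSON settings rather than the UI, the option goes under the `configuration` block. A minimal sketch, assuming the standard pipeline settings layout:

```json
{
  "configuration": {
    "spark.databricks.delta.schema.autoMerge.enabled": "true"
  }
}
```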

According to both the SQL and Python DLT documentation, `pivot` is not supported in DLT.

So I wonder: what are the complications of using `pivot` in such an unsupported way?

JackyL
New Contributor II

I'm a bit of a muppet; it's implied, but it took me a second to figure out that you need to write it like this:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

 

Khalil
Contributor

The DLT documentation says that `pivot` is not supported in DLT, but I noticed that if you want the pivot function to work, you have to do one of the following:

  1. Apply the pivot in your first `dlt.view`, plus the config `spark.databricks.delta.schema.autoMerge.enabled: true`.
  2. Apply the pivot outside of the DLT decorators, then use the output in a `dlt.view` or `dlt.table`.

Note: this works, but you get a warning saying that the `GroupedData.pivot` function will be deprecated soon; you get the same warning if you use `collect`, for instance.

Hope that helps!
