Use pandas in DLT pipeline

bulbur
New Contributor II

Hi,

I am trying to work with pandas in a Delta Live Tables (DLT) pipeline. I have created some example code:

 

import pandas as pd
import pyspark.sql.functions as F

# Build a small test table outside the DLT graph
pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('test_table')

import dlt

@dlt.table
def dlt_test_table():
    return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
    # round-trip through pandas inside the table function
    df = dlt.read('dlt_test_table')
    pd_df = df.toPandas()
    result_df = spark.createDataFrame(pd_df)
    return result_df

I get the error message "ValueError: can not infer schema from empty dataset"

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/dlt/helpers.py", line 31, in call
    res = self.func()
  File "/home/spark-1ca3adb2-007d-4985-bc8f-47/.ipykernel/66446/command--1-10634334", line 26, in dlt_test_table_pivoted
    result_df = spark.createDataFrame(pd_df)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 47, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1504, in createDataFrame
    return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
  File "/databricks/spark/python/pyspark/sql/pandas/conversion.py", line 424, in createDataFrame
    return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1552, in _create_dataframe
    jdf, struct = self._createFromLocalTrusted(map(prepare, data), schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1164, in _createFromLocalTrusted
    data, schema = self._wrap_data_schema(data, schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1102, in _wrap_data_schema
    struct = self._inferSchemaFromList(data, names=schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 967, in _inferSchemaFromList
    raise ValueError("can not infer schema from empty dataset")
ValueError: can not infer schema from empty dataset

If I remove the conversion to pandas and just return df from dlt_test_table_pivoted(), I don't get an error.

I have seen the limitations in the documentation, which says:

  • The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.

So it was not really surprising that it didn't work. Does this mean there is no way to use pandas in a DLT pipeline?
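
One direction I want to try is to keep everything as Spark DataFrames and push the pandas logic into a pandas UDF, so that toPandas() never runs inside a table function. A minimal sketch, untested in my pipeline (the doubled UDF and the dlt_test_table_doubled table name are just for illustration):

import dlt
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Vectorized pandas UDF: receives and returns pandas Series,
# while the surrounding query stays a Spark DataFrame.
@pandas_udf("long")
def doubled(s: pd.Series) -> pd.Series:
    return s * 2

@dlt.table
def dlt_test_table_doubled():
    # The table function still returns a Spark DataFrame, as DLT requires.
    return dlt.read("dlt_test_table").withColumn("D_doubled", doubled("D"))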

1 REPLY

bulbur
New Contributor II

I have taken the advice given by the documentation ("However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.") and moved the toPandas() call into a helper function without a @dlt.table decorator. The helper is still called from inside the table function, though, so I am not sure whether that is what the remark intends.

import dlt

def pivot_table(spark_df):
    # removed the pivoting to simplify the example
    pdf = spark_df.toPandas()
    return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table_pivoted():
    df = spark.read.table('test_table')
    result_df = pivot_table(df)
    return result_df

This does work, so I am a step further now. Presumably spark.read.table('test_table') can already deliver data when the function is evaluated, so toPandas() sees a non-empty DataFrame.

But if I add an intermediate table function, I get the same error as before ("ValueError: can not infer schema from empty dataset"), presumably because dlt.read('dlt_test_table') does not deliver any rows at the point where toPandas() runs, so createDataFrame() receives an empty pandas DataFrame and cannot infer a schema.

import dlt

def pivot_table(spark_df):
    pdf = spark_df.toPandas()
    return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table():
    return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
    df = dlt.read('dlt_test_table')
    result_df = pivot_table(df)
    return result_df
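
What I will probably try next is groupBy().applyInPandas(), which runs the pandas logic per group while the table function keeps returning a Spark DataFrame. A sketch I have not verified in this pipeline yet (the pivot_group helper and its output schema are only illustrative):

import dlt
import pandas as pd

def pivot_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # arbitrary per-group pandas logic; the real pivot would go here
    return pd.DataFrame({"A": [pdf["A"].iloc[0]], "D_sum": [pdf["D"].sum()]})

@dlt.table
def dlt_test_table_pivoted():
    # applyInPandas keeps the result a Spark DataFrame, so there is no
    # toPandas()/createDataFrame() round-trip inside the table function.
    return (
        dlt.read("dlt_test_table")
        .groupBy("A")
        .applyInPandas(pivot_group, schema="A string, D_sum long")
    )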

 
