Use pandas in DLT pipeline
06-12-2024 07:22 AM
Hi,
I am trying to work with pandas in a Delta Live Tables pipeline. I have created some example code:
import pandas as pd
import pyspark.sql.functions as F

pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})

df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('test_table')
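For context, the pivot I ultimately want resembles this plain-pandas version (the value column and aggregation here are just an illustration, since I removed the actual pivoting from the pipeline snippets below):

```python
import pandas as pd

pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})

# Pivot: sum of D per (A, B) pair, spread over the categories in C.
# (The choice of values/aggfunc is illustrative, not from the real pipeline.)
table = pd.pivot_table(pdf, values="D", index=["A", "B"],
                       columns=["C"], aggfunc="sum")
```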
import dlt

@dlt.table
def dlt_test_table():
    return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
    df = dlt.read('dlt_test_table')
    pd_df = df.toPandas()
    result_df = spark.createDataFrame(pd_df)
    return result_df
I get the error message "ValueError: can not infer schema from empty dataset"
If I remove the conversion to pandas and just return "df" in "dlt_test_table_pivoted()" I don't get an error message.
I have seen the limitations section in the documentation, which says:
The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.
So it was not really surprising that it didn't work. Does this mean there is no way to use pandas in a DLT pipeline?
Labels: Delta Lake
08-12-2024 01:52 AM
I have taken the advice from the documentation ("However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.") and moved the toPandas() call into a function without a @dlt.table decorator. The decorated function still calls it, so I am not sure whether that is what the remark intends.
import dlt

def pivot_table(spark_df):
    # removed the pivoting to simplify the example
    pdf = spark_df.toPandas()
    return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table_pivoted():
    df = spark.read.table('test_table')
    result_df = pivot_table(df)
    return result_df
This does work, so I am a step further now.
But if I try to add an intermediate table function, I get the same error as before ("ValueError: can not infer schema from empty dataset"):
import dlt

def pivot_table(spark_df):
    pdf = spark_df.toPandas()
    return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table():
    return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
    df = dlt.read('dlt_test_table')
    result_df = pivot_table(df)
    return result_df
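One direction I am considering (untested in DLT, so treat it as a sketch): keep the pandas logic in a plain function that maps one pandas DataFrame to another, and hand it to Spark's applyInPandas instead of calling toPandas() myself. applyInPandas returns a Spark DataFrame and stays lazy, so it should not run into the graph-resolution restriction. The per-group function can be exercised in plain pandas without Spark:

```python
import pandas as pd

def pivot_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # One group's rows in, aggregated rows out -- plain pandas only.
    # (The aggregation here is illustrative; my real pivot was removed above.)
    return pdf.groupby(["A", "B"], as_index=False)["D"].sum()

# In the DLT pipeline this could (sketch, not verified) be wired up as:
#
# @dlt.table
# def dlt_test_table_pivoted():
#     return (dlt.read("dlt_test_table")
#             .groupBy("A")
#             .applyInPandas(pivot_group, schema="A string, B string, D long"))

# The pandas part can be checked on a small sample:
sample = pd.DataFrame({"A": ["foo", "foo", "bar"],
                       "B": ["one", "one", "two"],
                       "D": [1, 2, 7]})
result = pivot_group(sample)
```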