Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Use pandas in DLT pipeline

bulbur
New Contributor

Hi,

I am trying to work with pandas in a Delta Live Tables (DLT) pipeline. I have created some example code:

 

import pandas as pd
import pyspark.sql.functions as F
 
pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('test_table')
 
import dlt
 
@dlt.table
def dlt_test_table():
    return spark.read.table('test_table')
 
@dlt.table
def dlt_test_table_pivoted():
    df = dlt.read('dlt_test_table')
    pd_df = df.toPandas()
    result_df = spark.createDataFrame(pd_df)
    return result_df

 

 

 

I get the error message "ValueError: can not infer schema from empty dataset".

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/dlt/helpers.py", line 31, in call
    res = self.func()
  File "/home/spark-1ca3adb2-007d-4985-bc8f-47/.ipykernel/66446/command--1-10634334", line 26, in dlt_test_table_pivoted
    result_df = spark.createDataFrame(pd_df)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 47, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1504, in createDataFrame
    return super(SparkSession, self).createDataFrame( # type: ignore[call-overload]
  File "/databricks/spark/python/pyspark/sql/pandas/conversion.py", line 424, in createDataFrame
    return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1552, in _create_dataframe
    jdf, struct = self._createFromLocalTrusted(map(prepare, data), schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1164, in _createFromLocalTrusted
    data, schema = self._wrap_data_schema(data, schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1102, in _wrap_data_schema
    struct = self._inferSchemaFromList(data, names=schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 967, in _inferSchemaFromList
    raise ValueError("can not infer schema from empty dataset")
ValueError: can not infer schema from empty dataset

 

If I remove the conversion to pandas and just return "df" from "dlt_test_table_pivoted()", I don't get an error message.

I have seen the limitations in the documentation. It says there:

  • The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.

So it was not really surprising that it didn't work. Does this mean there is no way to use pandas in a DLT pipeline?
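
One direction that might work, offered as a sketch rather than a confirmed answer: keep the pandas logic inside a function that Spark applies lazily, for example through `groupBy(...).applyInPandas(...)`, so the table function still returns a Spark DataFrame and never calls `toPandas()`. The names `pivot_group` and `PIVOT_SCHEMA`, the choice to pivot column D by C within each value of A, and the guard around `import dlt` are assumptions made for illustration. This has not been verified against a live DLT pipeline.

```python
import pandas as pd

try:
    import dlt  # assumption: only importable inside a Databricks DLT pipeline
except ImportError:
    dlt = None

# Hypothetical output schema; applyInPandas needs it declared up front
# because Spark cannot inspect the pandas result while building the graph.
PIVOT_SCHEMA = "A string, B string, large long, small long"


def pivot_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Pivot one group (a single value of A) using plain pandas."""
    pivoted = pdf.pivot_table(
        index="B", columns="C", values="D", aggfunc="sum", fill_value=0
    )
    # Force both expected columns to exist, even if a category is missing
    # in this group, so the result always matches PIVOT_SCHEMA.
    pivoted = pivoted.reindex(columns=["large", "small"], fill_value=0)
    pivoted = pivoted.astype("int64")
    pivoted.columns.name = None
    out = pivoted.reset_index()
    out.insert(0, "A", pdf["A"].iloc[0])
    return out[["A", "B", "large", "small"]]


# Skip the table definition when the Databricks dlt module is not present,
# so pivot_group can still be tried out locally with pandas alone.
if dlt is not None and hasattr(dlt, "table"):

    @dlt.table
    def dlt_test_table_pivoted():
        # The pandas code runs on the executors when the pipeline executes,
        # not while the dataflow graph is being resolved.
        return (
            dlt.read("dlt_test_table")
            .groupBy("A")
            .applyInPandas(pivot_group, schema=PIVOT_SCHEMA)
        )
```

The idea is that `applyInPandas` is a regular DataFrame transformation, so nothing is collected to the driver during graph initialization. That eager collection is the likely reason `toPandas()` returned an empty frame in the original function. `mapInPandas` or a pandas UDF would be similar options when no grouping is needed.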

0 REPLIES 0