py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/dlt/helpers.py", line 31, in call
    res = self.func()
  File "/home/spark-1ca3adb2-007d-4985-bc8f-47/.ipykernel/66446/command--1-10634334", line 26, in dlt_test_table_pivoted
    result_df = spark.createDataFrame(pd_df)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 47, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1504, in createDataFrame
    return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
  File "/databricks/spark/python/pyspark/sql/pandas/conversion.py", line 424, in createDataFrame
    return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1552, in _create_dataframe
    jdf, struct = self._createFromLocalTrusted(map(prepare, data), schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1164, in _createFromLocalTrusted
    data, schema = self._wrap_data_schema(data, schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1102, in _wrap_data_schema
    struct = self._inferSchemaFromList(data, names=schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 967, in _inferSchemaFromList
    raise ValueError("can not infer schema from empty dataset")
ValueError: can not infer schema from empty dataset
If I remove the conversion to pandas and just return "df" in dlt_test_table_pivoted(), I don't get an error.
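For context, the failing version of the table function looks roughly like this (I have omitted the pivoting to keep the example minimal; the round trip through pandas is the part that fails):

import dlt

@dlt.table
def dlt_test_table_pivoted():
    df = spark.read.table('test_table')
    # converting to pandas and back inside the @dlt.table
    # function is what triggers the traceback above
    pd_df = df.toPandas()
    result_df = spark.createDataFrame(pd_df)
    return result_df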
I have seen the limitations in the documentation. It says:
The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.
So it was not really surprising that it didn't work. Does this mean there is no way to use pandas in a DLT pipeline?
I have taken the advice given by the documentation ("However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.") and moved the toPandas() call into a function without a @dlt.table decorator. That function is still called from the table function, though, so I don't know whether that is the intention of the remark.
import dlt

def pivot_table(spark_df):
    # removed the pivoting to simplify the example;
    # the pandas round trip now lives in a plain helper
    # function with no @dlt.table decorator
    pdf = spark_df.toPandas()
    return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table_pivoted():
    # the table function only calls the helper and returns a DataFrame
    df = spark.read.table('test_table')
    result_df = pivot_table(df)
    return result_df
This does work, so I am a step further now.
But if I try to add an intermediate table function, I get the same error as before ("ValueError: can not infer schema from empty dataset"). The sketch below shows what I mean.
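Roughly, the intermediate-table variant looks like this (simplified; dlt_test_table_intermediate is a placeholder name, and pivot_table is the helper defined above):

import dlt

@dlt.table
def dlt_test_table_intermediate():
    # intermediate DLT table that just passes the source through
    return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
    # reading the intermediate table and running it through the
    # pandas round trip in pivot_table fails again with
    # "ValueError: can not infer schema from empty dataset"
    df = dlt.read('dlt_test_table_intermediate')
    result_df = pivot_table(df)
    return result_df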