<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Use pandas in DLT pipeline in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/use-pandas-in-dlt-pipeline/m-p/72793#M34606</link>
    <description>Use pandas in DLT pipeline</description>
    <pubDate>Wed, 12 Jun 2024 14:22:53 GMT</pubDate>
    <dc:creator>bulbur</dc:creator>
    <dc:date>2024-06-12T14:22:53Z</dc:date>
    <item>
      <title>Use pandas in DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/use-pandas-in-dlt-pipeline/m-p/72793#M34606</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I am trying to work with pandas in a Delta Live Tables (DLT) pipeline. Here is a minimal example:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import pandas as pd

# Setup: write the sample data to a regular table
pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('test_table')

import dlt

# DLT table definitions
@dlt.table
def dlt_test_table():
  return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
  df = dlt.read('dlt_test_table')
  pd_df = df.toPandas()
  # Raises "ValueError: can not infer schema from empty dataset"
  result_df = spark.createDataFrame(pd_df)
  return result_df&lt;/LI-CODE&gt;&lt;P&gt;I get the error message "ValueError: can not infer schema from empty dataset":&lt;/P&gt;&lt;LI-SPOILER&gt;&lt;PRE&gt;py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/dlt/helpers.py", line 31, in call
    res = self.func()
  File "/home/spark-1ca3adb2-007d-4985-bc8f-47/.ipykernel/66446/command--1-10634334", line 26, in dlt_test_table_pivoted
    result_df = spark.createDataFrame(pd_df)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 47, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1504, in createDataFrame
    return super(SparkSession, self).createDataFrame( # type: ignore[call-overload]
  File "/databricks/spark/python/pyspark/sql/pandas/conversion.py", line 424, in createDataFrame
    return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1552, in _create_dataframe
    jdf, struct = self._createFromLocalTrusted(map(prepare, data), schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1164, in _createFromLocalTrusted
    data, schema = self._wrap_data_schema(data, schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 1102, in _wrap_data_schema
    struct = self._inferSchemaFromList(data, names=schema)
  File "/databricks/spark/python/pyspark/sql/session.py", line 967, in _inferSchemaFromList
    raise ValueError("can not infer schema from empty dataset")
ValueError: can not infer schema from empty dataset&lt;/PRE&gt;&lt;/LI-SPOILER&gt;&lt;P&gt;If I remove the conversion to pandas and just return &lt;CODE&gt;df&lt;/CODE&gt; from &lt;CODE&gt;dlt_test_table_pivoted()&lt;/CODE&gt;, I don't get an error.&lt;/P&gt;&lt;P&gt;I have read the limitations in the &lt;A href="https://docs.databricks.com/en/delta-live-tables/python-ref.html#limitations" target="_blank" rel="noopener"&gt;documentation&lt;/A&gt;, which state:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;The Python &lt;CODE&gt;table&lt;/CODE&gt; and &lt;CODE&gt;view&lt;/CODE&gt; functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed &lt;EM&gt;after&lt;/EM&gt; the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as &lt;CODE&gt;collect()&lt;/CODE&gt;, &lt;CODE&gt;count()&lt;/CODE&gt;, &lt;CODE&gt;toPandas()&lt;/CODE&gt;, &lt;CODE&gt;save()&lt;/CODE&gt;, and &lt;CODE&gt;saveAsTable()&lt;/CODE&gt;. However, you can include these functions outside of &lt;CODE&gt;table&lt;/CODE&gt; or &lt;CODE&gt;view&lt;/CODE&gt; function definitions because this code is run once during the graph initialization phase.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;So it was not really surprising that it didn't work. Does this mean there is no way to use pandas in a DLT pipeline?&lt;/P&gt;</description>
      <pubDate>Wed, 12 Jun 2024 14:22:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/use-pandas-in-dlt-pipeline/m-p/72793#M34606</guid>
      <dc:creator>bulbur</dc:creator>
      <dc:date>2024-06-12T14:22:53Z</dc:date>
    </item>
    <item>
      <title>Re: Use pandas in DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/use-pandas-in-dlt-pipeline/m-p/82716#M36724</link>
      <description>&lt;P&gt;I have followed the advice in the documentation ("However, you can include these functions outside of &lt;CODE&gt;table&lt;/CODE&gt; or &lt;CODE&gt;view&lt;/CODE&gt; function definitions because this code is run once during the graph initialization phase.") and moved the &lt;EM&gt;toPandas&lt;/EM&gt; call into a function without a &lt;CODE&gt;@dlt.table&lt;/CODE&gt; decorator. That function is still called from the decorated one, though, so I am not sure this is what the remark intends.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt

def pivot_table(spark_df):
  # Plain helper (no @dlt.table decorator) that calls toPandas().
  # The pivoting was removed to simplify the example.
  pdf = spark_df.toPandas()
  return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table_pivoted():
  df = spark.read.table('test_table')
  result_df = pivot_table(df)
  return result_df&lt;/LI-CODE&gt;&lt;P&gt;This works, so I am a step further now.&lt;/P&gt;&lt;P&gt;However, if I add an intermediate table function and read from it with &lt;CODE&gt;dlt.read&lt;/CODE&gt;, I get the same error as before ("ValueError: can not infer schema from empty dataset"):&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt

def pivot_table(spark_df):
  pdf = spark_df.toPandas()
  return spark.createDataFrame(pdf)

@dlt.table
def dlt_test_table():
  return spark.read.table('test_table')

@dlt.table
def dlt_test_table_pivoted():
  df = dlt.read('dlt_test_table')
  result_df = pivot_table(df)
  return result_df&lt;/LI-CODE&gt;</description>
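The reply's helper is named `pivot_table`, and its comment says the pivoting was removed to simplify the example. The sample frame from the first post is the same one the pandas documentation uses for `pandas.pivot_table`. The sketch below shows one plausible version of the removed step. It is an assumption, not confirmed by the author, and the chosen `values`, `index`, `columns` and `aggfunc` are illustrative. The function name `pivot_pandas` is hypothetical. The sketch uses plain pandas, so you can check it locally without Spark or DLT. `reset_index()` turns the `A`/`B` index levels back into ordinary columns. This matters because `spark.createDataFrame` does not carry over a pandas index.

```python
import pandas as pd

# Sample data from the original post (same frame as in the pandas docs).
pdf = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                          "bar", "bar", "bar", "bar"],
                    "B": ["one", "one", "one", "two", "two",
                          "one", "one", "two", "two"],
                    "C": ["small", "large", "large", "small",
                          "small", "large", "small", "small",
                          "large"],
                    "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                    "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})


def pivot_pandas(frame: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical pivot step: sum D per (A, B) row and per C column."""
    table = pd.pivot_table(frame, values="D", index=["A", "B"],
                           columns="C", aggfunc="sum")
    # Move the (A, B) index back into columns and drop the "C" axis name,
    # so the result is a flat frame with plain string column names.
    flat = table.reset_index()
    flat.columns.name = None
    return flat


pivoted = pivot_pandas(pdf)
print(pivoted)
```

Keeping the pandas logic in a plain function like this lets you test it on its own, independent of where the DLT pipeline ends up calling it. Combinations with no rows, such as `foo`/`two`/`large`, come out as NaN.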
      <pubDate>Mon, 12 Aug 2024 08:52:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/use-pandas-in-dlt-pipeline/m-p/82716#M36724</guid>
      <dc:creator>bulbur</dc:creator>
      <dc:date>2024-08-12T08:52:59Z</dc:date>
    </item>
  </channel>
</rss>

