<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic TypeError on DataFrame via spark readStream transform invocation of UDF in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/typeerror-on-dataframe-via-spark-readstream-transform-invocation/m-p/6872#M2874</link>
    <description>&lt;P&gt;Our use case is to "clean up" column names (remove spaces, etc.) when ingesting CSV data with Delta Live Tables. We want to rely on schema inference during ingestion, so we will not be specifying a schema up front.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;We explored using .withColumnRenamed on the offending columns, but that quickly became intractable because there are now over 100 columns.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;After some research, we settled on defining a UDF that calls .replace to remove the unwanted characters from the column names. The UDF uses a Python list comprehension; however, the call to df.columns does not return the list of column names as expected. The traceback ends with:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in _to_seq&lt;/P&gt;&lt;P&gt;    cols = [converter(c) for c in cols]&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in &amp;lt;listcomp&amp;gt;&lt;/P&gt;&lt;P&gt;    cols = [converter(c) for c in cols]&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 66, in _to_java_column&lt;/P&gt;&lt;P&gt;    raise TypeError(&lt;/P&gt;&lt;P&gt;&lt;B&gt;TypeError: Invalid argument, not a string or column&lt;/B&gt;: DataFrame[Run #: string, Tank #: string, ...&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Here is the implementation:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;--&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;import dlt&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import col as my_col&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import udf as my_udf&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@my_udf&lt;/P&gt;&lt;P&gt;def clean_columns(df):&lt;/P&gt;&lt;P&gt;    return  df.select([my_col(x).alias(x&lt;/P&gt;&lt;P&gt;        .replace(" ", "_")&lt;/P&gt;&lt;P&gt;        .replace("/", "")&lt;/P&gt;&lt;P&gt;        .replace("%", "pct")&lt;/P&gt;&lt;P&gt;        .replace("(", "")&lt;/P&gt;&lt;P&gt;        .replace(")", "")&lt;/P&gt;&lt;P&gt;        ) for x in df.columns])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table(&lt;/P&gt;&lt;P&gt;    comment="test"&lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;def test_rawzone_data():&lt;/P&gt;&lt;P&gt;    return(&lt;/P&gt;&lt;P&gt;        spark.readStream.format("cloudFiles")&lt;/P&gt;&lt;P&gt;            .option("cloudFiles.format", "csv")&lt;/P&gt;&lt;P&gt;            .option("pathGlobfilter", "*.csv")&lt;/P&gt;&lt;P&gt;            .option("header", True)&lt;/P&gt;&lt;P&gt;            .load("s3://obfuscated-of-course/etc/etc/Pipeline//")&lt;/P&gt;&lt;P&gt;            .transform(clean_columns)&lt;/P&gt;&lt;P&gt;    )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The transform call invokes the function with the DataFrame as its argument, but the type isn't resolving. Could this be a namespace collision? Does anyone have recommendations for improving the UDF, or for a different approach?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thank you for any pointers.&lt;/P&gt;</description>
    <pubDate>Wed, 29 Mar 2023 10:05:26 GMT</pubDate>
    <dc:creator>883702</dc:creator>
    <dc:date>2023-03-29T10:05:26Z</dc:date>
    <item>
      <title>TypeError on DataFrame via spark readStream transform invocation of UDF</title>
      <link>https://community.databricks.com/t5/data-engineering/typeerror-on-dataframe-via-spark-readstream-transform-invocation/m-p/6872#M2874</link>
      <description>&lt;P&gt;Our use case is to "clean up" column names (remove spaces, etc.) when ingesting CSV data with Delta Live Tables. We want to rely on schema inference during ingestion, so we will not be specifying a schema up front.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;We explored using .withColumnRenamed on the offending columns, but that quickly became intractable because there are now over 100 columns.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;After some research, we settled on defining a UDF that calls .replace to remove the unwanted characters from the column names. The UDF uses a Python list comprehension; however, the call to df.columns does not return the list of column names as expected. The traceback ends with:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in _to_seq&lt;/P&gt;&lt;P&gt;    cols = [converter(c) for c in cols]&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in &amp;lt;listcomp&amp;gt;&lt;/P&gt;&lt;P&gt;    cols = [converter(c) for c in cols]&lt;/P&gt;&lt;P&gt;  File "/databricks/spark/python/pyspark/sql/column.py", line 66, in _to_java_column&lt;/P&gt;&lt;P&gt;    raise TypeError(&lt;/P&gt;&lt;P&gt;&lt;B&gt;TypeError: Invalid argument, not a string or column&lt;/B&gt;: DataFrame[Run #: string, Tank #: string, ...&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Here is the implementation:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;--&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;import dlt&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import col as my_col&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import udf as my_udf&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@my_udf&lt;/P&gt;&lt;P&gt;def clean_columns(df):&lt;/P&gt;&lt;P&gt;    return  df.select([my_col(x).alias(x&lt;/P&gt;&lt;P&gt;        .replace(" ", "_")&lt;/P&gt;&lt;P&gt;        .replace("/", "")&lt;/P&gt;&lt;P&gt;        .replace("%", "pct")&lt;/P&gt;&lt;P&gt;        .replace("(", "")&lt;/P&gt;&lt;P&gt;        .replace(")", "")&lt;/P&gt;&lt;P&gt;        ) for x in df.columns])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table(&lt;/P&gt;&lt;P&gt;    comment="test"&lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;def test_rawzone_data():&lt;/P&gt;&lt;P&gt;    return(&lt;/P&gt;&lt;P&gt;        spark.readStream.format("cloudFiles")&lt;/P&gt;&lt;P&gt;            .option("cloudFiles.format", "csv")&lt;/P&gt;&lt;P&gt;            .option("pathGlobfilter", "*.csv")&lt;/P&gt;&lt;P&gt;            .option("header", True)&lt;/P&gt;&lt;P&gt;            .load("s3://obfuscated-of-course/etc/etc/Pipeline//")&lt;/P&gt;&lt;P&gt;            .transform(clean_columns)&lt;/P&gt;&lt;P&gt;    )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The transform call invokes the function with the DataFrame as its argument, but the type isn't resolving. Could this be a namespace collision? Does anyone have recommendations for improving the UDF, or for a different approach?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thank you for any pointers.&lt;/P&gt;</description>
      <pubDate>Wed, 29 Mar 2023 10:05:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/typeerror-on-dataframe-via-spark-readstream-transform-invocation/m-p/6872#M2874</guid>
      <dc:creator>883702</dc:creator>
      <dc:date>2023-03-29T10:05:26Z</dc:date>
    </item>
    <item>
      <title>Re: TypeError on DataFrame via spark readStream transform invocation of UDF</title>
      <link>https://community.databricks.com/t5/data-engineering/typeerror-on-dataframe-via-spark-readstream-transform-invocation/m-p/6873#M2875</link>
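      <description>&lt;P&gt;The issue was that I mistakenly believed the transform function needed the UDF decorator. With the decorator removed, the transform is invoked (and works) as expected.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The @udf decorator turns the function into a column expression that Spark evaluates row by row, so calling it converts each argument to a Column; passing a DataFrame is what raised "Invalid argument, not a string or column". DataFrame.transform only needs a plain Python function that takes a DataFrame and returns one.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="image"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/455i5719A3A3FCCF1516/image-size/large?v=v2&amp;amp;px=999" role="button" title="image" alt="image" /&gt;&lt;/span&gt;&lt;/P&gt;</description>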
      <pubDate>Thu, 30 Mar 2023 03:02:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/typeerror-on-dataframe-via-spark-readstream-transform-invocation/m-p/6873#M2875</guid>
      <dc:creator>883702</dc:creator>
      <dc:date>2023-03-30T03:02:37Z</dc:date>
    </item>
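A minimal sketch of the decorator-free transform described in the reply, written so the renaming logic runs outside Databricks. It keeps the replacement rules from the original post. The helper name clean_column_name is hypothetical. It also swaps the select/alias list comprehension for PySpark's DataFrame.toDF(*cols), which renames columns by position, so source names containing spaces or dots are never parsed as column references. This is an illustrative assumption, not the poster's exact code.

```python
# Replacement rules taken from the original post, applied in order.
REPLACEMENTS = [(" ", "_"), ("/", ""), ("%", "pct"), ("(", ""), (")", "")]


def clean_column_name(name):
    """Return name with the unwanted characters replaced (hypothetical helper)."""
    for old, new in REPLACEMENTS:
        name = name.replace(old, new)
    return name


def clean_columns(df):
    """Plain function (no @udf decorator) suitable for DataFrame.transform.

    toDF(*names) renames every column positionally, so the original
    names are never interpreted as column expressions.
    """
    return df.toDF(*[clean_column_name(c) for c in df.columns])


# In the DLT table definition the call is unchanged from the post:
#     spark.readStream.format("cloudFiles") ... .load(path).transform(clean_columns)
```

With the @my_udf line removed, the pipeline can pass clean_columns straight to .transform as in the post. One caveat: if two source names clean to the same string, the result will contain duplicate column names, which the Delta write will likely reject.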
  </channel>
</rss>

