Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

TypeError on DataFrame via spark readStream transform invocation of UDF

883702
New Contributor III

Our use case is to "clean up" column names (removing spaces, etc.) on ingestion of CSV data using the Delta Live Tables capability. We want to rely on schema inference during ingestion, so we will not be specifying a schema up front.

We explored using .withColumnRenamed for the offending columns, but that quickly became intractable, as the column count is now over 100.
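
For context, a minimal sketch of the per-column renaming we abandoned (the column names here are hypothetical, not our actual schema):

# Hypothetical illustration of the abandoned approach: one
# .withColumnRenamed call per offending column, repeated 100+ times.
cleaned_df = (
    raw_df
    .withColumnRenamed("Pressure (psi)", "Pressure_psi")
    .withColumnRenamed("Flow %", "Flow_pct")
    .withColumnRenamed("Cycle Time", "Cycle_Time")
    # ... and so on, one call per column, for over 100 columns
)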

After some research we settled on defining a UDF for the use case that uses .replace to strip the characters we want removed from the column names. The UDF uses a Python list comprehension; however, the call to df.columns does not return the list of column names as expected:

  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in _to_seq
    cols = [converter(c) for c in cols]
  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/databricks/spark/python/pyspark/sql/column.py", line 66, in _to_java_column
    raise TypeError(
TypeError: Invalid argument, not a string or column: DataFrame[Run #: string, Tank #: string, ...

Here is the implementation:

--

import dlt
from pyspark.sql.functions import col as my_col
from pyspark.sql.functions import udf as my_udf

@my_udf
def clean_columns(df):
    return df.select([my_col(x).alias(x
        .replace(" ", "_")
        .replace("/", "")
        .replace("%", "pct")
        .replace("(", "")
        .replace(")", "")
    ) for x in df.columns])

@dlt.table(
    comment="test"
)
def test_rawzone_data():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("pathGlobfilter", "*.csv")
        .option("header", True)
        .load("s3://obfuscated-of-course/etc/etc/Pipeline//")
        .transform(clean_columns)
    )

The transform call does invoke the function and passes the DataFrame to it, but the type isn't resolving. Could it be a namespace collision? Does anyone have recommendations for improving the UDF, or for a different approach altogether?

Thank you for any pointers.

1 ACCEPTED SOLUTION


883702
New Contributor III

The issue was erroneously believing the transform function needed a UDF decorator. With the decorator removed, the transform is invoked (and works) as expected.
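
For reference, a minimal sketch of the working shape based on the code in the original post: clean_columns is now a plain Python function (no @udf decorator), and the rest of the pipeline is unchanged. .transform passes the streaming DataFrame to it and expects a DataFrame back; the earlier TypeError came from the @udf wrapper trying to treat the DataFrame argument as a column.

from pyspark.sql.functions import col as my_col

# Plain function, not a UDF: .transform(clean_columns) calls it with the
# DataFrame and uses the DataFrame it returns.
def clean_columns(df):
    return df.select([my_col(x).alias(x
        .replace(" ", "_")
        .replace("/", "")
        .replace("%", "pct")
        .replace("(", "")
        .replace(")", "")
    ) for x in df.columns])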


