
TypeError on DataFrame via spark readStream transform invocation of UDF

883702
New Contributor III

Our use case is to "clean up" column names (remove spaces, etc.) when ingesting CSV data with Delta Live Tables. We want to rely on schema inference during ingestion, so we will not be specifying a schema up front.

We explored using .withColumnRenamed for the offending columns, but that quickly became intractable because the column count is now over 100.

After some research we settled on defining a UDF that uses .replace to strip the characters we want to remove from column names. The UDF uses a Python list comprehension; however, the call to df.columns does not return the list of column names as expected. Instead, the pipeline fails with:

  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in _to_seq
    cols = [converter(c) for c in cols]
  File "/databricks/spark/python/pyspark/sql/column.py", line 87, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/databricks/spark/python/pyspark/sql/column.py", line 66, in _to_java_column
    raise TypeError(
TypeError: Invalid argument, not a string or column: DataFrame[Run #: string, Tank #: string, ...

Here is the implementation:

    import dlt
    from pyspark.sql.functions import col as my_col
    from pyspark.sql.functions import udf as my_udf

    @my_udf
    def clean_columns(df):
        return df.select([my_col(x).alias(x
            .replace(" ", "_")
            .replace("/", "")
            .replace("%", "pct")
            .replace("(", "")
            .replace(")", "")
            ) for x in df.columns])

    @dlt.table(
        comment="test"
    )
    def test_rawzone_data():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("pathGlobfilter", "*.csv")
            .option("header", True)
            .load("s3://obfuscated-of-course/etc/etc/Pipeline//")
            .transform(clean_columns)
        )

The transform function invokes clean_columns and passes it the DataFrame, but the type isn't resolving. Could this be a namespace collision? Does anyone have recommendations for improving the UDF or for pursuing a different approach?

Thank you for any pointers.

Accepted Solution

883702
New Contributor III

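The issue was my mistaken belief that the function passed to transform needed the UDF decorator. A function wrapped with udf expects column arguments, so when transform passed it the DataFrame, PySpark tried to convert the DataFrame to a column and raised the TypeError above. With the decorator removed, transform invokes the function (and works) as expected.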
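For anyone landing here later, a minimal sketch of the working shape follows. It is not the exact code from the pipeline: the helper name clean_name is hypothetical, and the sketch swaps the original select/alias approach for DataFrame.toDF, which renames every column in one call. The replacement rules are the ones from the question.

```python
def clean_name(name: str) -> str:
    """Apply the character replacements from the question to one column name.

    clean_name is a hypothetical helper introduced for this sketch.
    """
    return (
        name.replace(" ", "_")
        .replace("/", "")
        .replace("%", "pct")
        .replace("(", "")
        .replace(")", "")
    )


# Note: no @udf decorator. DataFrame.transform calls this plain Python
# function once with the whole DataFrame and expects a DataFrame back.
def clean_columns(df):
    # toDF(*names) returns a new DataFrame with the columns renamed in order.
    return df.toDF(*[clean_name(c) for c in df.columns])
```

In the pipeline, the call site stays the same as in the question: the reader chain still ends with .transform(clean_columns). Keeping the string logic in its own function also makes it easy to check the renaming rules without a Spark session.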
