How to apply a UDF to a property in an array of structs

rbelidrv
New Contributor II

I have a column that contains an array of structs as follows:

"column" : [ 
{ "struct_field1": "struct_value",  "struct_field2": "struct_value" }, 
{ "struct_field1": "struct_value",  "struct_field2": "struct_value" } 
]
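For reference, a minimal DataFrame with this shape can be built like this (a sketch with placeholder values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder rows matching the array-of-structs shape above.
df = spark.createDataFrame(
    [([("struct_value", "struct_value"), ("struct_value", "struct_value")],)],
    "column: array<struct<struct_field1: string, struct_field2: string>>",
)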

I want to apply a UDF to each field of the structs. I am currently trying to do this using transform, but it does not seem to work because the UDF is not receiving the context.

The error I get is "Cannot generate code for expression: <lambda>(lambda x_1#123.struct_field1)#45678"

from pyspark.sql.functions import struct, transform

# my_udf_for is my UDF
df.select(transform("column", lambda x: struct(
    my_udf_for(x.struct_field1).alias("struct_field1"),
    my_udf_for(x.struct_field2).alias("struct_field2"),
)).alias("column"))

How do I nest a UDF inside a transform?

3 REPLIES

Kaniz
Community Manager

Hi @Richard Belihomji, it looks like you are trying to apply a UDF to each field of the structs in an array column of a Spark DataFrame, but you are encountering an issue with the UDF not receiving the context.

To nest a UDF inside a transform, you can define the UDF separately and then call it within the transform function. Here is an example of how you can apply a UDF to each field of the structs in an array column:

from pyspark.sql.functions import udf, struct, col
from pyspark.sql.types import StringType
 
# define the UDF
my_udf = udf(lambda x: x.upper() if x is not None else None, StringType())
 
# apply the UDF to each field of the structs in the array column
df.selectExpr("transform(column, x -> struct(my_udf(x.struct_field1).alias('struct_field1'), my_udf(x.struct_field2).alias('struct_field2'))) as column")

In this example, the UDF is defined separately using the udf function and then called within the transform function via a lambda that applies it to each field of the structs in the array column. Note that the selectExpr method is used to pass the lambda function as a string for parsing by Spark; here, "column" is the name of the array column being transformed.

I hope this helps!

rbelidrv
New Contributor II

Hi Kaniz,

Thank you for your response. However, it does not look like your code will compile: you reference the UDF within SQL without registering it, and you are mixing PySpark code into the SQL query where you use alias.
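For reference, the missing registration step would look something like this (a sketch, assuming an active SparkSession named spark and a string return type):

from pyspark.sql.types import StringType

# Register the Python function with Spark SQL so that selectExpr
# can resolve the name "my_udf" inside the expression string.
spark.udf.register("my_udf", lambda x: x.upper() if x is not None else None, StringType())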

Even if I fix these issues with your code, it still does not execute and I get the same error:

SparkUnsupportedOperationException: [INTERNAL ERROR] Cannot generate code for expression: my_udf(lambda x#306.struct_field1)#307

I would appreciate it if you could advise whether this is expected behaviour or whether this functionality is supported.

Anonymous
Not applicable

@Richard Belihomji: Please try this.

To apply a UDF to a property in an array of structs using PySpark, you can define your UDF as a Python function and wrap it with the udf decorator from pyspark.sql.functions. Then, you can use the getItem method to extract the value of a particular field from the struct and pass it as an argument to your UDF.

Here's an example code snippet that shows how to do this:

from pyspark.sql.functions import udf, struct, col, transform
 
# define your UDF
@udf
def my_udf(x):
    return x.upper()
 
# apply the UDF to the struct_field1 property in the array of structs
df = df.withColumn("column", 
                   transform(col("column"), 
                             lambda x: struct(
                                 my_udf(x.getItem("struct_field1")).alias("struct_field1"), 
                                 x.getItem("struct_field2").alias("struct_field2"))))

In this example, we define a UDF called my_udf that converts the input string to uppercase. We then use the withColumn method to apply the transform function to the column array. In the lambda function passed to transform, we use the getItem method to extract the value of the struct_field1 property and pass it as an argument to my_udf. We then use the alias method to rename the resulting column to struct_field1. Similarly, we extract the struct_field2 property using getItem and rename it using alias.

Note that it's important to wrap the function with the @udf decorator so that Spark can invoke it on column values. Without arguments, @udf defaults the return type to string; pass an explicit type, for example @udf(returnType=StringType()), if your function returns something else.
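If the nested UDF still raises the same "Cannot generate code" error (calling a Python UDF inside a transform lambda is not supported in some Spark versions), one workaround is to apply a single UDF to the entire array so that no UDF call appears inside the lambda. A sketch, using the hypothetical names upper_structs and element_type and assuming both fields are strings:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical element type matching the structs in the array.
element_type = StructType([
    StructField("struct_field1", StringType()),
    StructField("struct_field2", StringType()),
])

# The UDF receives the whole array as a list of Rows and returns
# a new list of structs, so no UDF runs inside a transform lambda.
@udf(returnType=ArrayType(element_type))
def upper_structs(arr):
    if arr is None:
        return None
    return [
        {
            "struct_field1": x["struct_field1"].upper() if x["struct_field1"] is not None else None,
            "struct_field2": x["struct_field2"].upper() if x["struct_field2"] is not None else None,
        }
        for x in arr
    ]

df = df.withColumn("column", upper_structs("column"))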

I hope this helps, and please let me know if you have any further questions or concerns.
