cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Batch Python UDFs in Unity Catalog and Spark SQL

stefan-vulpe
New Contributor II

Hello datanauts 🧑🏻‍🚀,

I'm encountering a conceptual challenge regarding Batch Python UDFs within Spark SQL in Databricks. My primary question is: can Batch Python UDFs be used directly via Spark SQL? As a Databricks beginner, I'm seeking to understand the underlying reasons for the behavior I'm observing.

For testing, I've created a simple Batch Python UDF in my default catalog. This UDF processes the NYC Taxi trips sample dataset, simulating a data processing step by converting an iterator of pd.Series tuples into JSON strings, returning an iterator of a single pd.Series. Below is the UDF's code:

CREATE OR REPLACE FUNCTION process_taxi_data(
  pickup TIMESTAMP,
  dropoff TIMESTAMP,
  trip_dist DOUBLE,
  fare DOUBLE,
  pickup_zip INT,
  dropoff_zip INT
) RETURNS STRING LANGUAGE PYTHON PARAMETER STYLE PANDAS HANDLER 'handler_function' AS $$
import json
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for pickup, dropoff, trip_dist, fare, pickup_zip, dropoff_zip in batch_iter:
        # Process each row in the batch
        results = []
        
        for i in range(len(trip_dist)):
            # Extract individual row values
            p_time = pickup.iloc[i]
            d_time = dropoff.iloc[i]
            dist = trip_dist.iloc[i] * 2  # Double the distance
            fare_val = fare.iloc[i]
            p_zip = pickup_zip.iloc[i]
            d_zip = dropoff_zip.iloc[i]
            
            # Create result object for this row
            result = {
                "trip_distance": dist,
                "fare": format(float(fare_val), '.2f'),
                "pickup_time": str(p_time),
                "dropoff_time": str(d_time),
                "pickup_zip": str(p_zip),
                "dropoff_zip": str(d_zip)
            }
            
            results.append(json.dumps(result))
        
        # Return a Series with one JSON string per row
        yield pd.Series(results)
$$;

The creation of the function executes successfully, and I'm able to use it within my SQL Editor for some test queries like the following:

SELECT
  pandas_process_taxi_data(
    tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, fare_amount, pickup_zip, dropoff_zip
  ) AS processed_data
FROM
  samples.nyctaxi.trips
LIMIT 5;

-- Output:
processed_data
"{""trip_distance"": 2.8, ""fare"": ""8.00"", ""pickup_time"": ""2016-02-13 21:47:53"", ""dropoff_time"": ""2016-02-13 21:57:15"", ""pickup_zip"": ""10103"", ""dropoff_zip"": ""10110""}"
"{""trip_distance"": 2.62, ""fare"": ""7.50"", ""pickup_time"": ""2016-02-13 18:29:09"", ""dropoff_time"": ""2016-02-13 18:37:23"", ""pickup_zip"": ""10023"", ""dropoff_zip"": ""10023""}"
"{""trip_distance"": 3.6, ""fare"": ""9.50"", ""pickup_time"": ""2016-02-06 19:40:58"", ""dropoff_time"": ""2016-02-06 19:52:32"", ""pickup_zip"": ""10001"", ""dropoff_zip"": ""10018""}"
"{""trip_distance"": 4.6, ""fare"": ""11.50"", ""pickup_time"": ""2016-02-12 19:06:43"", ""dropoff_time"": ""2016-02-12 19:20:54"", ""pickup_zip"": ""10044"", ""dropoff_zip"": ""10111""}"
"{""trip_distance"": 5.2, ""fare"": ""18.50"", ""pickup_time"": ""2016-02-23 10:27:56"", ""dropoff_time"": ""2016-02-23 10:58:33"", ""pickup_zip"": ""10199"", ""dropoff_zip"": ""10022""}"

The problem occurs when trying to use this UDF from a Python script / notebook. It seems that spark.sql can somehow find a reference to it, it executes it, but returns back only NULL rows. Here is a code example:

simple_test = spark.sql("""
SELECT
  <my_catalog>.<my_schema>.process_taxi_data(
    tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, fare_amount, pickup_zip, dropoff_zip
  ) AS processed_data
FROM
  samples.nyctaxi.trips;
""")

print("Simple UDF test:")
simple_test.groupBy('processed_data').count().show()
display(simple_test.select("processed_data").limit(5))

# Output:
processed_data:string
Simple UDF test:
+--------------+-----+
|processed_data|count|
+--------------+-----+
|          NULL|21932|
+--------------+-----+
processed_data
null
null
null
null
null

Does anyone know why that happens? Is there any way of having shared UDFs between Spark Sessions and Unity Catalog? Any response would be highly appreciated 🙏🏻!

1 ACCEPTED SOLUTION

Accepted Solutions

lingareddy_Alva
Honored Contributor II

Hi @stefan-vulpe 

Looking at your code and the behavior you're describing, I can identify the core issue and provide some insights about Batch Python UDFs in Databricks.

The Core Problem
The issue you're encountering is related to session isolation and UDF registration scope in Databricks. Here's what's happening:
1. SQL Editor vs Python Notebook Sessions: When you create a UDF using CREATE OR REPLACE FUNCTION in the SQL Editor,
it gets registered in Unity Catalog but the Python function handler is tied to that specific Spark session.
2. Session Isolation: When you run spark.sql() from a Python notebook, you're using a different Spark session than the SQL Editor,
even though both can reference the same Unity Catalog function metadata.
3. Missing Function Handler: The function metadata exists in Unity Catalog, but the actual Python function implementation isn't available
in the Python notebook's session, causing it to return NULL values.

Solutions:
Option 1: Register the UDF in the Same Python Session
Option 2: Use DataFrame API Instead of SQL

The NULL values you're seeing occur because the Python function handler isn't available in your Python notebook's Spark session, even though the function metadata exists in Unity Catalog. The most reliable approach is to register your pandas UDFs within the same session where you'll use them, either through direct registration or by importing from a shared library. This limitation is inherent to how Python UDFs work in distributed environments - the actual Python code needs to be available to all executors in the session where it's being used.

 

LR

View solution in original post

2 REPLIES 2

lingareddy_Alva
Honored Contributor II

Hi @stefan-vulpe 

Looking at your code and the behavior you're describing, I can identify the core issue and provide some insights about Batch Python UDFs in Databricks.

The Core Problem
The issue you're encountering is related to session isolation and UDF registration scope in Databricks. Here's what's happening:
1. SQL Editor vs Python Notebook Sessions: When you create a UDF using CREATE OR REPLACE FUNCTION in the SQL Editor,
it gets registered in Unity Catalog but the Python function handler is tied to that specific Spark session.
2. Session Isolation: When you run spark.sql() from a Python notebook, you're using a different Spark session than the SQL Editor,
even though both can reference the same Unity Catalog function metadata.
3. Missing Function Handler: The function metadata exists in Unity Catalog, but the actual Python function implementation isn't available
in the Python notebook's session, causing it to return NULL values.

Solutions:
Option 1: Register the UDF in the Same Python Session
Option 2: Use DataFrame API Instead of SQL

The NULL values you're seeing occur because the Python function handler isn't available in your Python notebook's Spark session, even though the function metadata exists in Unity Catalog. The most reliable approach is to register your pandas UDFs within the same session where you'll use them, either through direct registration or by importing from a shared library. This limitation is inherent to how Python UDFs work in distributed environments - the actual Python code needs to be available to all executors in the session where it's being used.

 

LR

Thanks for your response!

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now