cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Unable to call UDF inside the Spark SQL: RuntimeError: SparkSession should be create

guangyi
Contributor

Here is how I define the UDF inside the file udf_define.py:

from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())

And here is how I use the UDF in the length_quality_check.sql:

CREATE OR REFRESH MATERIALIZED VIEW length_verification(
    CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
select COUNT(o_comment) as count
from live.bronze_table 
where o_comment is not null and my_strlen_fn(o_comment) > 1

And here is how integrate them together inside the DLT pipeline

  pipelines:
    asset_bundle_workflow_demo_pipeline:
      name: asset_bundle_workflow_demo_pipeline
      libraries:
        - file:
            path: ../src/udf_define.py
        - notebook:
            path: ../src/dlt_pipeline.ipynb
        - file:
            path: ../src/length_quality_check.sql

And here is the error message I got when I running the pipeline:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 95.0 failed 4 times, most recent failure: Lost task 2.3 in stage 95.0 (TID 167) (10.139.64.14 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function my_strlen_fn(o_comment#3374) failed.
== Error ==
RuntimeError: SparkContext or SparkSession should be created first.
== Stacktrace ==

I don't get it, I create or get the exist Spark session already inside the udf definfition file

How to solve this problem?

1 ACCEPTED SOLUTION

Accepted Solutions

Hi @guangyi ,

I've just tested and following approach will work. Register similar Python UDF function in UC.

CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
  return len(strlen)
$$

 Then you can refer to that function in materialized view:

CREATE MATERIALIZED VIEW length_verification
AS
select COUNT(department) as count
from dev.default.employee
where department is not null and dev.default.GetLength(department) > 1;

szymon_dybczak_0-1726054233665.png

 

 

View solution in original post

4 REPLIES 4

szymon_dybczak
Contributor

Hi @guangyi ,

It seems that the Spark session might not be properly shared, could you try to change code responsible for obtaining spark session in a module?

from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())

 

Hi @guangyi ,

I've just tested and following approach will work. Register similar Python UDF function in UC.

CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
  return len(strlen)
$$

 Then you can refer to that function in materialized view:

CREATE MATERIALIZED VIEW length_verification
AS
select COUNT(department) as count
from dev.default.employee
where department is not null and dev.default.GetLength(department) > 1;

szymon_dybczak_0-1726054233665.png

 

 

guangyi
Contributor

Hi @szymon_dybczak , Thank you for your advice, actually it is works. I find the correct document by following your code. The code I show above is the scalar function version which is not what I wanted. 

guangyi
Contributor

And I tried getActiveSession() it is not working

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group