Unable to call UDF inside Spark SQL: RuntimeError: SparkSession should be created

guangyi
Contributor III

Here is how I define the UDF inside the file udf_define.py:

from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())

And here is how I use the UDF in the length_quality_check.sql:

CREATE OR REFRESH MATERIALIZED VIEW length_verification(
    CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
select COUNT(o_comment) as count
from live.bronze_table 
where o_comment is not null and my_strlen_fn(o_comment) > 1

And here is how I integrate them together inside the DLT pipeline:

  pipelines:
    asset_bundle_workflow_demo_pipeline:
      name: asset_bundle_workflow_demo_pipeline
      libraries:
        - file:
            path: ../src/udf_define.py
        - notebook:
            path: ../src/dlt_pipeline.ipynb
        - file:
            path: ../src/length_quality_check.sql

And here is the error message I get when I run the pipeline:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 95.0 failed 4 times, most recent failure: Lost task 2.3 in stage 95.0 (TID 167) (10.139.64.14 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function my_strlen_fn(o_comment#3374) failed.
== Error ==
RuntimeError: SparkContext or SparkSession should be created first.
== Stacktrace ==

I don't get it: I already create (or get) the existing Spark session inside the UDF definition file.

How can I solve this problem?

szymon_dybczak
Esteemed Contributor III

Hi @guangyi ,

It seems that the Spark session might not be shared properly. Could you try changing the code that obtains the Spark session in the module?

from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())

 

Hi @guangyi ,

I've just tested this, and the following approach works: register a similar Python UDF as a function in Unity Catalog (UC).

CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS INT
LANGUAGE PYTHON
AS $$
  return len(strlen)
$$

Then you can refer to that function in the materialized view:

CREATE MATERIALIZED VIEW length_verification
AS
select COUNT(department) as count
from dev.default.employee
where department is not null and dev.default.GetLength(department) > 1;


guangyi
Contributor III

Hi @szymon_dybczak, thank you for your advice; it actually works. I found the correct documentation by following your code. The code I showed above is the scalar function version, which is not what I wanted.

guangyi
Contributor III

And I tried getActiveSession(), but it does not work either.
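
A note on why neither getOrCreate() nor getActiveSession() helps here: the body of a Python UDF runs on the executors, where no SparkSession or SparkContext exists. The original strlen calls pyspark.sql.functions.length, which builds a Column expression and needs an active SparkContext, so it raises "SparkContext or SparkSession should be created first" as soon as the UDF is evaluated. Inside a UDF the argument is a plain Python value, so the built-in len() is the natural replacement. Below is a minimal sketch of that change, not a tested DLT setup: register_udfs is a hypothetical helper name, and it assumes the pipeline's session is passed in as spark.

```python
def strlen(s):
    """Return the length of a string, or None when the input is SQL NULL."""
    # Inside a UDF, `s` is a plain Python str (or None for NULL), so the
    # built-in len() is used instead of pyspark.sql.functions.length.
    return None if s is None else len(s)


def register_udfs(spark):
    """Register the UDF on an existing SparkSession.

    Hypothetical helper: `spark` is assumed to be the session that the
    pipeline already provides.
    """
    # Imported lazily so strlen can be exercised without Spark installed.
    from pyspark.sql.types import IntegerType

    spark.udf.register("my_strlen_fn", strlen, IntegerType())
```

In udf_define.py this would be called once at module level, for example register_udfs(spark), and the SQL file could keep using my_strlen_fn(o_comment) unchanged.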