09-11-2024 02:20 AM
Here is how I define the UDF inside the file udf_define.py:
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
def strlen(s):
    return length(s)
spark.udf.register("my_strlen_fn", strlen, IntegerType())
And here is how I use the UDF in the length_quality_check.sql:
CREATE OR REFRESH MATERIALIZED VIEW length_verification(
CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
select COUNT(o_comment) as count
from live.bronze_table
where o_comment is not null and my_strlen_fn(o_comment) > 1
And here is how I integrate them together inside the DLT pipeline:
pipelines:
  asset_bundle_workflow_demo_pipeline:
    name: asset_bundle_workflow_demo_pipeline
    libraries:
      - file:
          path: ../src/udf_define.py
      - notebook:
          path: ../src/dlt_pipeline.ipynb
      - file:
          path: ../src/length_quality_check.sql
And here is the error message I got when running the pipeline:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 95.0 failed 4 times, most recent failure: Lost task 2.3 in stage 95.0 (TID 167) (10.139.64.14 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function my_strlen_fn(o_comment#3374) failed.
== Error ==
RuntimeError: SparkContext or SparkSession should be created first.
== Stacktrace ==
I don't get it, I already create or get the existing Spark session inside the UDF definition file.
How can I solve this problem?
09-11-2024 02:51 AM
Hi @guangyi ,
It seems that the Spark session might not be properly shared. Could you try changing the code responsible for obtaining the Spark session in your module?
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
def strlen(s):
    return length(s)
spark.udf.register("my_strlen_fn", strlen, IntegerType())
09-11-2024 04:30 AM
Hi @guangyi ,
I've just tested it and the following approach will work. Register a similar Python UDF function in UC.
CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
return len(strlen)
$$
Then you can refer to that function in materialized view:
CREATE MATERIALIZED VIEW length_verification
AS
select COUNT(department) as count
from dev.default.employee
where department is not null and dev.default.GetLength(department) > 1;
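A note on the return type: the materialized view compares the result numerically (> 1), while the function above is declared RETURNS STRING, so the comparison relies on an implicit cast. A minimal variant with a numeric return type (catalog.schema is a placeholder; substitute your own Unity Catalog path):
CREATE OR REPLACE FUNCTION catalog.schema.GetLength(s STRING)
RETURNS INT
LANGUAGE PYTHON
AS $$
# return the character count of the input string
return len(s)
$$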
09-12-2024 05:53 PM
Hi @szymon_dybczak, thank you for your advice; it actually works. I found the correct documentation by following your code. The code I showed above is the scalar function version, which is not what I wanted.
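For reference, a sketch of how the original quality check could look once the UC function is in place (the catalog.schema prefix is a placeholder; adjust to your own catalog and schema):
CREATE OR REFRESH MATERIALIZED VIEW length_verification(
  CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
SELECT COUNT(o_comment) AS count
FROM live.bronze_table
WHERE o_comment IS NOT NULL AND catalog.schema.GetLength(o_comment) > 1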
09-12-2024 06:41 PM
And I tried getActiveSession(); it is not working.