09-11-2024 02:20 AM
Here is how I define the UDF inside the file udf_define.py:
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())
And here is how I use the UDF in the length_quality_check.sql:
CREATE OR REFRESH MATERIALIZED VIEW length_verification(
  CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
SELECT COUNT(o_comment) AS count
FROM live.bronze_table
WHERE o_comment IS NOT NULL AND my_strlen_fn(o_comment) > 1
And here is how I integrate them together inside the DLT pipeline:
pipelines:
  asset_bundle_workflow_demo_pipeline:
    name: asset_bundle_workflow_demo_pipeline
    libraries:
      - file:
          path: ../src/udf_define.py
      - notebook:
          path: ../src/dlt_pipeline.ipynb
      - file:
          path: ../src/length_quality_check.sql
And here is the error message I got when running the pipeline:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 95.0 failed 4 times, most recent failure: Lost task 2.3 in stage 95.0 (TID 167) (10.139.64.14 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function my_strlen_fn(o_comment#3374) failed.
== Error ==
RuntimeError: SparkContext or SparkSession should be created first.
== Stacktrace ==
I don't get it; I already create (or get the existing) Spark session inside the UDF definition file.
How can I solve this problem?
09-11-2024 04:30 AM
Hi @guangyi ,
I've just tested it, and the following approach works. Register a similar Python UDF in UC.
CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS INT
LANGUAGE PYTHON
AS $$
  return len(strlen)
$$
Then you can refer to that function in a materialized view:
CREATE MATERIALIZED VIEW length_verification
AS
SELECT COUNT(department) AS count
FROM dev.default.employee
WHERE department IS NOT NULL AND dev.default.GetLength(department) > 1;
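Note that the body between the $$ delimiters of a UC Python function is plain Python, so its logic can be checked locally before registering it. A minimal standalone sketch of the same logic (the function name here is illustrative, not part of the thread's pipeline):

```python
def get_length(s):
    # Mirrors the UC function body: plain Python len(), no Spark needed
    return len(s)

print(get_length("Databricks"))  # 10
```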
09-11-2024 02:51 AM
Hi @guangyi ,
It seems that the Spark session might not be properly shared. Could you try changing the code responsible for obtaining the Spark session in the module?
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

def strlen(s):
    return length(s)

spark.udf.register("my_strlen_fn", strlen, IntegerType())
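For what it's worth, the likely root cause of the original error is that strlen calls pyspark.sql.functions.length, which builds a Column expression and requires a driver-side SparkSession; the UDF body, however, runs on executors, where no session exists, hence "SparkContext or SparkSession should be created first". A hedged sketch of a fix using only built-in Python (the registration call is shown as a comment because it needs a live Spark session):

```python
def strlen(s):
    # Use Python's built-in len(), which runs fine on executors.
    # pyspark.sql.functions.length builds a Column expression and
    # needs a driver-side session, which a UDF body does not have.
    return len(s) if s is not None else None

# Driver-side registration would then be (assuming an active session):
#   spark.udf.register("my_strlen_fn", strlen, IntegerType())

print(strlen("hello"))  # 5
```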
09-12-2024 05:53 PM
Hi @szymon_dybczak , thank you for your advice; it actually works. I found the correct documentation by following your code. The code I showed above was the scalar function version, which is not what I wanted.
09-12-2024 06:41 PM
Also, I tried getActiveSession(), and it did not work.