a month ago
Here is how I define the UDF inside the file udf_define.py:
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
def strlen(s):
    return length(s)
spark.udf.register("my_strlen_fn", strlen, IntegerType())
And here is how I use the UDF in the length_quality_check.sql:
CREATE OR REFRESH MATERIALIZED VIEW length_verification(
  CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
SELECT COUNT(o_comment) AS count
FROM live.bronze_table
WHERE o_comment IS NOT NULL AND my_strlen_fn(o_comment) > 1
And here is how I integrate them together inside the DLT pipeline:
pipelines:
  asset_bundle_workflow_demo_pipeline:
    name: asset_bundle_workflow_demo_pipeline
    libraries:
      - file:
          path: ../src/udf_define.py
      - notebook:
          path: ../src/dlt_pipeline.ipynb
      - file:
          path: ../src/length_quality_check.sql
And here is the error message I got when running the pipeline:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 95.0 failed 4 times, most recent failure: Lost task 2.3 in stage 95.0 (TID 167) (10.139.64.14 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function my_strlen_fn(o_comment#3374) failed.
== Error ==
RuntimeError: SparkContext or SparkSession should be created first.
== Stacktrace ==
I don't get it; I already create (or get) the existing Spark session inside the UDF definition file.
How can I solve this problem?
a month ago
Hi @guangyi ,
I've just tested it, and the following approach works. Register a similar Python UDF function in UC:
CREATE OR REPLACE FUNCTION catalog.schema.GetLength(strlen STRING)
RETURNS INT
LANGUAGE PYTHON
AS $$
return len(strlen)
$$
Then you can refer to that function in the materialized view:
CREATE MATERIALIZED VIEW length_verification
AS
SELECT COUNT(department) AS count
FROM dev.default.employee
WHERE department IS NOT NULL AND dev.default.GetLength(department) > 1;
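Applied to your original length_quality_check.sql, the same idea might look like this (just a sketch, assuming the function is created as catalog.schema.GetLength and returns an INT):
CREATE OR REFRESH MATERIALIZED VIEW length_verification(
  CONSTRAINT valid_length_count EXPECT (count > 1230)
)
AS
SELECT COUNT(o_comment) AS count
FROM live.bronze_table
WHERE o_comment IS NOT NULL
  AND catalog.schema.GetLength(o_comment) > 1
As far as I understand, a UC function is resolved by name at query time and runs in an isolated Python environment, so it doesn't depend on a module-level SparkSession, which is why this approach sidesteps the error.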
a month ago
Hi @guangyi ,
It seems that the Spark session might not be properly shared. Could you try changing the code responsible for obtaining the Spark session in the module?
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
def strlen(s):
    return length(s)
spark.udf.register("my_strlen_fn", strlen, IntegerType())
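One more thing that may be the real culprit (an assumption on my part, I haven't run your exact pipeline): the UDF body calls length(), which is pyspark.sql.functions.length. That function builds a Column expression and needs a live SparkContext, but the UDF body runs on executors, where no SparkContext exists. That would produce exactly the "SparkContext or SparkSession should be created first" error no matter how the session is obtained at module level. A minimal sketch using plain Python len() instead:
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

def strlen(s):
    # Plain Python len(); no Spark Column functions inside the UDF body,
    # so nothing here needs a SparkContext on the executor.
    return len(s) if s is not None else None

spark.udf.register("my_strlen_fn", strlen, IntegerType())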
a month ago
Hi @szymon_dybczak , thank you for your advice; it actually works. I found the correct documentation by following your code. The code I showed above is the scalar function version, which is not what I wanted.
a month ago
And I tried getActiveSession(); it does not work.