Hello all,
I'm currently working on importing some SQL functions from an Informix database into Databricks, using Asset Bundles to deploy Delta Live Tables to Unity Catalog. I'm struggling to port a recursive one; here is the code:
CREATE FUNCTION "informix".poids_article (p_no_art CHAR(17)) RETURNING REAL ;
    DEFINE v_poids REAL ;
    DEFINE v_typ_art CHAR(4) ;
    DEFINE v_pds_unit REAL ;
    LET v_poids = 0 ;
    SELECT a.pds_unit, a.typ_art
        INTO v_pds_unit, v_typ_art
        FROM bas_art a
        WHERE a.no_art = p_no_art ;
    IF v_typ_art = 'K' THEN
        SELECT SUM(n.qte_struct * poids_article(n.no_cposant))
            INTO v_poids
            FROM bas_nom n
            WHERE n.no_cpose = p_no_art ;
    ELSE
        LET v_poids = v_pds_unit ;
    END IF ;
    IF v_poids IS NULL THEN
        LET v_poids = 0 ;
    END IF ;
    RETURN v_poids ;
END FUNCTION ;
I've tried to refactor this to work in Databricks with multiple strategies, including UDFs. None of them worked; the ones that might have would have needed around 80+ hours to complete (versus less than 1 second for the original SQL function, on a 500,000-row table).
For more context, I'll explain the goal of this function. I need to calculate the weight of articles. Every type of article already has a weight calculated, except for one type: kits, identified by 'K' in the code. A kit is composed of other articles, which can be pieces ('P'), materials ('M'), or other kits; that's where the recursion comes in. Each kit can contain other kits without any depth limit, and each one needs its weight calculated.
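To make the expected behaviour concrete, here is a small pure-Python model of what the function computes. The data is made up for illustration (a real run would read bas_art and bas_nom), but the recursion mirrors the Informix logic exactly:

```python
# Toy, in-memory model of poids_article (hypothetical data, not the real tables):
# non-kit articles carry their own weight; kits ('K') weigh the sum of
# qte_struct * weight(component) over their components.

# no_art -> (typ_art, pds_unit)
articles = {
    "P1": ("P", 2.0),   # a piece weighing 2.0
    "M1": ("M", 0.5),   # a material weighing 0.5
    "K2": ("K", 0.0),   # inner kit: 3 x P1
    "K1": ("K", 0.0),   # outer kit: 1 x K2 + 4 x M1
}

# no_cpose -> list of (no_cposant, qte_struct)
structure = {
    "K2": [("P1", 3.0)],
    "K1": [("K2", 1.0), ("M1", 4.0)],
}

def poids_article(no_art):
    typ_art, pds_unit = articles[no_art]
    if typ_art != "K":
        return pds_unit
    # Recurse into each component, like the SUM(...) in the Informix function;
    # sum() over an empty list is 0, matching the IS NULL -> 0 fallback.
    return sum(qte * poids_article(comp) for comp, qte in structure.get(no_art, []))

print(poids_article("K1"))  # 1 x (3 x 2.0) + 4 x 0.5 = 8.0
```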
In the original function, the calculation is done exactly where the recursive call is made:
SELECT SUM(n.qte_struct * poids_article(n.no_cposant))
Here is an extract of the table I've reworked for this code to work in Databricks.
The first column, no_cpose, is the id of a kit, which is composed of one or more of the ids shown in the second column, no_cposant. typ_art then shows whether the no_cposant is a material, a piece, or a kit. Finally, qte_struct and pds_unit are the quantity and weight used in calculating the kit's weight.
Here is a sample of a kit composed of other kits.
As you can see, all kits are initialised with a weight of 0.
My first attempt was as follows:
from pyspark.sql import functions as F
from pyspark.sql.functions import when

def calculate_kit_weight(df):
    # nom is the structure table (no_cpose / no_cposant / qte_struct)
    return df.withColumn(
        "pds_unit",
        when(
            (df.typ_art == "K") & ((df.pds_unit == 0) | (df.pds_unit.isNull())),
            calculate_kit_weight(nom.select(F.col("no_cpose") == df.no_cposant)),
        ).otherwise(df.agg(F.sum("pds_unit"))),
    )
I expected the recursive call to return a subquery in which the kit's no_cposant id appears as the current main kit in the no_cpose column, but unsurprisingly it only returns rows where no_cpose = no_cposant, which never happens.
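One idea I'm now considering, to avoid recursion over DataFrames entirely, is to resolve kits bottom-up with a fixed-point loop: on each pass, compute the weight of every kit whose components are all already resolved, and stop when a pass resolves nothing new. Here is a minimal pure-Python sketch of the idea with hypothetical toy data (each pass could presumably be expressed in Spark as a self-join plus aggregation instead):

```python
# Iterative (fixed-point) alternative to recursion: resolve kit weights
# bottom-up, one "level" of nesting per pass. Hypothetical toy data.
articles = {"P1": 2.0, "M1": 0.5}        # known weights of non-kit articles
kits = {                                  # no_cpose -> [(no_cposant, qte_struct)]
    "K2": [("P1", 3.0)],
    "K1": [("K2", 1.0), ("M1", 4.0)],
}

weights = dict(articles)                  # weights resolved so far
while True:
    progress = False
    for kit, comps in kits.items():
        if kit in weights:
            continue                      # already resolved on an earlier pass
        if all(c in weights for c, _ in comps):
            weights[kit] = sum(q * weights[c] for c, q in comps)
            progress = True
    if not progress:
        break                             # fixed point: nothing left to resolve

print(weights["K1"])  # 8.0
```

The number of passes is bounded by the maximum kit nesting depth, so even without a declared depth limit the loop terminates as long as the structure is acyclic.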
I then tried this UDF:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

@udf(returnType='float')
def poids_article(p_no_art):
    # Define the base case
    bas_art_df = spark.table("art_weight")
    v_pds_unit, v_typ_art = (
        bas_art_df.select("pds_unit", "typ_art")
        .where(f"no_cposant = '{p_no_art}'")
        .first()
    )
    # Initialize the weight
    v_poids = 0
    if v_typ_art == 'K':
        # If it's a kit, call this function for each of its components
        v_poids = (
            bas_art_df.join(poids_article(bas_art_df.no_cposant), bas_art_df.no_cpose == p_no_art, "inner")
            .selectExpr("SUM(qte_struct * poids_article) as sum_poids")
            .first()[0]
        )
    else:
        v_poids = v_pds_unit
    if v_poids is None:
        v_poids = 0
    return v_poids

# Register the UDF
spark.udf.register("poids_article", poids_article)

# Usage example
result_df = spark.sql("SELECT poids_article('BR08020040AABC099') as result")
result = result_df.first()[0]
But I'm not really confident in this one; in any case, it gave me an error:
PicklingError: Could not serialize object: TypeError: cannot pickle 'ClientThreadLocals' object
File /databricks/spark/python/pyspark/serializers.py:558, in CloudPickleSerializer.dumps(self, obj)
557 try:
--> 558 return cloudpickle.dumps(obj, pickle_protocol)
559 except pickle.PickleError:
File /databricks/spark/python/pyspark/serializers.py:568, in CloudPickleSerializer.dumps(self, obj)
566 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
567 print_exec(sys.stderr)
--> 568 raise pickle.PicklingError(msg)
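From what I understand, this error happens because the UDF's closure captures the SparkSession and a DataFrame, which cannot be pickled and shipped to the executors; Spark APIs like spark.table can't be called from inside a UDF anyway. A workaround I'm considering is to collect the structure table to the driver once, compute every weight there with memoized recursion, and write the results back as a DataFrame. A sketch with toy data standing in for the collected rows (in practice the dicts would be built from something like spark.table("art_weight").collect()):

```python
from functools import lru_cache

# Hypothetical toy data standing in for the collected art_weight rows.
typ = {"P1": "P", "M1": "M", "K2": "K", "K1": "K"}          # no_art -> typ_art
unit = {"P1": 2.0, "M1": 0.5, "K2": 0.0, "K1": 0.0}          # no_art -> pds_unit
children = {                                                  # no_cpose -> components
    "K2": [("P1", 3.0)],
    "K1": [("K2", 1.0), ("M1", 4.0)],
}

@lru_cache(maxsize=None)
def weight(no_art):
    # Non-kits already carry their weight; kits sum over their components.
    if typ[no_art] != "K":
        return unit[no_art]
    # sum() of an empty generator is 0, matching the IS NULL -> 0 fallback.
    return sum(q * weight(c) for c, q in children.get(no_art, []))

# Compute every article's weight once; memoization means each article is
# visited a single time even when shared between many kits.
all_weights = {a: weight(a) for a in typ}
print(all_weights["K1"])  # 8.0

# Back in Spark one could then do something like (hypothetical):
#   spark.createDataFrame(all_weights.items(), "no_art string, poids float")
# and join that back to the original table.
```

Whether collecting to the driver is acceptable would depend on the table size; at 500,000 rows the dicts should still fit comfortably in driver memory, but I'd welcome opinions on that.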
Other attempts were either not viable or ended in infinite loops.
Do you have any advice or ideas to share with me on this issue?
Thanks