cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

SQL function refactoring into Databricks environment

YannLevavasseur
New Contributor

Hello all,

I'm currently working on importing  some SQL functions from Informix Database into Databricks using Asset Bundle deploying Delta Live Table to Unity Catalog. I'm struggling importing a recursive one, there is the code :

CREATE FUNCTION "informix".poids_article (p_no_art CHAR(17))  RETURNING REAL ; 
	DEFINE v_poids REAL ;	
	DEFINE v_typ_art CHAR(4) ;
	DEFINE v_pds_unit REAL ;
	LET v_poids = 0 ;
	
	SELECT a.pds_unit, a.typ_art 
		INTO v_pds_unit, v_typ_art 
		FROM bas_art a 
		WHERE a.no_art = p_no_art ;		
	
	IF v_typ_art = 'K' THEN
		
		SELECT SUM(n.qte_struct * poids_article(n.no_cposant))								
				INTO v_poids
				FROM bas_nom n 
				WHERE n.no_cpose = p_no_art  ;	
	ELSE
		LET v_poids = v_pds_unit ;
	END IF ;
	IF v_poids IS NULL THEN
		LET v_poids = 0 ;
	END IF ;	
	RETURN v_poids ;
END FUNCTION
;

 I've tried to refactor this to work into Databricks with multiple strategies including UDF
None of them was able to work, or some may have but would have needed around 80+hours to complete (less than 1 sec for the SQL initial function, on 500.000 lines table)

For more context i'll explain the goal of this function. I need to calculate the weight of articles, in fact every type of articles already have a weight calculated except for one type, the Kits, defined by K in the code. This Kits is composed of other articles that can be Pieces = P or Material = M and other Kits. That here you can start to figure out the meaning of the recursive purpose. Each kit can be composed of other kits without any depth limit and each need its weight to be calculated.

In the initial function, the calculation is done here, exactly where the recursive function is called

SELECT SUM(n.qte_struct * poids_article(n.no_cposant)) 

 There is an extract of the table i've rework for this code to work in databricks

YannLevavasseur_0-1713952085696.png

The first column, no_cpose,  is the id of kits which is composed of one or more ids showed in the second column, no_cposant. then the typ_art shows if the noc_posant is an Material, piece or a kit. Finally, qte_structs and pds_unit is the quantity and weight used in the calculation of the kit's weight.

Here a sample of a kit composed by other kits 

YannLevavasseur_1-1713952236903.png

As we can see all kits are initialised at 0 in weight.

My first attempt was to do as follow 

def calculate_kit_weight(df):
    df.withColumn("pds_unit",
                when(
                    (df.typ_art == "K") & ((df.pds_unit == 0) | (df.pds_unit.isNull())),
                    calculate_kit_weight(nom.select(F.col("no_cpose") == df.no_cposant))
                    ).otherwise(df.agg(F.sum("pds_unit"))))

i expected the recursive call of the function to return a subquery with the no_cposant id of the kit to appear as a the current main kit in no_cpose column, but it's only returns, without any surprise, lines where no_cpose = no_cposant which didn't happen. 

I've then tried this UDF

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

@udf(returnType='float')
def poids_article(p_no_art):
    # Define the base case
    bas_art_df = spark.table("art_weight")
    v_pds_unit, v_typ_art = (
        bas_art_df.select("pds_unit", "typ_art")
        .where(f"no_cposant = '{p_no_art}'")
        .first()
    )

    # Initialize the weight
    v_poids = 0
   
    if v_typ_art == 'K':
        # If it's a kit, call this function for each of its components
        v_poids = (
            bas_art_df.join(poids_article(bas_art_df.no_cposant), bas_art_df.no_cpose == p_no_art, "inner")
            .selectExpr("SUM(qte_struct * poids_article) as sum_poids")
            .first()[0]
        )
    else:
        v_poids = v_pds_unit
   
    if v_poids is None:
        v_poids = 0
   
    return v_poids

# Register the UDF
spark.udf.register("poids_article", poids_article)

# Usage example
result_df = spark.sql("SELECT poids_article('BR08020040AABC099') as result")
result = result_df.first()[0]

But i'm not really confident in this one, anyway this gave me an error 

PicklingError: Could not serialize object: TypeError: cannot pickle 'ClientThreadLocals' object
File /databricks/spark/python/pyspark/serializers.py:558, in CloudPickleSerializer.dumps(self, obj)
    557 try:
--> 558     return cloudpickle.dumps(obj, pickle_protocol)
    559 except pickle.PickleError:
File /databricks/spark/python/pyspark/serializers.py:568, in CloudPickleSerializer.dumps(self, obj)
    566     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    567 print_exec(sys.stderr)
--> 568 raise pickle.PicklingError(msg)

And other tries neither viable nor capable with infinite loop.

Did you have some advice or ideas to share with me on this issue ?

Thanks

0 REPLIES 0
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.