Data Engineering

SQL function refactoring into Databricks environment

YannLevavasseur
New Contributor

Hello all,

I'm currently porting some SQL functions from an Informix database to Databricks, using an Asset Bundle that deploys a Delta Live Tables pipeline to Unity Catalog. I'm struggling to port a recursive one. Here is the code:

CREATE FUNCTION "informix".poids_article (p_no_art CHAR(17))  RETURNING REAL ; 
	DEFINE v_poids REAL ;	
	DEFINE v_typ_art CHAR(4) ;
	DEFINE v_pds_unit REAL ;
	LET v_poids = 0 ;
	
	SELECT a.pds_unit, a.typ_art 
		INTO v_pds_unit, v_typ_art 
		FROM bas_art a 
		WHERE a.no_art = p_no_art ;		
	
	IF v_typ_art = 'K' THEN
		
		SELECT SUM(n.qte_struct * poids_article(n.no_cposant))								
				INTO v_poids
				FROM bas_nom n 
				WHERE n.no_cpose = p_no_art  ;	
	ELSE
		LET v_poids = v_pds_unit ;
	END IF ;
	IF v_poids IS NULL THEN
		LET v_poids = 0 ;
	END IF ;	
	RETURN v_poids ;
END FUNCTION
;

I've tried several strategies to refactor this for Databricks, including a UDF.
None of them worked. Some might have, but would have needed 80+ hours to complete (versus less than 1 second for the original SQL function on a table of 500,000 rows).

For more context, I'll explain the goal of this function. I need to calculate the weight of articles. Every type of article already has a calculated weight except one: kits, identified by K in the code. A kit is composed of other articles, which can be pieces (P), materials (M), or other kits. This is where the recursion comes in: kits can be nested inside other kits without any depth limit, and each one needs its weight calculated.

In the original function, the calculation happens here, exactly where the function calls itself recursively:

SELECT SUM(n.qte_struct * poids_article(n.no_cposant)) 
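
To make the recursion concrete, here is a minimal plain-Python sketch that mirrors the Informix logic on toy data. The dictionaries BAS_ART and BAS_NOM and the article ids in them are hypothetical stand-ins for the real tables, and the memoization via lru_cache is an addition that is not in the original function.

```python
from functools import lru_cache

# Hypothetical in-memory stand-in for bas_art: no_art -> (pds_unit, typ_art)
BAS_ART = {
    "KIT_A": (0.0, "K"),
    "KIT_B": (0.0, "K"),
    "PIECE_1": (2.0, "P"),
    "MAT_1": (0.5, "M"),
}
# Hypothetical stand-in for bas_nom: no_cpose -> list of (no_cposant, qte_struct)
BAS_NOM = {
    "KIT_A": [("KIT_B", 2.0), ("PIECE_1", 1.0)],
    "KIT_B": [("PIECE_1", 3.0), ("MAT_1", 4.0)],
}


@lru_cache(maxsize=None)  # each article is computed once, even if many kits use it
def poids_article(no_art: str) -> float:
    """Return the weight of an article, expanding kits recursively."""
    pds_unit, typ_art = BAS_ART.get(no_art, (None, None))
    if typ_art == "K":
        # A kit weighs the quantity-weighted sum of its components.
        poids = sum(
            qte * poids_article(cposant)
            for cposant, qte in BAS_NOM.get(no_art, [])
        )
    else:
        poids = pds_unit
    # Same fallback as the original: a missing weight counts as 0.
    return 0.0 if poids is None else poids


print(poids_article("KIT_B"))  # 3 * 2.0 + 4 * 0.5 = 8.0
print(poids_article("KIT_A"))  # 2 * 8.0 + 1 * 2.0 = 18.0
```

The nested kit KIT_B is resolved first and its result is reused when KIT_A is computed, which is the behaviour any Databricks port has to reproduce.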

Here is an extract of the table I reworked so that this code can run in Databricks:

YannLevavasseur_0-1713952085696.png

The first column, no_cpose, is the id of a kit, which is composed of one or more of the ids shown in the second column, no_cposant. The typ_art column indicates whether the no_cposant is a material, a piece, or a kit. Finally, qte_struct and pds_unit are the quantity and unit weight used to calculate the kit's weight.

Here is a sample of a kit composed of other kits:

YannLevavasseur_1-1713952236903.png

As we can see, all kits are initialised with a weight of 0.

My first attempt was as follows:

def calculate_kit_weight(df):
    df.withColumn("pds_unit",
                when(
                    (df.typ_art == "K") & ((df.pds_unit == 0) | (df.pds_unit.isNull())),
                    calculate_kit_weight(nom.select(F.col("no_cpose") == df.no_cposant))
                    ).otherwise(df.agg(F.sum("pds_unit"))))

I expected the recursive call to return a subquery in which the kit's no_cposant id appears as the current main kit in the no_cpose column. Unsurprisingly, it only matches rows where no_cpose = no_cposant, and there are none.

I then tried this UDF:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

@udf(returnType='float')
def poids_article(p_no_art):
    # Define the base case
    bas_art_df = spark.table("art_weight")
    v_pds_unit, v_typ_art = (
        bas_art_df.select("pds_unit", "typ_art")
        .where(f"no_cposant = '{p_no_art}'")
        .first()
    )

    # Initialize the weight
    v_poids = 0
   
    if v_typ_art == 'K':
        # If it's a kit, call this function for each of its components
        v_poids = (
            bas_art_df.join(poids_article(bas_art_df.no_cposant), bas_art_df.no_cpose == p_no_art, "inner")
            .selectExpr("SUM(qte_struct * poids_article) as sum_poids")
            .first()[0]
        )
    else:
        v_poids = v_pds_unit
   
    if v_poids is None:
        v_poids = 0
   
    return v_poids

# Register the UDF
spark.udf.register("poids_article", poids_article)

# Usage example
result_df = spark.sql("SELECT poids_article('BR08020040AABC099') as result")
result = result_df.first()[0]

I'm not really confident in this one, and in any case it gave me an error:

PicklingError: Could not serialize object: TypeError: cannot pickle 'ClientThreadLocals' object
File /databricks/spark/python/pyspark/serializers.py:558, in CloudPickleSerializer.dumps(self, obj)
    557 try:
--> 558     return cloudpickle.dumps(obj, pickle_protocol)
    559 except pickle.PickleError:
File /databricks/spark/python/pyspark/serializers.py:568, in CloudPickleSerializer.dumps(self, obj)
    566     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    567 print_exec(sys.stderr)
--> 568 raise pickle.PicklingError(msg)

My other attempts were either not viable or ended in an infinite loop.

Do you have any advice or ideas to share on this issue?

Thanks

1 REPLY

Kaniz
Community Manager

Hi @YannLevavasseur, it looks like you're dealing with a recursive SQL function for calculating the weight of articles in a Databricks environment. Handling recursion in SQL can be tricky, especially when translating existing Informix code to Databricks.

Let's break down the problem and explore potential solutions:

  1. Understanding the Problem:

    • You have a function called poids_article that calculates the weight of articles.
    • The function takes an article ID (p_no_art) as input.
    • Articles can be of different types: Pieces (P), Material (M), or Kits (K).
    • Kits can be composed of other articles (including other kits) recursively.
    • The goal is to calculate the weight of each article, considering its components.
  2. Challenges:

    • Recursion: The recursive nature of the problem makes it challenging to implement directly in SQL.
    • Performance: The existing SQL function runs in under a second, while a row-by-row translation to Databricks can take hours (you estimated 80+).
  3. Approaches:

    • Recursive Common Table Expression (CTE):
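      • Recursive CTEs (WITH RECURSIVE) are only available on recent Databricks SQL and Databricks Runtime versions, so check that yours supports them first.
      • Where they are available, you can rewrite your Informix function using a recursive CTE.
      • Start from the direct components of the kit, expand every component that is itself a kit, and carry the cumulative quantity down to the non-kit leaves.
      • Example (pseudo-code):
        WITH RECURSIVE KitWeights AS (
            -- Anchor: direct components of the requested kit
            SELECT n.no_cposant, a.typ_art, a.pds_unit, n.qte_struct AS qte_cumul
            FROM bas_nom n
            JOIN bas_art a ON a.no_art = n.no_cposant
            WHERE n.no_cpose = 'your_article_id'
            UNION ALL
            -- Recursive step: expand components that are themselves kits
            SELECT n.no_cposant, a.typ_art, a.pds_unit, w.qte_cumul * n.qte_struct
            FROM KitWeights w
            JOIN bas_nom n ON n.no_cpose = w.no_cposant
            JOIN bas_art a ON a.no_art = n.no_cposant
            WHERE w.typ_art = 'K'
        )
        -- Only non-kit leaves carry a weight; kits are stored with weight 0
        SELECT COALESCE(SUM(qte_cumul * pds_unit), 0) AS kit_weight
        FROM KitWeights
        WHERE typ_art <> 'K';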
        
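    • UDF Approach:
      • A Python UDF cannot run Spark queries itself: the SparkSession lives on the driver and cannot be serialized to the executors, which is what the PicklingError you hit is reporting.
      • A UDF can therefore only work on the values passed to it. It cannot look up other rows with spark.table.
      • For the same reason, a UDF that calls itself on a DataFrame column, as in your attempt, will not work.
      • Even where a row-by-row lookup is possible, it scales poorly on a 500,000-row table.
      • Consider using Spark DataFrame joins, repeated level by level, instead of UDFs for better performance.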
  4. Tips:

    • Ensure that your Databricks environment has the necessary tables (bas_nom, bas_art, etc.) available.
    • Test your solution on a smaller dataset first to avoid long execution times.
    • Monitor query performance and optimize as needed.
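
As a rough sketch of an alternative that needs neither recursion nor a UDF: with about 500,000 rows, the data may be small enough to process on the driver, where every kit weight can be computed in a single bottom-up pass. The function name compute_kit_weights and the tuple layouts below are hypothetical. In Databricks the input rows could come from something like spark.table(...).collect(), assuming they fit in driver memory. The sketch also raises an error on circular kit definitions instead of looping forever.

```python
from collections import defaultdict, deque


def compute_kit_weights(articles, structure):
    """Compute the weight of every article bottom-up, without recursion.

    articles:  iterable of (no_art, typ_art, pds_unit)
    structure: iterable of (no_cpose, no_cposant, qte_struct)
    Returns a dict mapping no_art -> weight.
    """
    weights = {}
    kits = set()
    for no_art, typ_art, pds_unit in articles:
        if typ_art == "K":
            kits.add(no_art)
            weights[no_art] = 0.0  # accumulated from components below
        else:
            weights[no_art] = pds_unit or 0.0  # missing weight counts as 0

    parents = defaultdict(list)  # component -> [(kit, quantity)]
    pending = defaultdict(int)   # kit -> component rows not yet resolved
    for no_cpose, no_cposant, qte in structure:
        if no_cpose in kits:
            parents[no_cposant].append((no_cpose, qte or 0.0))
            pending[no_cpose] += 1

    # Components that are absent from the article list weigh 0.
    for component in parents:
        weights.setdefault(component, 0.0)

    # Start with articles whose weight is already final:
    # non-kits, and kits that have no components at all.
    ready = deque(a for a in weights if a not in kits or pending[a] == 0)
    while ready:
        art = ready.popleft()
        for kit, qte in parents.get(art, ()):
            weights[kit] += qte * weights[art]
            pending[kit] -= 1
            if pending[kit] == 0:  # all components resolved: kit is final
                ready.append(kit)

    unresolved = sorted(k for k in kits if pending[k] > 0)
    if unresolved:
        raise ValueError(f"circular kit definition involving: {unresolved}")
    return weights


articles = [("KIT_A", "K", 0.0), ("KIT_B", "K", 0.0),
            ("PIECE_1", "P", 2.0), ("MAT_1", "M", 0.5)]
structure = [("KIT_A", "KIT_B", 2.0), ("KIT_A", "PIECE_1", 1.0),
             ("KIT_B", "PIECE_1", 3.0), ("KIT_B", "MAT_1", 4.0)]
print(compute_kit_weights(articles, structure))
```

Each component row is visited once, so the work grows linearly with the size of the structure table rather than with the number of paths through nested kits.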

Remember that translating complex recursive SQL functions to Databricks can be challenging, but with patience and careful debugging, you can achieve the desired results. Good luck! 😊