<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic SQL function refactoring into Databricks environment in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/sql-function-refactoring-into-databricks-environment/m-p/67176#M33298</link>
    <description>&lt;P&gt;Hello all,&lt;/P&gt;&lt;P&gt;I'm currently porting some SQL functions from an Informix database to Databricks, using an Asset Bundle that deploys a Delta Live Tables pipeline to Unity Catalog. I'm struggling to port a recursive one; here is the code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;CREATE FUNCTION "informix".poids_article (p_no_art CHAR(17))  RETURNING REAL ; 
	DEFINE v_poids REAL ;	
	DEFINE v_typ_art CHAR(4) ;
	DEFINE v_pds_unit REAL ;
	LET v_poids = 0 ;
	
	SELECT a.pds_unit, a.typ_art 
		INTO v_pds_unit, v_typ_art 
		FROM bas_art a 
		WHERE a.no_art = p_no_art ;		
	
	IF v_typ_art = 'K' THEN
		
		SELECT SUM(n.qte_struct * poids_article(n.no_cposant))								
				INTO v_poids
				FROM bas_nom n 
				WHERE n.no_cpose = p_no_art  ;	
	ELSE
		LET v_poids = v_pds_unit ;
	END IF ;
	IF v_poids IS NULL THEN
		LET v_poids = 0 ;
	END IF ;	
	RETURN v_poids ;
END FUNCTION
;&lt;/LI-CODE&gt;&lt;P&gt;I've tried to refactor this to work in Databricks with several strategies, including UDFs.&lt;BR /&gt;None of them worked; some might have, but would have needed 80+ hours to complete (the original Informix function takes less than 1 second on a 500,000-row table).&lt;/P&gt;&lt;P&gt;For more context, here is the goal of this function. I need to calculate the weight of articles. Every type of article already has a weight, except one: kits, identified by K in the code. A kit is composed of other articles, which can be pieces (P), materials (M), or other kits. This is where the recursion comes in: a kit can contain other kits with no depth limit, and each of them needs its weight calculated.&lt;/P&gt;&lt;P&gt;In the original function, the calculation happens here, exactly where the function calls itself:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;SELECT SUM(n.qte_struct * poids_article(n.no_cposant)) &lt;/LI-CODE&gt;&lt;P&gt;Here is an extract of the table I reworked so this code can run in Databricks:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="YannLevavasseur_0-1713952085696.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/7212i6329F522DE92BDE1/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="YannLevavasseur_0-1713952085696.png" alt="YannLevavasseur_0-1713952085696.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;The first column, no_cpose, is the ID of a kit, which is composed of one or more of the IDs shown in the second column, no_cposant. The typ_art column indicates whether the no_cposant is a material, a piece, or a kit. 
Finally, qte_struct and pds_unit are the quantity and unit weight used to calculate the kit's weight.&lt;/P&gt;&lt;P&gt;Here is a sample of a kit composed of other kits:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="YannLevavasseur_1-1713952236903.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/7213i26A12CA190895EAB/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="YannLevavasseur_1-1713952236903.png" alt="YannLevavasseur_1-1713952236903.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;As we can see, all kits start with a weight of 0.&lt;/P&gt;&lt;P&gt;My first attempt was as follows:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def calculate_kit_weight(df):
    df.withColumn("pds_unit",
                when(
                    (df.typ_art == "K") &amp;amp; ((df.pds_unit == 0) | (df.pds_unit.isNull())),
                    calculate_kit_weight(nom.select(F.col("no_cpose") == df.no_cposant))
                    ).otherwise(df.agg(F.sum("pds_unit"))))&lt;/LI-CODE&gt;&lt;P&gt;I expected the recursive call to return a subquery in which the kit's no_cposant ID appears as the current main kit in the no_cpose column. Instead, unsurprisingly, it only selects rows where no_cpose = no_cposant, which never occurs.&lt;/P&gt;&lt;P&gt;I then tried this UDF:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

@udf(returnType='float')
def poids_article(p_no_art):
    # Define the base case
    bas_art_df = spark.table("art_weight")
    v_pds_unit, v_typ_art = (
        bas_art_df.select("pds_unit", "typ_art")
        .where(f"no_cposant = '{p_no_art}'")
        .first()
    )

    # Initialize the weight
    v_poids = 0
   
    if v_typ_art == 'K':
        # If it's a kit, call this function for each of its components
        v_poids = (
            bas_art_df.join(poids_article(bas_art_df.no_cposant), bas_art_df.no_cpose == p_no_art, "inner")
            .selectExpr("SUM(qte_struct * poids_article) as sum_poids")
            .first()[0]
        )
    else:
        v_poids = v_pds_unit
   
    if v_poids is None:
        v_poids = 0
   
    return v_poids

# Register the UDF
spark.udf.register("poids_article", poids_article)

# Usage example
result_df = spark.sql("SELECT poids_article('BR08020040AABC099') as result")
result = result_df.first()[0]&lt;/LI-CODE&gt;&lt;P&gt;I'm not really confident in this one, and in any case it gave me this error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;PicklingError: Could not serialize object: TypeError: cannot pickle 'ClientThreadLocals' object
File /databricks/spark/python/pyspark/serializers.py:558, in CloudPickleSerializer.dumps(self, obj)
    557 try:
--&amp;gt; 558     return cloudpickle.dumps(obj, pickle_protocol)
    559 except pickle.PickleError:
File /databricks/spark/python/pyspark/serializers.py:568, in CloudPickleSerializer.dumps(self, obj)
    566     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    567 print_exec(sys.stderr)
--&amp;gt; 568 raise pickle.PicklingError(msg)&lt;/LI-CODE&gt;&lt;P&gt;My other attempts were either not viable or ended in an infinite loop.&lt;/P&gt;&lt;P&gt;Do you have any advice or ideas to share on this issue?&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;</description>
    <pubDate>Wed, 24 Apr 2024 09:59:28 GMT</pubDate>
    <dc:creator>YannLevavasseur</dc:creator>
    <dc:date>2024-04-24T09:59:28Z</dc:date>
    <item>
      <title>SQL function refactoring into Databricks environment</title>
      <link>https://community.databricks.com/t5/data-engineering/sql-function-refactoring-into-databricks-environment/m-p/67176#M33298</link>
      <pubDate>Wed, 24 Apr 2024 09:59:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sql-function-refactoring-into-databricks-environment/m-p/67176#M33298</guid>
      <dc:creator>YannLevavasseur</dc:creator>
      <dc:date>2024-04-24T09:59:28Z</dc:date>
    </item>
    <item>
      <title>Re: SQL function refactoring into Databricks environment</title>
      <link>https://community.databricks.com/t5/data-engineering/sql-function-refactoring-into-databricks-environment/m-p/89297#M37756</link>
      <description>&lt;P&gt;Hey&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;, how do we enable recursive CTEs ("RECURSIVE") in Databricks SQL?&lt;/P&gt;</description>
      <pubDate>Tue, 10 Sep 2024 13:18:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sql-function-refactoring-into-databricks-environment/m-p/89297#M37756</guid>
      <dc:creator>bhushant</dc:creator>
      <dc:date>2024-09-10T13:18:13Z</dc:date>
    </item>
  </channel>
</rss>

