Databricks Spark SQL: custom table-valued function + struct really slow (minutes for a single row)

jakubk
Contributor

I'm using Azure Databricks.

I have a custom table-valued function which takes a URL as a parameter and outputs a single-row table with certain elements from the URL extracted and labelled.

(I get search activity URLs, and when they're in a specific format I can retrieve values from the URL path and query-string parameters, which I put into separate columns for analysis.)

It works, but it's extremely slow, and I think the split(parse_url(url, 'PATH')) call I do is confusing the query optimiser. I also can't do any kind of logic on the resulting array elements inside the function.

Here's the function:

%sql
CREATE OR REPLACE FUNCTION urlSplitter(url STRING)
RETURNS TABLE(
  Url STRING,
  UrlPath STRING,
  UrlParams STRING,
  SearchState STRING,
  SearchRegion STRING,
  SearchParkName STRING,
  PropertyCode STRING,
  NoOfTokens INT
)
RETURN WITH src AS (
  SELECT
    -- prepare the url into the separate components for further processing
    url,
    parse_url(url, "PATH") AS UrlPath,
    split(parse_url(url, "PATH"), '/') AS PathArray,
    split(parse_url(url, "PATH"), '/')[1] AS PathArray1,
    split(parse_url(url, "PATH"), '/')[2] AS PathArray2,
    split(parse_url(url, "PATH"), '/')[3] AS PathArray3,
    split(parse_url(url, "PATH"), '/')[4] AS PathArray4,
    parse_url(url, "QUERY") AS UrlParams,
    parse_url(url, "QUERY", "propertyCode") AS PropertyCode
)
SELECT
  s.Url,
  s.UrlPath,
  s.UrlParams,
  PathArray[2],
  PathArray[3],
  PathArray[4],
  s.PropertyCode,
  size(PathArray) - 2 AS NoOfTokens
FROM
  src s

Now, the weird thing is: if I swap the PathArray[x] references in the final SELECT for the PathArrayX columns I create in the initial CTE, it all runs really fast, as long as I don't try to use PathArrayX in a predicate or CASE statement or anything like that.
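
i.e. the "fast" variant only changes the final SELECT to use the pre-computed columns; the CTE stays the same (sketch):

SELECT
  s.Url,
  s.UrlPath,
  s.UrlParams,
  s.PathArray2,
  s.PathArray3,
  s.PathArray4,
  s.PropertyCode,
  size(s.PathArray) - 2 AS NoOfTokens
FROM
  src s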

I like to use staged CTEs so I can test parts in isolation and avoid repeated function calls. This is a very simplified version of what I'm doing, but it still exhibits the behaviour; I do more complex things later and don't want to have to paste nested split(parse_url(...)) calls all over the place in CASE statements and so on.

The 'slow' function works fine when literals are passed to it, but falls apart when it references a normal table, even with a predicate that retrieves a single row based on a unique id (the table is a few GB, partitioned, optimized, Z-ordered, etc.).
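
For example, a literal call like this (made-up URL, just for illustration) returns quickly:

SELECT * FROM urlSplitter('https://www.example.com/search/nsw/hunter-valley/some-park?propertyCode=ABC123')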

Pulling the code out of the function and running it as a 'raw' SQL statement is fast.
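
By "raw" I mean hand-inlining the same expressions against the table, roughly like this (table/column names as they appear in the plans); this is what produces the fast plan below:

SELECT
  a.ActivityId,
  a.ActivityURL,
  a.ActivityURL AS Url,
  parse_url(a.ActivityURL, 'PATH') AS UrlPath,
  parse_url(a.ActivityURL, 'QUERY') AS UrlParams,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[2] AS SearchState,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[3] AS SearchRegion,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[4] AS SearchParkName,
  parse_url(a.ActivityURL, 'QUERY', 'propertyCode') AS PropertyCode,
  size(split(parse_url(a.ActivityURL, 'PATH'), '/')) - 2 AS NoOfTokens
FROM activities a
WHERE a.ActivityId = 1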

Slow plan:

 == Physical Plan ==
 AdaptiveSparkPlan isFinalPlan=false
 +- CollectLimit 100
    +- Project [ActivityId#41282, ActivityURL#41289, Url#41327, UrlPath#41328, UrlParams#41334, SearchState#41307, SearchRegion#41308, SearchParkName#41309, PropertyCode#41335, NoOfTokens#41311]
       +- SortMergeJoin [coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289)], [coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364)], Inner
          :- Sort [coalesce(ActivityUrl#41289, ) ASC NULLS FIRST, isnull(ActivityUrl#41289) ASC NULLS FIRST], false, 0
          :  +- ColumnarToRow
          :     +- PhotonResultStage
          :        +- PhotonShuffleExchangeSource
          :           +- PhotonShuffleMapStage
          :              +- PhotonShuffleExchangeSink hashpartitioning(coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289), 200)
          :                 +- PhotonProject [ActivityID#41282, ActivityURL#41289]
          :                    +- PhotonAdapter
          :                       +- FileScan parquet *[ActivityID#41282,ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>
          +- Sort [coalesce(ActivityUrl#41364, ) ASC NULLS FIRST, isnull(ActivityUrl#41364) ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364), 200), ENSURE_REQUIREMENTS, [id=#19269]
                +- Project [Url#41327, UrlPath#41328, UrlParams#41334, PathArray#41329[2] AS SearchState#41307, PathArray#41329[3] AS SearchRegion#41308, PathArray#41329[4] AS SearchParkName#41309, PropertyCode#41335, (size(PathArray#41329, true) - 2) AS NoOfTokens#41311, ActivityUrl#41364]
                   +- Project [ActivityUrl#41289 AS url#41327, parse_url(ActivityUrl#41289, PATH, false) AS UrlPath#41328, split(parse_url(ActivityUrl#41289, PATH, false), /, -1) AS PathArray#41329, parse_url(ActivityUrl#41289, QUERY, false) AS UrlParams#41334, parse_url(ActivityUrl#41289, QUERY, propertyCode, false) AS PropertyCode#41335, ActivityUrl#41289 AS ActivityUrl#41364]
                      +- ColumnarToRow
                         +- PhotonResultStage
                            +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])
                               +- PhotonShuffleExchangeSource
                                  +- PhotonShuffleMapStage
                                     +- PhotonShuffleExchangeSink hashpartitioning(ActivityUrl#41289, 200)
                                        +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])
                                           +- PhotonProject [ActivityURL#41289]
                                              +- PhotonAdapter
                                                 +- FileScan parquet *[ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityURL:string>

Fast plan:

 == Physical Plan ==
 CollectLimit 100
 +- Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]
    +- *(1) ColumnarToRow
       +- PhotonResultStage
          +- PhotonAdapter
             +- FileScan parquet *[ActivityID#41482,ActivityURL#41489,PartitionKey#41503] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>
    
    
 == Photon Explanation ==
 Photon does not fully support the query because:
  Unsupported expression(s): parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, propertyCode, false), parse_url(ActivityUrl#41489, PATH, false)
 reference node:
  Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]

Is this a bug? Are there alternative approaches?

Would writing this in Python and registering it make a difference? (I'd rather not, as the %sql function is under source control and managed along with the other SQL objects.)

The usage is:

select a.url, u.*
from activities a, lateral urlsplitter(a.url) u
where id = 1
