Databricks Spark SQL: custom table-valued function + struct really slow (minutes for a single row)

jakubk
Contributor

I'm using Azure Databricks.

I have a custom table-valued function which takes a URL as a parameter and outputs a single-row table with certain elements from the URL extracted and labelled.

(I get search activity URLs, and when they're in a specific format I can retrieve values from the URL path and ?parameters, which I put into separate columns for analysis.)

It works, but it's extremely slow, and I think it's because the split(parse_url(url, 'PATH')) call is confusing the query optimiser. I also can't do any kind of logical operations on the output array inside the function.

Here's the function:

%sql
CREATE OR REPLACE FUNCTION urlSplitter(url STRING) RETURNS TABLE(
  Url STRING,
  UrlPath STRING,
  UrlParams STRING,
  SearchState STRING,
  SearchRegion STRING,
  SearchParkName STRING,
  PropertyCode STRING,
  NoOfTokens INT
) RETURN WITH src AS (
  SELECT
    -- prepare the url into the separate components for further processing
    url,
    parse_url(url, 'PATH') AS UrlPath,
    split(parse_url(url, 'PATH'), '/') AS PathArray,
    split(parse_url(url, 'PATH'), '/')[1] AS PathArray1,
    split(parse_url(url, 'PATH'), '/')[2] AS PathArray2,
    split(parse_url(url, 'PATH'), '/')[3] AS PathArray3,
    split(parse_url(url, 'PATH'), '/')[4] AS PathArray4,
    parse_url(url, 'QUERY') AS UrlParams,
    parse_url(url, 'QUERY', 'propertyCode') AS PropertyCode
)
SELECT
  s.Url,
  s.UrlPath,
  s.UrlParams,
  PathArray[2] AS SearchState,
  PathArray[3] AS SearchRegion,
  PathArray[4] AS SearchParkName,
  s.PropertyCode,
  size(PathArray) - 2 AS NoOfTokens
FROM src s

Now, the weird thing is: if I swap the PathArray[x] references in the final SELECT for the PathArrayX columns I create in the initial CTE, it all runs really fast, as long as I don't use PathArrayX in a predicate, CASE statement, or similar.
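For reference, the swapped (fast) version of the final SELECT is just this, with the same src CTE and only the column references changed:

SELECT
  s.Url,
  s.UrlPath,
  s.UrlParams,
  -- reference the pre-extracted columns instead of indexing PathArray
  s.PathArray2 AS SearchState,
  s.PathArray3 AS SearchRegion,
  s.PathArray4 AS SearchParkName,
  s.PropertyCode,
  size(PathArray) - 2 AS NoOfTokens
FROM src s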

I like to use staged CTEs so I can test parts in isolation and avoid repeated function calls. Also, this is a very simplified version of what I'm doing, even though it still exhibits the behaviour; I do more complex stuff later and don't want to paste nested split(parse_url(...)) calls all over the place in CASE statements and so on.

The 'slow' function works fine for literals passed to it, but falls apart when referencing a normal table, even with a predicate that retrieves a single row by a unique id (the table is a few GB, partitioned, optimized, Z-ordered, etc.).
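To illustrate the difference (the literal URL here is just a made-up example):

-- fast: literal argument
SELECT * FROM urlSplitter('https://example.com/search/state/region/park-name?propertyCode=ABC123')

-- slow: referencing a table, even for a single row (full usage at the bottom of the post)
SELECT a.url, u.*
FROM activities a, LATERAL urlSplitter(a.url) u
WHERE id = 1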

Pulling the code out of the function and running it as a 'raw' SQL statement is fast.
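That is, something along these lines, inlining the same expressions against the table (column names taken from the plans below):

SELECT
  a.ActivityURL AS Url,
  parse_url(a.ActivityURL, 'PATH') AS UrlPath,
  parse_url(a.ActivityURL, 'QUERY') AS UrlParams,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[2] AS SearchState,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[3] AS SearchRegion,
  split(parse_url(a.ActivityURL, 'PATH'), '/')[4] AS SearchParkName,
  parse_url(a.ActivityURL, 'QUERY', 'propertyCode') AS PropertyCode,
  size(split(parse_url(a.ActivityURL, 'PATH'), '/')) - 2 AS NoOfTokens
FROM activities a
WHERE id = 1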

Slow plan:

 == Physical Plan ==
 AdaptiveSparkPlan isFinalPlan=false
 +- CollectLimit 100
    +- Project [ActivityId#41282, ActivityURL#41289, Url#41327, UrlPath#41328, UrlParams#41334, SearchState#41307, SearchRegion#41308, SearchParkName#41309, PropertyCode#41335, NoOfTokens#41311]
       +- SortMergeJoin [coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289)], [coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364)], Inner
          :- Sort [coalesce(ActivityUrl#41289, ) ASC NULLS FIRST, isnull(ActivityUrl#41289) ASC NULLS FIRST], false, 0
          :  +- ColumnarToRow
          :     +- PhotonResultStage
          :        +- PhotonShuffleExchangeSource
          :           +- PhotonShuffleMapStage
          :              +- PhotonShuffleExchangeSink hashpartitioning(coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289), 200)
          :                 +- PhotonProject [ActivityID#41282, ActivityURL#41289]
          :                    +- PhotonAdapter
          :                       +- FileScan parquet *[ActivityID#41282,ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>
          +- Sort [coalesce(ActivityUrl#41364, ) ASC NULLS FIRST, isnull(ActivityUrl#41364) ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364), 200), ENSURE_REQUIREMENTS, [id=#19269]
                +- Project [Url#41327, UrlPath#41328, UrlParams#41334, PathArray#41329[2] AS SearchState#41307, PathArray#41329[3] AS SearchRegion#41308, PathArray#41329[4] AS SearchParkName#41309, PropertyCode#41335, (size(PathArray#41329, true) - 2) AS NoOfTokens#41311, ActivityUrl#41364]
                   +- Project [ActivityUrl#41289 AS url#41327, parse_url(ActivityUrl#41289, PATH, false) AS UrlPath#41328, split(parse_url(ActivityUrl#41289, PATH, false), /, -1) AS PathArray#41329, parse_url(ActivityUrl#41289, QUERY, false) AS UrlParams#41334, parse_url(ActivityUrl#41289, QUERY, propertyCode, false) AS PropertyCode#41335, ActivityUrl#41289 AS ActivityUrl#41364]
                      +- ColumnarToRow
                         +- PhotonResultStage
                            +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])
                               +- PhotonShuffleExchangeSource
                                  +- PhotonShuffleMapStage
                                     +- PhotonShuffleExchangeSink hashpartitioning(ActivityUrl#41289, 200)
                                        +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])
                                           +- PhotonProject [ActivityURL#41289]
                                              +- PhotonAdapter
                                                 +- FileScan parquet *[ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityURL:string>

Fast plan:

 == Physical Plan ==
 CollectLimit 100
 +- Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]
    +- *(1) ColumnarToRow
       +- PhotonResultStage
          +- PhotonAdapter
             +- FileScan parquet *[ActivityID#41482,ActivityURL#41489,PartitionKey#41503] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>
    
    
 == Photon Explanation ==
 Photon does not fully support the query because:
  Unsupported expression(s): parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, propertyCode, false), parse_url(ActivityUrl#41489, PATH, false)
 reference node:
  Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]

Is this a bug? Are there alternative approaches?

Would writing this in Python and registering it make a difference? (I'd rather not, as the %sql function is under source control, managed along with the other SQL objects.)

The usage is:

SELECT a.url, u.*
FROM activities a, LATERAL urlSplitter(a.url) u
WHERE id = 1
