Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Serverless - can't parallelize UDF in applyInPandas

Dimitry
Contributor III

Hi all,

Serverless V3 fixed an error I had on V2 about mismatched Python versions between the driver and the workers (I can't remember the exact wording).

Because of that, I had been running this on classic compute until now.

Today I tried serverless, with partial success: unfortunately, the UDF is executed on a single CPU/thread.

I use repartition to split the workload across the worker nodes.

Here is a mock script that shows how I execute the UDF. In the mock, the UDF just sleeps for one second:

import pandas as pd
from datetime import datetime
from time import sleep
import threading

# Mock UDF: sleeps 1 second, then records when it ran and the native ID of the thread that ran it
def func(x: pd.DataFrame):
    sleep(1)
    return pd.DataFrame({'id': x['id'], 'timestamp': str(datetime.now()), 'thread': threading.get_native_id()})

# Build a 40-row Spark DataFrame from pandas
pdf = pd.DataFrame({'id': range(40)})
sdf = spark.createDataFrame(pdf)

now = datetime.now()
sdf = sdf.repartition(8, "id").groupby('id').applyInPandas(func, schema="id int, timestamp string, thread int")
result = spark.createDataFrame(sdf.toPandas()) # trigger lazy evaluation
print((datetime.now() - now).total_seconds())

# Expect at least 4 distinct thread IDs (one per CPU on classic compute)
display(result.groupBy("thread").count())

It repartitions the 40 rows into 8 partitions, but because my classic compute has only 4 CPUs, it executes on 4 threads:

[Screenshot: thread counts on classic compute]
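As a rough sanity check on timing: if every group takes the same time and each CPU runs one UDF task at a time, the best-case wall-clock time is ceil(groups / slots) × seconds per group. For the mock that is about 10 s on 4 CPUs versus 40 s if everything really ran on one thread. The helper below is a hypothetical illustration of that arithmetic, not a Spark API; it assumes uniform per-group cost and ignores scheduling overhead and uneven hash partitioning, both of which only make real runs slower.

```python
import math


def min_wall_clock_seconds(n_groups: int, seconds_per_group: float, slots: int) -> float:
    """Best-case runtime when every group takes the same time (hypothetical helper).

    Each slot (a CPU core running one task at a time) processes whole groups
    serially, so at least one slot must handle ceil(n_groups / slots) groups.
    Overhead and uneven partitioning are ignored, so real runs are slower.
    """
    if n_groups < 0 or slots < 1:
        raise ValueError("need n_groups >= 0 and slots >= 1")
    return math.ceil(n_groups / slots) * seconds_per_group


# The mock job: 40 groups, each UDF call sleeps 1 second.
print(min_wall_clock_seconds(40, 1.0, 4))  # classic compute with 4 CPUs → 10.0
print(min_wall_clock_seconds(40, 1.0, 1))  # a truly single-threaded run → 40.0
```

If the elapsed time printed by the script lands near the first figure rather than the second, the UDF calls are running in parallel, whatever the thread IDs say.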

When the same code runs on serverless, I get a single thread:

[Screenshot: thread counts on serverless compute]

QUESTION: How can I parallelize this job over my DataFrame on serverless compute, other than by repartitioning (which evidently does not work)?

1 REPLY

Dimitry
Contributor III

I misinterpreted the results.

threading.get_native_id() does not behave on serverless the way it does on classic compute: different threads return the same ID.
And the test clearly finishes in well under 40 seconds, which is how long it would take if it were running on a single thread.
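Since thread IDs proved unreliable here, one ID-independent way to measure parallelism is to record when each UDF call starts and ends and count how many calls overlap in time. The sketch below assumes the UDF is changed to return two float columns of `time.time()` values, hypothetically named `start` and `end`, which are collected to the driver (for example with `toPandas()`) and passed in as pairs. The function itself is plain Python and does not touch Spark. Clock skew between worker machines can distort the count, so treat the result as an estimate.

```python
from typing import Iterable, List, Tuple


def max_concurrency(intervals: Iterable[Tuple[float, float]]) -> int:
    """Return the largest number of [start, end) intervals active at the same instant.

    Sweep line: each interval becomes a +1 event at its start and a -1 event
    at its end; after sorting, the running total is the number of active calls.
    """
    events: List[Tuple[float, int]] = []
    for start, end in intervals:
        if end < start:
            raise ValueError(f"interval ends before it starts: {(start, end)}")
        events.append((start, 1))
        events.append((end, -1))
    # At equal timestamps -1 sorts before +1, so a call that ends exactly when
    # another starts is not counted as overlapping with it.
    events.sort()
    active = peak = 0
    for _, delta in events:
        active += delta
        peak = max(peak, active)
    return peak


# Hypothetical timings: four 1-second calls side by side, then four more.
timings = [(0.0, 1.0), (0.0, 1.0), (0.1, 1.1), (0.1, 1.1),
           (1.0, 2.0), (1.0, 2.0), (1.1, 2.1), (1.1, 2.1)]
print(max_concurrency(timings))  # → 4
```

A peak of 1 would confirm single-threaded execution; a peak close to the number of cores would confirm the parallelism that the sub-40-second runtime already suggests.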
