python multiprocessing hangs at map on one cluster but works fine on another

mh-hsn
New Contributor II

I have a simple Python script which has been running fine on my cluster, but recently the same script gets stuck at map. So I created a new cluster with fewer resources, ran the same script on it, and it ran just fine.

Here are the specifications of my clusters:

My old cluster:
9.1-LTS ML (includes Apache Spark 3.1.2, Scala 2.12)
Worker type: Standard_D16s_v3 (min "1", max "8" )
Driver type: Standard_D64s_v3
Spot instances = True
 
My new cluster:
9.1-LTS ML (includes Apache Spark 3.1.2, Scala 2.12)
Worker type: Standard_DS3_v2 (min "1", max "8" )
Driver type: Standard_DS3_v2
Spot instances = True
 
import multiprocessing
from functools import partial


# Define the function to process each row
def process_row(row, func):
    index, data = row
    if data['some_new_column'] == '':
        data['some_new_column'] = func(data['text'])
    return index, data


# Define the function for parallel processing
def parallel_process(data, func, num_processes):
    func_partial = partial(process_row, func=func)
    print('Starting mapping...')
    # The context manager shuts the worker processes down even if map raises
    with multiprocessing.Pool(processes=num_processes) as pool:
        processed_data = pool.map(func_partial, data)
    return processed_data

num_processes = multiprocessing.cpu_count()

# Apply parallel processing to speed up the operation
processed_data = parallel_process(models_df.iterrows(), my_custom_func, num_processes)

 
1 ACCEPTED SOLUTION

raphaelblg
Contributor III

Hello @mh-hsn ,

First of all, let me clarify that all your multiprocessing is occurring on your driver node, where the main Python process of your cluster runs. It's important to note that the configuration of your worker nodes (executors) does not affect this scenario.

In your old cluster (Standard_D64s_v3 driver), you have 64 vCPUs available, so multiprocessing.cpu_count() returns 64 and your pool starts 64 worker processes on the driver node. In your new cluster (Standard_DS3_v2 driver), there are only 4 vCPUs available, so the same script starts just 4 processes.

Running 64 parallel processes can be resource-intensive, even for the Standard_D64s_v3 driver. Your driver may be getting stuck because of out-of-memory (OOM) issues or excessive Python garbage collection (GC). The Standard_DS3_v2 driver is less capable, but 4 processes might not be enough to cause OOM or excessive GC.
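One way to keep the pool size from scaling with the driver's vCPU count is to put an explicit upper bound on it. The following is only a minimal sketch: the helper name `choose_num_processes` and the default cap of 8 are assumptions for illustration, not values taken from this thread, and the right cap depends on how much memory each worker needs.

```python
import multiprocessing


def choose_num_processes(n_items, cap=8):
    """Return a worker count bounded by the CPU count, a hard cap,
    and the number of items to process (never less than 1)."""
    available = multiprocessing.cpu_count()
    return max(1, min(available, cap, n_items))


# Hypothetical usage with the script from the question:
# num_processes = choose_num_processes(len(models_df), cap=8)
```

With a cap like this, moving the same script to a driver with more vCPUs no longer changes how many processes are started.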

Without memory metrics and cluster logs, it's challenging to confirm the root cause. However, based on your description, I believe my assessment is likely accurate.
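If no memory metrics are at hand, a rough number can be logged from inside the script itself. This is a sketch under stated assumptions: it uses the standard-library `resource` module (Unix only), the helper name `peak_rss_mb` is made up for illustration, and it reports peak resident memory only, which is not a substitute for the cluster's own metrics and logs.

```python
import resource
import sys


def peak_rss_mb(who=resource.RUSAGE_SELF):
    """Peak resident set size in MiB.

    ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    """
    peak = resource.getrusage(who).ru_maxrss
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


print(f"driver process peak RSS: {peak_rss_mb():.1f} MiB")
# RUSAGE_CHILDREN covers child processes that have already exited and been
# waited for, so it is only meaningful after the pool has been joined.
print(f"largest finished child: {peak_rss_mb(resource.RUSAGE_CHILDREN):.1f} MiB")
```

Logging these values before and after the map call gives a first hint as to whether memory grows with the number of processes.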

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

5 REPLIES

mh-hsn
New Contributor II

@raphaelblg Thank you for your response, I really appreciate it. After your answer, I was able to resolve the issue by reducing "num_processes" on my old cluster. What I still don't understand is why it executed successfully in the past, because the function that I apply to my dataframe rows just makes some API calls. So the inconsistent behavior is what I do not understand at the moment. Anyway, thanks again for the help.
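Since the row function only makes API calls, the work is probably I/O-bound, in which case a bounded thread pool may be a lighter alternative to one process per vCPU. This is a sketch, not a diagnosis of the hang: plain dicts stand in for the DataFrame rows, `fake_api_call` is a hypothetical placeholder for the real function, and `max_workers=4` is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def process_row(row, func):
    """Fill 'some_new_column' for a row only when it is still empty."""
    index, data = row
    if data["some_new_column"] == "":
        # Build a new dict instead of mutating the caller's row
        data = {**data, "some_new_column": func(data["text"])}
    return index, data


def threaded_process(rows, func, max_workers=8):
    """Apply process_row to every row using a bounded thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in the same order as the input
        return list(executor.map(partial(process_row, func=func), rows))


def fake_api_call(text):
    """Hypothetical stand-in for the real API call."""
    return text.upper()


rows = [
    (0, {"text": "a", "some_new_column": ""}),
    (1, {"text": "b", "some_new_column": "done"}),
]
result = threaded_process(rows, fake_api_call, max_workers=4)
```

Threads share the driver's memory instead of each holding their own copy of the interpreter state, so the footprint stays much smaller than with 64 separate processes.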

jacovangelder
Contributor III

I agree with @raphaelblg. Most likely you're running out of memory. Unfortunately, multiprocessing and thread pools do not benefit from extra workers, as they only run on your driver node. This is very annoying and not a well-known fact. The Spark driver also often misbehaves because of it. I've read that a "for each" activity is currently in private preview; this will hopefully resolve some of these issues.

@jacovangelder Thanks for the response. As I mentioned in my reply to @raphaelblg, the inconsistent behavior is what I am not able to understand at the moment.

It's very difficult to pinpoint the exact issue on the driver node. I've seen some very inconsistent behaviour myself this week using thread pools/multiprocessing. One day it runs fine; another day the Spark driver bugs out running the same (vanilla Python, not even Spark) workload, while still having plenty of memory free. My best advice would be to reconsider the multiprocessing/thread pools and perhaps accept a little less parallelism in exchange for more consistency.
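One way to trade a little parallelism for predictability is to run with a small worker count and a deadline, so that a stall is reported instead of blocking the notebook indefinitely. The sketch below is an illustration under assumptions: `run_with_deadline` and `slow_echo` are hypothetical names, the items must be hashable, and the timings are arbitrary. For `multiprocessing.Pool`, `map_async(...).get(timeout=...)` offers a comparable deadline.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def run_with_deadline(func, items, max_workers=4, timeout_s=30.0):
    """Run func over items; return (results, stalled) once the deadline passes.

    results maps each finished item to its return value; stalled lists the
    items that had not finished within timeout_s seconds.
    """
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = {executor.submit(func, item): item for item in items}
    done, not_done = wait(futures, timeout=timeout_s)
    for future in not_done:
        # Only queued calls can be cancelled; calls already running
        # cannot be interrupted and keep going in the background.
        future.cancel()
    executor.shutdown(wait=False)
    # future.result() re-raises any exception raised inside func
    results = {futures[f]: f.result() for f in done}
    stalled = [futures[f] for f in not_done]
    return results, stalled


def slow_echo(delay):
    """Hypothetical stand-in for an API call that takes `delay` seconds."""
    time.sleep(delay)
    return delay


results, stalled = run_with_deadline(
    slow_echo, [0.01, 1.0], max_workers=2, timeout_s=0.3
)
print("finished:", results, "stalled:", stalled)
```

Knowing which inputs stalled makes it easier to tell a slow API call apart from a resource problem on the driver.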
