python multiprocessing hangs at map on one cluster but works fine on another

mh-hsn
New Contributor III

I have a simple Python script that has been running fine on my cluster, but recently the same script gets stuck at map. So I created a new cluster with fewer resources, ran the same script on it, and it ran just fine.

Here are the specifications of my clusters:

My old cluster:
9.1-LTS ML (includes Apache Spark 3.1.2, Scala 2.12)
Worker type: Standard_D16s_v3 (min 1, max 8 workers)
Driver type: Standard_D64s_v3
Spot instances: True

My new cluster:
9.1-LTS ML (includes Apache Spark 3.1.2, Scala 2.12)
Worker type: Standard_DS3_v2 (min 1, max 8 workers)
Driver type: Standard_DS3_v2
Spot instances: True

import multiprocessing
from functools import partial


# Process a single (index, row) pair produced by DataFrame.iterrows()
def process_row(row, func):
    index, data = row
    if data['some_new_column'] == '':
        data['some_new_column'] = func(data['text'])
    return index, data


# Run process_row over all rows in a multiprocessing pool
def parallel_process(data, func, num_processes):
    pool = multiprocessing.Pool(processes=num_processes)
    func_partial = partial(process_row, func=func)
    print('Starting mapping...')
    processed_data = pool.map(func_partial, data)
    pool.close()
    pool.join()
    return processed_data


# Use one process per vCPU on the driver
num_processes = multiprocessing.cpu_count()

# Apply parallel processing to speed up the operation
# (models_df is a pandas DataFrame, my_custom_func is defined elsewhere)
processed_data = parallel_process(models_df.iterrows(), my_custom_func, num_processes)

5 REPLIES

raphaelblg
Honored Contributor II

Hello @mh-hsn,

First of all, let me clarify that all your multiprocessing occurs on your driver node, where your cluster's main Python process runs. Your executors' configuration has no impact in this scenario.

In your old cluster (Standard_D64s_v3 driver), you have 64 vCPUs available, allowing you to instantiate up to 64 simultaneous worker processes on the driver node. In your new cluster (Standard_DS3_v2 driver), however, there are only 4 vCPUs available, which limits you to just 4 processes.

Running 64 parallel processes can be resource-intensive, even for the Standard_D64s_v3 driver. The reason your driver gets stuck could be OOM issues or excessive Python GC. The Standard_DS3_v2 driver is less capable, but 4 processes are few enough that they are unlikely to cause OOM or excessive GC.

Without memory metrics and cluster logs, it's challenging to confirm the root cause. However, based on your description, I believe my assessment is likely accurate.
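
As a quick mitigation, you could bound the pool size on the driver. A minimal sketch based on your snippet (the cap of 8 is only an illustration, not a recommended value):

import multiprocessing

# Bound the pool size so a large driver (64 vCPUs on Standard_D64s_v3)
# does not spawn one worker process per vCPU; the cap of 8 is illustrative.
MAX_PROCESSES = 8
num_processes = min(multiprocessing.cpu_count(), MAX_PROCESSES)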

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

mh-hsn
New Contributor III

@raphaelblg Thank you for your response, really appreciate it. After your answer, I was able to resolve the issue by reducing "num_processes" on my old cluster. But what I still don't understand is why it had been executing successfully in the past, because the function I am applying to my dataframe rows just makes some API calls. So the inconsistent behavior is what I do not understand at the moment. Anyways, thanks again for the help.
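
For reference, since the row function only makes API calls (I/O-bound work), a thread pool can be a lighter alternative to separate processes. A rough sketch reusing process_row, my_custom_func and models_df from the snippet above; max_workers=8 is just an illustrative value:

from concurrent.futures import ThreadPoolExecutor
from functools import partial

# Sketch only: threads instead of processes for I/O-bound API calls.
# process_row, my_custom_func and models_df come from the original snippet.
def parallel_process_threads(data, func, max_workers=8):
    func_partial = partial(process_row, func=func)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func_partial, data))

processed_data = parallel_process_threads(models_df.iterrows(), my_custom_func)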

jacovangelder
Honored Contributor

I agree with @raphaelblg. Most likely you're running out of memory. Multiprocessing and thread pools unfortunately do not benefit from extra workers, as they only run on your driver node. This is very annoying and not a well-known fact. The Spark driver also often has issues because of it. I've read that a "for each" activity is currently in private preview; hopefully that will resolve some of these issues.

mh-hsn
New Contributor III

@jacovangelder Thanks for the response. Actually, the inconsistent behavior is what I am not able to understand at the moment, as I mentioned in my response to @raphaelblg.

jacovangelder
Honored Contributor

It's very difficult to pinpoint the exact issue on the driver node, really. I've seen some very inconsistent behaviour myself this week using thread pools/multiprocessing: one day it will run fine, the next day the Spark driver will bug out running the same (vanilla Python, not even Spark) workload, while still having plenty of memory free. My best advice would be to reconsider the multiprocessing/thread pools and perhaps accept a little less parallelism in exchange for more consistency.
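
For example, a fixed, modest pool size together with a context manager (so the pool is always cleaned up) trades raw parallelism for predictability. A rough sketch reusing process_row from the original snippet; the pool size of 4 and chunksize of 32 are only illustrative:

import multiprocessing
from functools import partial

# Sketch only: fixed, modest parallelism instead of one process per vCPU.
# process_row comes from the original snippet; 4 and 32 are illustrative.
FIXED_PROCESSES = 4

def parallel_process(data, func, num_processes=FIXED_PROCESSES):
    func_partial = partial(process_row, func=func)
    with multiprocessing.Pool(processes=num_processes) as pool:
        # chunksize batches work so the task queue and memory use stay steadier
        return pool.map(func_partial, data, chunksize=32)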
