08-09-2024 09:28 AM
I have a job with more than 10 tasks in it that interact with an external system outside of Databricks. At the moment that external system cannot handle more than 3 of the tasks executing concurrently. How can I limit the number of tasks that execute concurrently within a job? I'm not particularly worried about the order in which they execute, only that the number running at any one time is limited to 3.
The cluster that I execute this on currently has only 1 worker in it and I'm looking to limit what takes place on that single worker.
12-06-2024 10:36 PM
To limit the number of tasks that concurrently execute in a job to 3, you can use the max_concurrent_runs parameter in your job configuration. This parameter allows you to specify the maximum number of concurrent runs for a job, ensuring that no more than the specified number of tasks run at the same time.
When creating or updating your job, set the max_concurrent_runs parameter to 3. This will limit the number of concurrent tasks to 3.
The max_concurrent_runs parameter will handle the concurrency limit regardless of the cluster size.
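For reference, this is a job-level setting. A minimal sketch of applying it through the Jobs API 2.1 (the workspace URL, token, and job ID below are placeholders):

import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"   # placeholder workspace URL
DATABRICKS_TOKEN = "<personal-access-token>"                        # placeholder PAT
JOB_ID = 123                                                        # placeholder job ID

# update the existing job's settings so at most 3 runs of the job execute at once
resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/update",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json={"job_id": JOB_ID, "new_settings": {"max_concurrent_runs": 3}},
)
resp.raise_for_status()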
12-07-2024 12:32 AM
Hi @tgburrin-afs, @Mounika_Tarigop ,
As I understand it, the question is about running concurrent tasks within a single job run rather than running concurrent jobs.
max_concurrent_runs controls how many times a whole job can run simultaneously, not the concurrency of tasks within a single job run.
There is currently no direct feature in Databricks Jobs to specify a maximum number of concurrently running tasks within a single job run. Instead, you need to control concurrency through task dependencies or application logic.
Approaches to Limit Concurrent Tasks Within a Single Job Run
Implement Concurrency Control in Your Code:
If each Databricks task itself runs code that executes operations against the external system, implement concurrency control in your code logic.
For example, if a single task processes multiple items and you don’t want more than three operations to hit the external system concurrently, use a thread pool within the task’s code:
from concurrent.futures import ThreadPoolExecutor

# process_item and items_to_process are placeholders for your own function and work list
# Limit to 3 concurrent operations
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(process_item, item) for item in items_to_process]
    results = [f.result() for f in futures]

This approach requires merging multiple pieces of logic into a single task and controlling concurrency at the code level.
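Use Task Dependencies:
The other route mentioned above is to shape the task graph itself. A minimal sketch (task keys and notebook paths are placeholders) that chains tasks in waves of three using depends_on in the job's settings, so at most three tasks are eligible to run at any time:

# build a "tasks" list for the job's JSON settings (Jobs API 2.1)
tasks = []
task_keys = [f"task_{i}" for i in range(1, 10)]   # placeholder task keys
for i, key in enumerate(task_keys):
    task = {"task_key": key, "notebook_task": {"notebook_path": f"/Jobs/{key}"}}
    if i >= 3:
        # each task waits on the task three positions earlier, so the tasks
        # form three independent chains and at most three run concurrently
        task["depends_on"] = [{"task_key": task_keys[i - 3]}]
    tasks.append(task)

This trades some parallelism for predictability: the tasks in each chain run strictly in sequence, but no external throttling code is needed.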
04-19-2025 12:39 AM
Same thing here; job concurrency is available, but there is nothing for tasks. Some of our jobs have countless parallel tasks, so without controlling it the downstream servers grind to a halt and tasks just terminate.
It needs what we call a spinlock on tasks to control the concurrency in a globally shared space. If your case allows for a Python wrapper, we can do something like this:
1. Create a volume;
2. Ensure each task has a unique key of sorts; pass it in as a parameter (see the note after the snippet below);
3. Do a spinlock on the volume with something like this:
import time
import random

#############
# Concurrency
#############
aat_vol = "/Volumes/{your_volume_path}"   # path to the volume used as the shared lock space
aat = 1                                   # number of tasks allowed to run concurrently

def saat(s):
    # spin until fewer than `aat` markers exist in the volume, then claim a slot
    time.sleep(1 + random.randrange(1, 10))
    ready = False
    while not ready:
        entries = dbutils.fs.ls(aat_vol)
        if len(entries) < aat:
            dbutils.fs.mkdirs(aat_vol + "/" + s)   # create a marker directory for this task's key
            ready = True
        else:
            time.sleep(1 + random.randrange(1, 10))

def eaat(s):
    # release the slot by removing this task's marker
    dbutils.fs.rm(aat_vol + "/" + s)
...
try:
    eaat(target)   # clear any stale marker for this key before acquiring
    saat(target)   # spin until a slot is free, then claim it
    ...
except Exception as e:
    eaat(target)   # release the slot on failure
    raise e
eaat(target)       # release the slot on success
aat_vol: the path to your volume
aat: the number of concurrent tasks you want to allow
saat: starts a task with the given key
eaat: ends a task with the given key
The random sleep is used so the tasks do not all check the volume at the same time; in some cases (roughly a 1-in-10 chance) more than one task may still get past the initial check at once. Always ensure you call eaat to clear the volume, even if the task fails.
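As for where the unique key comes from (step 2 above): a minimal sketch, assuming the key is passed as a task parameter named task_key (the name is illustrative), and using try/finally as an equivalent way to guarantee eaat always runs:

# read the per-task key from a job/task parameter; the name "task_key" is an assumption
target = dbutils.widgets.get("task_key")

try:
    eaat(target)   # clear any stale marker left by a previous failed run
    saat(target)   # wait for a free slot, then claim it
    # ... the task's actual work against the external system goes here ...
finally:
    eaat(target)   # always release the slot, success or failure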