Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Limiting concurrent tasks in a job

tgburrin-afs
New Contributor

I have a job with more than 10 tasks in it that interact with an external system outside of Databricks.  At the moment that external system cannot handle more than 3 of the tasks executing concurrently.  How can I limit the number of tasks that execute concurrently within a job?  I'm not particularly worried about the order in which they execute, only that the number running at any one time is limited to 3.

The cluster that I execute this on currently has only 1 worker in it and I'm looking to limit what takes place on that single worker.

3 REPLIES

Mounika_Tarigop
Databricks Employee

To limit the number of tasks that concurrently execute in a job to 3, you can use the max_concurrent_runs parameter in your job configuration. This parameter allows you to specify the maximum number of concurrent runs for a job, ensuring that no more than the specified number of tasks run at the same time.

When creating or updating your job, set the max_concurrent_runs parameter to 3. This will limit the number of concurrent tasks to 3.

The max_concurrent_runs parameter will handle the concurrency limit regardless of the cluster size.
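For reference, a minimal sketch of where this setting sits in the job settings JSON (the job name, task key, and notebook path here are placeholders, not taken from the original job):

{
  "name": "external-system-job",
  "max_concurrent_runs": 3,
  "tasks": [
    {
      "task_key": "task_a",
      "notebook_task": { "notebook_path": "/Workspace/jobs/task_a" }
    }
  ]
}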

filipniziol
Esteemed Contributor

Hi @tgburrin-afs, @Mounika_Tarigop,

As I understand it, the question is about running concurrent tasks within a single job rather than running concurrent jobs.

max_concurrent_runs controls how many times a whole job can run simultaneously, not the concurrency of tasks within a single job run.

There is currently no direct feature in Databricks Jobs to specify a maximum number of concurrently running tasks within a single job run. Instead, you need to control concurrency through task dependencies or application logic.

Approaches to Limit Concurrent Tasks Within a Single Job Run

  1. Use Task Dependencies to Limit Parallelism:
    Structure your job so that no more than three tasks run at the same "layer." For example:
    • Suppose you have 12 tasks total. Instead of having all 12 start at once, arrange them in four "waves" of three tasks each.
    • In the Job UI or JSON configuration (a JSON sketch appears after this list):
      • Start with three tasks (A, B, C) that have no upstream dependencies. They run simultaneously.
      • The next set of three tasks (D, E, F) only start after A, B, and C all complete.
      • Repeat this pattern until all tasks have run. This ensures that at most three tasks are active at the same time.
  2. Implement Concurrency Control in Your Code:

    • If each Databricks task itself runs code that executes operations against the external system, implement concurrency control in your application logic.

    • For example, if a single task processes multiple items and you don’t want more than three operations to hit the external system concurrently, use a thread pool within the task’s code:

 

from concurrent.futures import ThreadPoolExecutor

# Limit to 3 concurrent operations against the external system.
# process_item and items_to_process are placeholders for your own work function and work list.
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(process_item, item) for item in items_to_process]
    results = [f.result() for f in futures]

 

  • This approach requires merging multiple pieces of logic into a single task and controlling concurrency at the code level.
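For approach 1, a minimal sketch of the "waves" wiring in the tasks section of the job JSON, as referenced above (task keys and notebook paths are placeholders). The first wave (A, B, C) has no dependencies; each task in the second wave depends on all three:

"tasks": [
  { "task_key": "A", "notebook_task": { "notebook_path": "/Workspace/jobs/a" } },
  { "task_key": "B", "notebook_task": { "notebook_path": "/Workspace/jobs/b" } },
  { "task_key": "C", "notebook_task": { "notebook_path": "/Workspace/jobs/c" } },
  {
    "task_key": "D",
    "depends_on": [ { "task_key": "A" }, { "task_key": "B" }, { "task_key": "C" } ],
    "notebook_task": { "notebook_path": "/Workspace/jobs/d" }
  }
]

Tasks E and F would carry the same depends_on list as D, and the later waves repeat the pattern against the previous wave.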

 

 

_J
New Contributor II

Same issue here: job-level concurrency is covered, but there is nothing for tasks. Some of our jobs have countless parallel tasks, so without controlling it the downstream servers grind to a halt and tasks just terminate.

What is needed is what we call a spinlock on tasks to control the concurrency in a globally shared space. If your case allows for a Python wrapper, we can do something like this:

1. Create a volume;

2. Ensure each task has a unique key of sorts; use a parameter (see the sketch at the end of this post);

3. Do a spinlock on the volume with something like this:

import time
import random

#############
# Concurrency
#############
aat_vol = "/Volumes/{your_volume_path}"  # path to the shared volume used as the lock space
aat = 1                                  # number of concurrent tasks to allow

def saat(s):
    """Start a task: spin until fewer than aat markers exist, then claim a slot for key s."""
    time.sleep(1 + random.randrange(1, 10))  # random delay so tasks don't all check at once
    ready = False
    while not ready:
        entries = dbutils.fs.ls(aat_vol)  # current lock markers in the volume
        if len(entries) < aat:
            dbutils.fs.mkdirs(aat_vol + "/" + s)  # claim a slot by creating a marker directory
            ready = True
        else:
            time.sleep(1 + random.randrange(1, 10))

def eaat(s):
    """End a task: release the slot by removing its marker."""
    dbutils.fs.rm(aat_vol + "/" + s, True)

...
try:
    eaat(target)   # clear any stale marker left by a previous failed run
    saat(target)   # wait for a free slot
    ...            # the task's actual work against the external system
except Exception as e:
    eaat(target)   # release the slot on failure
    raise e
eaat(target)       # release the slot on success

aat_vol: the path to your volume
aat: the number of concurrent tasks you want to allow
saat: starts a task with the given key
eaat: ends a task with the given key

The random delay is used to ensure the tasks do not spin up at the same time; in some cases (roughly a 1 in 10 probability) you will still have more than one task starting its initial spin simultaneously. Always ensure you call eaat to clear the volume even if the task fails.
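On point 2 above, one possible way to give each task its unique key is a task parameter; a minimal sketch, assuming each notebook task defines a base parameter named task_key (the parameter name is just an example):

# "task_key" is an example parameter name; each task passes a different value
target = dbutils.widgets.get("task_key")

The value read here is the target used in the try/except block above.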