a month ago
Title: Databricks workflows for APIs with different frequencies (cluster keeps restarting)
Hey everyone,
I’m stuck with a Databricks workflow design and could use some advice.
Currently, we are calling 70+ APIs.
Right now the workflow looks something like:
Task1 → Task2 → ForEach → notebook (API calls)
However, there is a new requirement that each API needs to be called at a different frequency — some must run every 1 min, some 2 mins, some 5 mins. And we have to create a generalized solution.
In Task 1 we read a view where each API's name, path, and apicallfreq are stored.
We're using job clusters, and the problem is that for the 1-min APIs the workflow is basically constantly restarting clusters, which is not really feasible (time + cost).
We looked into:
Has anyone handled something similar?
Would really appreciate any practical suggestions from real-world setups.
Thanks!
3 weeks ago
You're right that job clusters are the wrong fit here. The cold start time (including serverless, which is still 25-50s) makes anything under 5 minutes impractical when the cluster terminates between runs.
The simplest approach: all-purpose cluster + scheduling loop in a single notebook.
You already have a config view with API paths and frequencies, so you're most of the way there. The idea is to run one notebook on an always-on all-purpose cluster that ticks every 60 seconds and checks which APIs are due.
Step 1: Add a last_called column to your config table
You need to track when each API was last called so the scheduler knows what's due. A Delta table works well for this:
CREATE TABLE api_call_config (
  api_name STRING,
  api_path STRING,
  frequency_seconds INT,
  last_called TIMESTAMP
);

-- Populate from your existing view
INSERT INTO api_call_config
SELECT api_name, api_path, apicallfreq, NULL AS last_called
FROM your_existing_config_view;
Step 2: The scheduling notebook
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
from datetime import datetime

def call_api(row):
    """Call a single API and return the result"""
    try:
        response = requests.get(row["api_path"], timeout=30)
        return {"api_name": row["api_name"], "status": response.status_code, "success": True}
    except Exception as e:
        return {"api_name": row["api_name"], "status": str(e), "success": False}

while True:
    tick_start = time.time()

    # Find which APIs are due
    due_apis = spark.sql("""
        SELECT api_name, api_path, frequency_seconds
        FROM api_call_config
        WHERE last_called IS NULL
           OR TIMESTAMPDIFF(SECOND, last_called, current_timestamp()) >= frequency_seconds
    """).collect()

    if due_apis:
        # Call them in parallel
        with ThreadPoolExecutor(max_workers=10) as pool:
            futures = {pool.submit(call_api, row.asDict()): row for row in due_apis}
            successful = []
            for future in as_completed(futures):
                result = future.result()
                if result["success"]:
                    successful.append(result["api_name"])

        # Update last_called for successful calls
        if successful:
            names = ",".join([f"'{n}'" for n in successful])
            spark.sql(f"""
                UPDATE api_call_config
                SET last_called = current_timestamp()
                WHERE api_name IN ({names})
            """)

        print(f"[{datetime.now():%H:%M:%S}] Called {len(due_apis)} APIs, {len(successful)} succeeded")

    # Sleep the remainder of the minute
    elapsed = time.time() - tick_start
    time.sleep(max(0, 60 - elapsed))
Step 3: Wrap it in a job for resilience
Run this notebook as a Databricks job with a continuous trigger on an all-purpose cluster. This gives you:
- Auto-restart if the notebook crashes
- Email/webhook alerts on failure
- Run history and logs
In the job config, set the compute to your all-purpose cluster (not a job cluster) and set the trigger to continuous with no pause.
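If you'd rather define that as code than click through the UI, the job settings look roughly like the payload below. This is only a sketch against the Jobs 2.1 API; the job name, notebook path, cluster ID, workspace URL, token, and email address are all placeholders.

import requests

# Sketch of a Jobs API 2.1 create call -- every identifier below is a placeholder.
job_settings = {
    "name": "api-poller",
    "tasks": [{
        "task_key": "poll_apis",
        "notebook_task": {"notebook_path": "/Workspace/path/to/api_poller_notebook"},
        "existing_cluster_id": "<all-purpose-cluster-id>",
    }],
    "continuous": {"pause_status": "UNPAUSED"},
    "email_notifications": {"on_failure": ["you@example.com"]},
}

resp = requests.post(
    "https://<workspace-url>/api/2.1/jobs/create",
    headers={"Authorization": "Bearer <token>"},
    json=job_settings,
)
print(resp.json())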
Why this works better than the other suggestions:
- No cold starts since the cluster stays running
- No separate dispatcher/queue since your config table already stores the frequencies and last_called handles the scheduling
- Easy to change frequencies by just updating the config table, no job redefinition needed
- Parallel execution via ThreadPoolExecutor so slow APIs don't block fast ones
- Adding new APIs is just an INSERT into the config table
One thing to watch: size the max_workers on the ThreadPoolExecutor based on how many APIs might be due at once. If all 70 fire on the same tick (e.g. at startup when last_called is NULL), you might want to cap it and batch them. Also consider what happens if an API call takes longer than 60 seconds, since the next tick will try to call it again. A simple fix is to update last_called before the call (optimistic) rather than after, or add an in_progress flag.
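For what it's worth, the stamp-first variant is a small change to the loop above; here's a rough sketch reusing call_api and the same config table:

# Stamp-first variant: mark the due APIs as called before firing the requests,
# so an API that takes longer than a tick isn't picked up again on the next tick.
# Trade-off: if a call fails, the retry waits for that API's normal frequency.
if due_apis:
    names = ",".join(f"'{r.api_name}'" for r in due_apis)
    spark.sql(f"""
        UPDATE api_call_config
        SET last_called = current_timestamp()
        WHERE api_name IN ({names})
    """)
    with ThreadPoolExecutor(max_workers=10) as pool:
        results = list(pool.map(call_api, [r.asDict() for r in due_apis]))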
I tested the SQL frequency-check logic and the ThreadPoolExecutor pattern on a live Databricks workspace and both work as expected.
a month ago - last edited a month ago
Hi @mordex
This is a classic high-frequency orchestration problem on Databricks. The core issue is that Databricks job clusters are designed for batch workloads, not sub-5-minute polling loops.
Job clusters have a ~3–5 min cold start. For a 1-min frequency API, you're spending more time starting the cluster than running the actual work. This is fundamentally the wrong tool for that cadence.
Option 1: All-Purpose Cluster with Internal Scheduling (Most Practical)
Keep your existing workflow structure but run it on an all-purpose cluster that stays alive. Inside the notebook, manage the frequency loop yourself:
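A bare-bones sketch of that loop (the view name your_config_view and its columns are placeholders, apicallfreq is assumed to be in seconds, and error handling plus parallel calls are left out):

import time
import requests

last_called = {}  # in-memory tracking; persist to a Delta table if restarts need to resume state

while True:
    config = spark.table("your_config_view").collect()  # placeholder view name
    now = time.time()
    for row in config:
        if now - last_called.get(row.api_name, 0) >= row.apicallfreq:
            try:
                requests.get(row.api_path, timeout=30)
                last_called[row.api_name] = now
            except Exception as e:
                print(f"{row.api_name} failed: {e}")
    time.sleep(60)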
Option 2: Databricks Jobs + Delta Queue Pattern (Most Robust)
This splits the work into two jobs:
- A dispatcher job that writes due API calls into a Delta table queue
- A long-running worker job that polls the queue and executes the calls
The two building blocks are the queue table DDL and the dispatcher logic (the SQL that identifies due APIs); a sketch of both is below.
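Roughly like this. The queue table name, its columns, and the status values are illustrative, and the dispatcher assumes your config lives in a table (here called api_call_config) with last_called and frequency_seconds columns; adapt the names to your view.

# Queue table -- name, columns, and status values are illustrative.
spark.sql("""
    CREATE TABLE IF NOT EXISTS api_call_queue (
        api_name    STRING,
        api_path    STRING,
        enqueued_at TIMESTAMP,
        status      STRING   -- 'pending' | 'done' | 'failed'
    )
""")

# Dispatcher tick: snapshot the due APIs once, enqueue them, then stamp last_called.
due = spark.sql("""
    SELECT api_name, api_path
    FROM api_call_config
    WHERE last_called IS NULL
       OR TIMESTAMPDIFF(SECOND, last_called, current_timestamp()) >= frequency_seconds
""").collect()

if due:
    names = ",".join(f"'{r.api_name}'" for r in due)
    spark.sql(f"""
        INSERT INTO api_call_queue
        SELECT api_name, api_path, current_timestamp(), 'pending'
        FROM api_call_config
        WHERE api_name IN ({names})
    """)
    spark.sql(f"""
        UPDATE api_call_config
        SET last_called = current_timestamp()
        WHERE api_name IN ({names})
    """)

The worker side is then just a loop that reads the pending rows, makes the calls, and flips status to done or failed.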
Option 3: Structured Streaming with foreachBatch
If your APIs can be modeled as a micro-batch stream, Structured Streaming on an all-purpose cluster handles variable-frequency polling natively.
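A sketch of what that could look like, using the built-in rate source purely as a 60-second tick and doing the due-API check inside foreachBatch (the config table name and checkpoint path are placeholders):

import requests

def poll_due_apis(batch_df, batch_id):
    # The rate-source batch is just a heartbeat; the real work is the due-API check.
    due = spark.sql("""
        SELECT api_name, api_path
        FROM api_call_config
        WHERE last_called IS NULL
           OR TIMESTAMPDIFF(SECOND, last_called, current_timestamp()) >= frequency_seconds
    """).collect()
    for row in due:
        try:
            requests.get(row.api_path, timeout=30)
            spark.sql(f"""
                UPDATE api_call_config
                SET last_called = current_timestamp()
                WHERE api_name = '{row.api_name}'
            """)
        except Exception as e:
            print(f"{row.api_name} failed: {e}")

(spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    .writeStream
    .foreachBatch(poll_due_apis)
    .trigger(processingTime="60 seconds")
    .option("checkpointLocation", "/tmp/api_poller_checkpoint")
    .start())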
Given you already have a view-based config and ForEach pattern, Option 2 (Delta Queue) is the cleanest long-term solution. But if you need something working fast, Option 1 (all-purpose cluster + threading) gets you there today with minimal refactoring.
The key insight is to separate the scheduling decision from the execution. Your view already stores the frequency; you just need a lightweight process that reads it and dispatches work, rather than having the cluster itself restart to make that decision.