In this blog, we'll explore how to dynamically generate Databricks Workflows.
Imagine needing to create multiple workflows, each consisting of tasks that run different notebooks, with varying cluster configurations and custom parameters such as min/max workers or specific node types. You might want to assign large tasks to dedicated job clusters while grouping smaller tasks to run on shared clusters.
Manually setting up thousands of such workflows—each with potentially hundreds of tasks—would be time-consuming, error-prone, and inefficient.
To address this, our approach automates the entire setup using a single configuration table and the Databricks Python SDK. This eliminates the need for manual configuration, significantly reduces human error, and improves overall efficiency.
The configuration table captures all essential details, such as cluster settings, notebook paths, and task-specific parameters. By simply adding a new entry to this table, workflows are automatically and consistently generated.
Terraform and Databricks Asset Bundles (DAB) require predefined tasks, meaning they lack the ability to generate workflows dynamically based on a config table. Our approach allows us to gain more granular control over task-level configurations, such as worker counts, node types, and execution strategies.
In Databricks Workflows, a For Each task applies the same configuration across all iterations, typically executing the same notebook multiple times with different input parameters. While this works well for repetitive tasks with uniform logic, it's limited when tasks need to diverge in behavior or resources.
By contrast, the dynamic workflow generation approach described above lets each task bring its own notebook, cluster configuration, and parameters, which gives it several key advantages over both options.
Stay tuned as we walk through this powerful Python SDK-driven method to automate Databricks Workflow generation!
-------------------------------------------------------------------------
In this blog, we’ll cover:
✅ How to define workflows dynamically using a config table
✅ Generating job clusters, tasks, and jobs programmatically
✅ Submitting and managing jobs efficiently
High-Level Overview of Steps
The config table serves as the backbone for dynamically generating workflows. It allows full flexibility in defining cluster settings, notebook paths, schedules, permissions, and task-specific parameters.
📌 Important:
Each row in the configuration table can define multiple tasks by specifying multiple notebook paths. The number of tasks created corresponds to the number of notebooks listed in that row.
These notebook paths must be provided in the notebook_paths column as a comma-separated string (e.g., "path/to/notebook1,path/to/notebook2").
Alternatively, you can structure this column as an array to avoid manual parsing and improve readability.
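For instance, here is a minimal sketch of how either representation could be parsed downstream (the helper name is ours, purely illustrative):

def parse_notebook_paths(value):
    """Accept either a comma-separated string or an array of notebook paths."""
    if isinstance(value, str):
        # Comma-separated string, e.g. "path/to/notebook1,path/to/notebook2"
        return [p.strip() for p in value.split(",") if p.strip()]
    # Already an array column, e.g. ["path/to/notebook1", "path/to/notebook2"]
    return list(value)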
Config Table Schema
sql_query = f"""
CREATE TABLE IF NOT EXISTS {config_catalog}.{config_schema}.job_config (
config_id BIGINT GENERATED ALWAYS AS IDENTITY COMMENT 'Unique identifier for each configuration entry (Primary Key)',
batch STRING COMMENT 'Defines job grouping for tasks (Tasks with the same batch belong to the same job)',
job_cluster_key STRING COMMENT 'Defines which tasks share the same cluster',
spark_ver STRING COMMENT 'The Spark version used for the job execution',
driver_node_type STRING COMMENT 'The driver node type (e.g., Standard_DS3_v2, Standard_E32s_v4)',
driver_nodes INT COMMENT 'Number of cores assigned to the driver node',
driver_memory STRING COMMENT 'Amount of memory assigned to the driver node (e.g., 4g)',
executor_node_type STRING COMMENT 'The executor node type (e.g., Standard_DS3_v2, Standard_E32s_v4)',
min_workers INT COMMENT 'Minimum number of workers required for execution',
max_workers INT COMMENT 'Maximum number of workers allowed',
compute_policy STRING COMMENT 'Defines the compute policy applied to the job',
photon_acceleration BOOLEAN COMMENT 'Indicates whether Photon acceleration is enabled (true/false)',
tags STRING COMMENT 'Metadata tags (stored as a JSON object) for tracking job details',
library STRING COMMENT 'Libraries required for job execution',
init_scripts STRING COMMENT 'Initialization scripts to be run before execution',
batch_schedule STRING COMMENT 'Job execution schedule (e.g., daily, hourly)',
job_run_as STRING COMMENT 'User account under which the job runs',
task_parameters STRING COMMENT 'JSON object containing key-value task-specific parameters',
user_name STRING COMMENT 'Username associated with job permissions',
user_permissions STRING COMMENT 'Permission level (e.g., CAN_MANAGE, CAN_RUN)',
queue BOOLEAN COMMENT 'Indicates if the task should be queued when resources are unavailable',
notebook_paths STRING COMMENT 'Comma-separated Databricks notebook paths to execute (one task per path)',
data_security_mode STRING COMMENT 'Compute access mode (SINGLE_USER or USER_ISOLATION)'
)
"""
# Execute the SQL query
spark.sql(sql_query)
Below is a sample dataset that demonstrates how different tasks can be grouped under jobs and clusters. These three rows produce a single workflow containing three independent task chains (one per row), each with two notebook tasks, running on two job clusters.
sql_query = f"""
INSERT INTO {config_catalog}.{config_schema}.job_config (
batch, job_cluster_key, spark_ver, driver_node_type, driver_nodes, driver_memory,
executor_node_type, min_workers, max_workers, compute_policy, photon_acceleration,
tags, library, init_scripts, batch_schedule, job_run_as, task_parameters, user_name,
user_permissions, queue, notebook_paths, data_security_mode
) VALUES (
'BATCH_1', 'cluster_1', '16.0.x-scala2.12', 'r3.xlarge', 4, '8g',
'r3.xlarge', 2, 10, 'default', true,
'{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
'{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2", “SINGLE_USER”
),
(
'BATCH_1', 'cluster_2', '16.0.x-scala2.12', 'r3.xlarge', 4, '8g',
'r3.xlarge', 2, 10, 'default', true,
'{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
'{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2", “SINGLE_USER”
),
(
'BATCH_1', 'cluster_2', '16.0.x-scala2.12', 'r3.xlarge', 8, '8g',
'r3.xlarge', 2, 10, 'default', true,
'{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
'{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2",”SINGLE_USER”
);
"""
# Execute the SQL query
spark.sql(sql_query)
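A quick sanity check of the inserted rows (this assumes you're running in a Databricks notebook, where display and the config_catalog/config_schema variables used above are available):

# Verify the configuration rows for the batch we are about to generate
display(
    spark.sql(
        f"""
        SELECT config_id, batch, job_cluster_key, notebook_paths, data_security_mode
        FROM {config_catalog}.{config_schema}.job_config
        WHERE batch = 'BATCH_1'
        """
    )
)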
Once the config table is set up, we can use the Databricks SDK for Python to generate workflows dynamically. The following steps are involved:
1. Retrieve job definitions from the config table
To dynamically generate workflows, we first need to retrieve the job definitions stored in the configuration table.
We query the table by executing a SQL statement against a Databricks SQL Warehouse using the execute_statement() method.
The query results are then mapped into a list of dictionaries (mapped_results), which will be used in later steps to generate job clusters and tasks dynamically.
Here’s the code for this step:
import json
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, compute, iam, sql
from databricks.sdk.service.jobs import QueueSettings
CONFIG_TABLE_NAME = "catalog.schema.job_config"
SQL_WAREHOUSE_ID = "862f1d7ghf0424f7"
batch = "BATCH_1"
try:
    w = WorkspaceClient()
    jobgrp_query = f"SELECT * FROM {CONFIG_TABLE_NAME} WHERE LOWER(TRIM(batch)) = '{batch.lower().strip()}'"

    print("SQL warehouse starting...")
    w.warehouses.start_and_wait(SQL_WAREHOUSE_ID)
    wh = w.warehouses.wait_get_warehouse_running(SQL_WAREHOUSE_ID)

    if wh.state == sql.State.RUNNING:
        # Run the query against the SQL warehouse
        result = w.statement_execution.execute_statement(
            statement=jobgrp_query, warehouse_id=SQL_WAREHOUSE_ID
        )
        # Map each result row onto a {column_name: value} dictionary
        column_names = [col.name for col in result.manifest.schema.columns]
        mapped_results = [
            dict(zip(column_names, row)) for row in result.result.data_array
        ]
except Exception as e:
    print(f"An error occurred: {e}")
    raise e
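Each entry in mapped_results mirrors one row of the config table. Keep in mind that the Statement Execution API returns every cell as a string, so a row looks roughly like this (illustrative values only, trimmed to a few columns):

# Illustrative shape of one mapped_results entry; values are examples, not real output
example_row = {
    "config_id": "1",
    "batch": "BATCH_1",
    "job_cluster_key": "cluster_1",
    "min_workers": "2",   # numeric columns arrive as strings
    "max_workers": "10",
    "notebook_paths": "path_to_notebook_1,path_to_notebook_2",
    "data_security_mode": "SINGLE_USER",
}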
2. Create job clusters based on the cluster configurations in the table
def create_job_clusters(d):
    """
    Create a job cluster based on the configuration dictionary.
    Args:
        d (dict): Configuration dictionary containing job details.
    Returns:
        JobCluster: Created job cluster object.
    """
    try:
        job_cluster = jobs.JobCluster(
            job_cluster_key=d["job_cluster_key"],
            new_cluster=compute.ClusterSpec(
                cluster_name="",
                spark_version=d["spark_ver"],
                node_type_id=d["executor_node_type"],
                driver_node_type_id=d["driver_node_type"],
                # Values from the Statement Execution API come back as strings,
                # so cast the worker counts to integers
                autoscale=compute.AutoScale(
                    min_workers=int(d["min_workers"]),
                    max_workers=int(d["max_workers"])
                ),
                data_security_mode=(
                    compute.DataSecurityMode.SINGLE_USER
                    if d["data_security_mode"] == "SINGLE_USER"
                    else compute.DataSecurityMode.USER_ISOLATION
                ),
            )
        )
        return job_cluster
    except Exception as e:
        raise e
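As a quick illustration of what this returns, here is a hypothetical check against the first config row (assuming mapped_results from step 1):

# Example: build a cluster definition from the first config row and inspect it
sample_cluster = create_job_clusters(mapped_results[0])
print(sample_cluster.job_cluster_key)        # e.g. 'cluster_1'
print(sample_cluster.new_cluster.autoscale)  # e.g. AutoScale(min_workers=2, max_workers=10)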
3. Create tasks and automatically manage task dependencies, where each task n depends on task n-1:
While the example below focuses on chaining notebook-based tasks, there are many more options you can configure via the configuration table, such as libraries, init scripts, run conditions, and task-level parameters.
def create_task(d):
    """
    Create tasks for the job based on the configuration dictionary.
    The number of tasks depends on the notebook_paths.
    Args:
        d (dict): Configuration dictionary containing job details.
    Returns:
        list: List of Task objects.
    """
    try:
        notebook_paths = [p.strip() for p in d["notebook_paths"].split(",")]
        tasks = []
        for i, notebook_path in enumerate(notebook_paths):
            task = jobs.Task(
                task_key=f"{i+1}_{d['config_id']}",
                job_cluster_key=d["job_cluster_key"],
                run_if=jobs.RunIf.ALL_SUCCESS,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                # Chain the tasks: task n depends on task n-1
                depends_on=[jobs.TaskDependency(task_key=tasks[-1].task_key)] if tasks else []
            )
            tasks.append(task)
        return tasks
    except Exception as e:
        raise e
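For example, a row with two notebook paths yields a two-task chain; a small hypothetical check (again assuming mapped_results from step 1):

# Example: inspect the dependency chain generated for the first config row
sample_tasks = create_task(mapped_results[0])
for t in sample_tasks:
    deps = [dep.task_key for dep in (t.depends_on or [])]
    print(t.task_key, "depends on", deps)
# e.g. 1_1 depends on []
#      2_1 depends on ['1_1']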
4. Create a job/workflow
def create_job(d, tasks, job_clusters, databricks_group_name):
    """
    Create a job in Databricks based on the provided configuration.
    Args:
        d (dict): Configuration dictionary containing job details.
        tasks (list): List of tasks to be included in the job.
        job_clusters (list): List of job clusters to be used.
        databricks_group_name (str): Name of the Databricks group (not used in this minimal example).
    Returns:
        Job: Created job object.
    """
    try:
        job = w.jobs.create(
            name=f"job_{d['batch']}",
            email_notifications=jobs.JobEmailNotifications(no_alert_for_skipped_runs=False),
            timeout_seconds=1000,
            max_concurrent_runs=1,
            tasks=tasks,
            job_clusters=job_clusters,
            queue=QueueSettings(enabled=True),
            access_control_list=[
                iam.AccessControlRequest(
                    user_name=d["user_name"],
                    permission_level=iam.PermissionLevel.CAN_MANAGE
                    if d["user_permissions"] == "CAN_MANAGE"
                    else iam.PermissionLevel.CAN_RUN
                )
            ]
        )
        return job
    except Exception as e:
        print(f"Failed to create job: {e}")
        raise e
5. Submit the job for execution
try:
    tasks = []
    clusters = []
    for d in mapped_results:
        tasks.extend(create_task(d))
        # Create a cluster definition only if it hasn't been created yet
        if d["job_cluster_key"] not in [cluster.job_cluster_key for cluster in clusters]:
            clusters.append(create_job_clusters(d))
    if tasks:
        try:
            job = create_job(d, tasks, clusters, "DATABRICKS_GROUP_NAME")
            print(f"JOB_ID : {job.job_id}")
            print("Job is running")
            run = w.jobs.run_now_and_wait(job.job_id)
        except Exception as e:
            print(f"An error occurred in job run: {e}")
            raise e
        print("Job is finished")
        if run.state.result_state != jobs.RunResultState.SUCCESS:
            raise Exception(f"Job failed. Result state: {run.state.result_state}")
        else:
            print("Job finished successfully")
    else:
        print("No tasks to run")
except Exception as e:
    print(f"An error occurred: {e}")
    raise e
There are multiple ways to submit and manage jobs dynamically:
1. One-Time Runs (Logical Jobs)
# task and job_cluster are placeholders for objects built as in the steps above
w.jobs.submit(
    run_name="Onetime_Run",
    tasks=[task],
    job_clusters=[job_cluster]
)
2. Create and Delete Jobs
Create a job, run it, and delete it afterward to keep the workspace clean.
import time

created_job = w.jobs.create(
    name=f"sdk-{time.time_ns()}",
    tasks=[
        jobs.Task(
            description="test",
            existing_cluster_id=cluster_id,
            notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
            task_key="test",
            timeout_seconds=0
        )
    ]
)
# Trigger runs as needed, e.g. w.jobs.run_now(job_id=created_job.job_id)
w.jobs.cancel_all_runs(job_id=created_job.job_id)
# cleanup
w.jobs.delete(job_id=created_job.job_id)
3. Check for Existing Jobs
existing_jobs = w.jobs.list()
for job in existing_jobs:
    if job.settings.name == "Dynamic_Job":
        print(f"Job already exists with ID: {job.job_id}")
        break
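The jobs list endpoint also accepts a name filter, which avoids iterating over every job in the workspace; a minimal sketch using the same job name:

# Filter server-side by exact job name instead of scanning all jobs
for job in w.jobs.list(name="Dynamic_Job"):
    print(f"Job already exists with ID: {job.job_id}")
    break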
Note: This framework can be extended to serverless compute as well by modifying properties in the config table.
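As a rough sketch of that idea (the exact setup depends on your workspace, and the serverless flag is an assumed extra config column, not part of the schema above): in workspaces with serverless jobs enabled, a notebook task that omits any cluster reference runs on serverless compute, so the task builder can simply skip job_cluster_key for rows flagged as serverless.

# Hypothetical variation for rows flagged as serverless in the config table
def create_serverless_task(d, i, notebook_path):
    return jobs.Task(
        task_key=f"{i+1}_{d['config_id']}",
        # No job_cluster_key / new_cluster: in serverless-enabled workspaces
        # the notebook task runs on serverless compute
        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
    )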
By leveraging the Databricks Python SDK and a configuration-driven approach, we can:
✅ Automate workflow creation dynamically
✅ Gain fine-grained control over tasks and clusters
✅ Eliminate the need for manual workflow management
This method provides greater flexibility than Terraform/DAB (which require predefined tasks) and the Workflows For Each task (which applies the same configuration to every iteration).
Now, effortlessly generate a vast number of workflows with just a configuration table! 🚀