de_expert_aj
Databricks Employee

In this blog, we'll explore how to dynamically generate Databricks Workflows.

Imagine needing to create multiple workflows, each consisting of tasks that run different notebooks, with varying cluster configurations and custom parameters such as min/max workers or specific node types. You might want to assign large tasks to dedicated job clusters while grouping smaller tasks to run on shared clusters.

Manually setting up thousands of such workflows—each with potentially hundreds of tasks—would be time-consuming, error-prone, and inefficient.

To address this, our approach automates the entire setup using a single configuration table and the Databricks Python SDK. This eliminates the need for manual configuration, significantly reduces human error, and improves overall efficiency.

The configuration table captures all essential details, such as cluster settings, notebook paths, and task-specific parameters. By simply adding a new entry to this table, workflows are automatically and consistently generated.

Why Not Terraform or DAB?

Terraform and Databricks Asset Bundles (DAB) require predefined tasks, meaning they lack the ability to generate workflows dynamically based on a config table. Our approach allows us to gain more granular control over task-level configurations, such as worker counts, node types, and execution strategies.

Why Not Just Use a For Each Task in Databricks Workflows?

In Databricks Workflows, a For Each task applies the same configuration across all iterations, typically executing the same notebook multiple times with different input parameters. While this works well for repetitive tasks with uniform logic, it’s limited when tasks need to diverge in behavior or resources.
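For reference, a For Each task defined through the Python SDK looks roughly like the sketch below: every iteration reuses the same inner task definition, and only the {{input}} value changes. (This is a hedged sketch; the field names follow recent SDK versions, and the notebook path and input list are illustrative.)

from databricks.sdk.service import jobs

# One For Each task: the same notebook runs once per input value.
# Only the {{input}} reference changes between iterations; the notebook,
# cluster, and all other settings are shared across iterations.
for_each = jobs.Task(
    task_key="process_all_regions",
    for_each_task=jobs.ForEachTask(
        inputs='["us", "eu", "apac"]',  # serialized list of iteration values
        concurrency=3,                  # iterations allowed to run in parallel
        task=jobs.Task(
            task_key="process_one_region",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/pipelines/process_region",  # illustrative path
                base_parameters={"region": "{{input}}"},
            ),
        ),
    ),
)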

By contrast, the dynamic workflow generation approach described above offers several key advantages:

  • Diverse Notebook Execution: Each task can invoke a different notebook, enabling varied processing logic within a single pipeline.

  • Custom Spark Versions: Tasks can be assigned different Spark versions, optimizing performance based on specific processing needs.

  • Tailored Cluster Configurations: Each task can run on customized cluster settings, ensuring the right level of compute and memory for its workload.

This dynamic design provides greater flexibility and scalability, especially for complex pipelines where each task has unique logic or resource requirements.

Stay tuned as we walk through this powerful Python SDK-driven method to automate Databricks Workflow generation!

-------------------------------------------------------------------------

In this blog, we’ll cover:

  • How to define workflows dynamically using a config table
  • Generating job clusters, tasks, and jobs programmatically
  • Submitting and managing jobs efficiently

High-Level Overview of Steps

  1. Create the Configuration Table:
    Define the structure and parameters that will control task execution, including cluster configurations, notebooks, and job-specific settings.
  2. Leverage the Databricks Python SDK to Generate Workflows:
    Use the Databricks SDK to automatically:
    1. Parse the config table
    2. Create job clusters
    3. Configure tasks and handle dependencies
    4. Create a job/Databricks workflow
    5. Manage workflow execution

Step 1: Configuration Table Design

The config table serves as the backbone for dynamically generating workflows. This allows full flexibility in defining:

  • Different notebooks for each task
  • Custom cluster configurations (e.g., worker count, node type, Spark version)
  • Job execution settings, user access, libraries, compute policies, init scripts, and so on

📌 Important:

Each row in the configuration table can define multiple tasks by specifying multiple notebook paths. The number of tasks created corresponds to the number of notebooks listed in that row.

These notebook paths must be provided in the notebook_paths column as a comma-separated string (e.g., "path/to/notebook1,path/to/notebook2").

Alternatively, you can structure this column as an array to avoid manual parsing and improve readability.
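Either representation is straightforward to consume when generating tasks. A minimal sketch of handling both options (column name as defined in the schema below):

# Option 1: comma-separated string column
row = {"notebook_paths": "path/to/notebook1,path/to/notebook2"}
paths = [p.strip() for p in row["notebook_paths"].split(",")]

# Option 2: ARRAY<STRING> column, no parsing needed
row = {"notebook_paths": ["path/to/notebook1", "path/to/notebook2"]}
paths = row["notebook_paths"]

print(paths)  # ['path/to/notebook1', 'path/to/notebook2'] in both cases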

Config Table Schema

sql_query = f"""
CREATE TABLE IF NOT EXISTS {config_catalog}.{config_schema}.job_config (
 config_id BIGINT GENERATED ALWAYS AS IDENTITY COMMENT 'Unique identifier for each configuration entry (Primary Key)',
 batch STRING COMMENT 'Defines job grouping for tasks (Tasks with the same batch belong to the same job)',
 job_cluster_key STRING COMMENT 'Defines which tasks share the same cluster',
 spark_ver STRING COMMENT 'The Spark version used for the job execution',
 driver_node_type STRING COMMENT 'The driver node type (e.g., Standard_DS3_v2, Standard_E32s_v4)',
 driver_nodes INT COMMENT 'Number of cores assigned to the driver node',
 driver_memory STRING COMMENT 'Amount of memory assigned to the driver node (e.g., 4g)',
 executor_node_type STRING COMMENT 'The executor node type (e.g., Standard_DS3_v2, Standard_E32s_v4)',
 min_workers INT COMMENT 'Minimum number of workers required for execution',
 max_workers INT COMMENT 'Maximum number of workers allowed',
 compute_policy STRING COMMENT 'Defines the compute policy applied to the job',
 photon_acceleration BOOLEAN COMMENT 'Indicates whether Photon acceleration is enabled (true/false)',
 tags STRING COMMENT 'Metadata tags (stored as a JSON object) for tracking job details',
 library STRING COMMENT 'Libraries required for job execution',
 init_scripts STRING COMMENT 'Initialization scripts to be run before execution',
 batch_schedule STRING COMMENT 'Job execution schedule (e.g., daily, hourly)',
 job_run_as STRING COMMENT 'User account under which the job runs',
 task_parameters STRING COMMENT 'JSON object containing key-value task-specific parameters',
 user_name STRING COMMENT 'Username associated with job permissions',
 user_permissions STRING COMMENT 'Permission level (e.g., CAN_MANAGE, CAN_RUN)',
 queue BOOLEAN COMMENT 'Indicates if the task should be queued when resources are unavailable',
 notebook_paths STRING COMMENT 'Comma-separated list of Databricks notebook paths to execute',
 data_security_mode STRING COMMENT 'Compute access mode (SINGLE_USER or USER_ISOLATION)'
)
"""
# Execute the SQL query
spark.sql(sql_query)

Below is a sample dataset that demonstrates how different tasks can be grouped under jobs and clusters. It will create three pipelines in a workflow, each with two tasks.

sql_query = f"""
INSERT INTO users.anjali_jain.job_config (
 batch, job_cluster_key, spark_ver, driver_node_type, driver_nodes, driver_memory,
 executor_node_type, min_workers, max_workers, compute_policy, photon_acceleration,
 tags, library, init_scripts, batch_schedule, job_run_as, task_parameters, user_name,
 user_permissions, queue, notebook_paths
) VALUES (
 'BATCH_1', 'cluster_1', '16.0.x-scala2.12', 'r3.xlarge', 4, '8g',
 'r3.xlarge', 2, 10, 'default', true,
 '{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
 'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
 '{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2", “SINGLE_USER”
),
(
 'BATCH_1', 'cluster_2', '16.0.x-scala2.12', 'r3.xlarge', 4, '8g',
 'r3.xlarge', 2, 10, 'default', true,
 '{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
 'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
 '{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2", “SINGLE_USER”
),
(
 'BATCH_1', 'cluster_2', '16.0.x-scala2.12', 'r3.xlarge', 8, '8g',
 'r3.xlarge', 2, 10, 'default', true,
 '{{"tag1": "value1", "tag2": "value2"}}', 'pypi=sample_library',
 'dbfs=/path/to/init_script|abfss=/path/to/init_script', 'daily', 'user.1@xyz.com',
 '{{"param1": "value1", "param2": "value2"}}', 'user.1@xyz.com', 'CAN_MANAGE', true, "path_to_notebook_1,path_to_notebook_2",”SINGLE_USER”
);
"""
# Execute the SQL query
spark.sql(sql_query)
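Since tags and task_parameters are stored as JSON strings, downstream code can convert them back into dictionaries before attaching them to clusters or tasks. A minimal sketch (values taken from the sample rows above):

import json

row = {
    "tags": '{"tag1": "value1", "tag2": "value2"}',
    "task_parameters": '{"param1": "value1", "param2": "value2"}',
}

custom_tags = json.loads(row["tags"])                  # could be passed as cluster custom_tags
base_parameters = json.loads(row["task_parameters"])   # could be passed as notebook base_parameters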

CONFIG TABLE SNAPSHOT


JOB SNAPSHOT


NOTE:

  • Each row in the config table represents a separate pipeline in a job/workflow.
  • All tasks sharing the same batch belong to the same job.
  • The job_cluster_key allows multiple tasks to share the same compute cluster, promoting efficient resource utilization. In the example above, Pipeline 2 and Pipeline 3 are both assigned the same job_cluster_key (cluster_2), which means they run on the same cluster instance rather than provisioning separate clusters for each task.
  • The number of tasks in a pipeline is derived from the number of notebook paths passed in a row. For example, two notebook paths result in two tasks per pipeline.
  • Task dependencies: Task n depends on Task n-1 in the sequence.

Key Benefits of the Config Table Approach

  • Dynamic Job Creation: No need to manually configure a large number of workflows
  • Task-Level Control: Each task can have different worker configurations, compute policies, and notebook paths
  • Efficient Job Execution: Tasks can be grouped under the same job or run independently

Step 2: Generating Workflows Using Databricks SDK for Python

Once the config table is set up, we can use the Databricks SDK for Python to generate workflows dynamically. The following steps are involved:

  1. Read the config table to retrieve job definitions:

To dynamically generate workflows, we first need to retrieve the job definitions stored in a configuration table. 

We query the table by executing a SQL statement against a Databricks SQL Warehouse using the execute_statement() method. 

The query results are then mapped into a list of dictionaries (mapped_results), which will be used in later steps to generate job clusters and tasks dynamically.

Here’s the code for this step:

import json
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, compute, iam
from databricks.sdk.service.jobs import QueueSettings
from databricks.sdk.service.sql import State

CONFIG_TABLE_NAME = "catalog.schema.job_config"
SQL_WAREHOUSE_ID = "862f1d7ghf0424f7"
batch = "BATCH_1"

try:
    w = WorkspaceClient()
    jobgrp_query = f"SELECT * FROM {CONFIG_TABLE_NAME} WHERE LOWER(TRIM(batch))='{batch.lower().strip()}'"

    print("SQL warehouse starting... ")
    w.warehouses.start_and_wait(SQL_WAREHOUSE_ID)
    wh = w.warehouses.wait_get_warehouse_running(SQL_WAREHOUSE_ID)

    if wh.state == State.RUNNING:
        result = w.statement_execution.execute_statement(
            statement=jobgrp_query, warehouse_id=SQL_WAREHOUSE_ID
        )
        # Map column names onto row values so each row becomes a dictionary
        column_names = [col.name for col in result.manifest.schema.columns]
        mapped_results = [
            dict(zip(column_names, row)) for row in result.result.data_array
        ]
except Exception as e:
    print(f"An error occurred: {e}")
    raise e
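Note that the Statement Execution API returns inline results in JSON_ARRAY format, where every cell value comes back as a string. Numeric and boolean columns therefore typically need casting before they are handed to the SDK; a minimal, hedged sketch:

def normalize(row: dict) -> dict:
    """Cast numeric/boolean config columns that arrive as strings."""
    row = dict(row)
    for key in ("min_workers", "max_workers", "driver_nodes"):
        if row.get(key) is not None:
            row[key] = int(row[key])
    if row.get("photon_acceleration") is not None:
        row["photon_acceleration"] = str(row["photon_acceleration"]).lower() == "true"
    return row

mapped_results = [normalize(r) for r in mapped_results]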

2. Create job clusters based on the cluster configurations in the table

def create_job_clusters(d):
    """
    Create job clusters based on the configuration dictionary.

    Args:
        d (dict): Configuration dictionary containing job details.

    Returns:
        JobCluster: Created job cluster object.
    """
    try:
        job_cluster = jobs.JobCluster(
            job_cluster_key=d["job_cluster_key"],
            new_cluster=compute.ClusterSpec(
                cluster_name="",
                spark_version=d["spark_ver"],
                node_type_id=d["executor_node_type"],
                driver_node_type_id=d["driver_node_type"],
                autoscale=compute.AutoScale(
                    min_workers=d["min_workers"],
                    max_workers=d["max_workers"]
                ),
                data_security_mode=(
                    compute.DataSecurityMode.SINGLE_USER
                    if d["data_security_mode"] == "SINGLE_USER"
                    else compute.DataSecurityMode.USER_ISOLATION
                ),
            )
        )
        return job_cluster
    except Exception as e:
        raise e
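For example, calling the helper with one of the config rows retrieved in Step 1 (values shown are illustrative):

sample_row = {
    "job_cluster_key": "cluster_1",
    "spark_ver": "16.0.x-scala2.12",
    "executor_node_type": "r3.xlarge",
    "driver_node_type": "r3.xlarge",
    "min_workers": 2,
    "max_workers": 10,
    "data_security_mode": "SINGLE_USER",
}

cluster = create_job_clusters(sample_row)
print(cluster.job_cluster_key)  # cluster_1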

 3. Create tasks and automatically manage task dependencies, where each task n depends on task n-1, as illustrated above:

While the example below focuses on chaining notebook-based tasks, there are many more options you can configure via the configuration table (a sketch of a few of these follows the create_task function), such as:

  • Task-level parameters (e.g., base_parameters)

  • Timeouts and retry policies

  • Libraries to install on the cluster

  • Cluster reuse and pool support

  • Email alerts or webhook notifications

  • Task types beyond notebooks (e.g., Python scripts, JARs)
def create_task(d):
    """
    Create tasks for the job based on the configuration dictionary.
    The number of tasks depends on the notebook_paths.

    Args:
        d (dict): Configuration dictionary containing job details.

    Returns:
        list: List of Task objects.
    """
    try:
        notebook_paths = d['notebook_paths'].split(',')
        tasks = []
        for i, notebook_path in enumerate(notebook_paths):
            task = jobs.Task(
                task_key=f"{i+1}_{d['config_id']}",
                job_cluster_key=d["job_cluster_key"],
                run_if=jobs.RunIf("ALL_SUCCESS"),
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                # Chain the tasks: task n depends on task n-1
                depends_on=[jobs.TaskDependency(task_key=tasks[-1].task_key)] if tasks else []
            )
            tasks.append(task)
        return tasks
    except Exception as e:
        raise e
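As noted above, the same pattern extends to the other config-table columns. Below is a hedged sketch of attaching task-level parameters, a PyPI library, and retry/timeout settings to a task; the config keys and literal values are illustrative rather than part of the framework above.

import json
from databricks.sdk.service import jobs, compute

def create_task_extended(d, i, notebook_path, previous_task_key=None):
    """Variation of create_task wiring in additional config-table columns."""
    return jobs.Task(
        task_key=f"{i+1}_{d['config_id']}",
        job_cluster_key=d["job_cluster_key"],
        notebook_task=jobs.NotebookTask(
            notebook_path=notebook_path,
            base_parameters=json.loads(d["task_parameters"]),  # task-level parameters
        ),
        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="sample_library"))],
        max_retries=2,          # retry policy (illustrative)
        timeout_seconds=3600,   # per-task timeout (illustrative)
        depends_on=[jobs.TaskDependency(task_key=previous_task_key)] if previous_task_key else [],
    )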

4. Create a job/workflow

def create_job(d, tasks, job_clusters, databricks_group_name):
   """
   Create a job in Databricks based on the provided configuration.

   Args:
       d (dict): Configuration dictionary containing job details.
       tasks (list): List of tasks to be included in the job.
       job_clusters (list): List of job clusters to be used.
       databricks_group_name (str): Name of the Databricks group.

   Returns:
       Job: Created job object.
   """
   try:
       job = w.jobs.create(
           name=f"job_{d['batch']}",
           email_notifications=jobs.JobEmailNotifications(no_alert_for_skipped_runs=False),
           timeout_seconds=1000,
           max_concurrent_runs=1,
           tasks=tasks,
           job_clusters=job_clusters,
           queue=QueueSettings(enabled=True),
           access_control_list=[
               iam.AccessControlRequest(
                   user_name=d["user_name"],
                   permission_level=iam.PermissionLevel.CAN_MANAGE if d["user_permissions"] == "CAN_MANAGE" else iam.PermissionLevel.CAN_RUN
               )
           ]
       )
       return job
   except Exception as e:
       print(f"Failed to create job: {e}")
       raise e
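The job_run_as and batch_schedule columns from the config table can also be fed into the same create call. A hedged sketch of the extra keyword arguments (the cron expression is illustrative, and run_as requires the caller to have the appropriate permissions):

# Optional extras for w.jobs.create, driven by the config row d
extra_job_settings = dict(
    run_as=jobs.JobRunAs(user_name=d["job_run_as"]),   # run the job as this user
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",          # illustrative: daily at 02:00
        timezone_id="UTC",
    ),
)

job = w.jobs.create(
    name=f"job_{d['batch']}",
    tasks=tasks,
    job_clusters=job_clusters,
    **extra_job_settings,
)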

5. Submit the job for execution

try:
    tasks = []
    clusters = []
    for d in mapped_results:
        tasks.extend(create_task(d))
        # Create a cluster only if it's not already created
        if d["job_cluster_key"] not in [cluster.job_cluster_key for cluster in clusters]:
            clusters.append(create_job_clusters(d))

    if tasks:
        try:
            job = create_job(d, tasks, clusters, "DATABRICKS_GROUP_NAME")
            print(f"JOB_ID : {job.job_id}")
            print("Job is running")
            run = w.jobs.run_now_and_wait(job.job_id)
        except Exception as e:
            print(f"An error occurred in job run: {e}")
            raise e
        print("Job is finished")

        if run.state.result_state != jobs.RunResultState.SUCCESS:
            raise Exception(f"Job failed. Result state: {run.state.result_state}")
        else:
            print("Job finished successfully")
    else:
        print("No tasks to run")
except Exception as e:
    print(f"An error occurred: {e}")
    raise e

Submitting and Managing Jobs

There are multiple ways to submit and manage jobs dynamically:

1. One-Time Runs (Logical Jobs)

  • Submit a one-time run. This endpoint allows you to submit a workload directly without creating a job. The run appears under Job Runs, but no job definition appears under the Jobs tab. This approach is useful for executing workloads on demand without accumulating multiple job definitions in the workspace.


%python
w.jobs.submit(
   run_name="Onetime_Run",
   tasks=[task],
   job_clusters=[job_cluster])
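A slightly fuller, hedged sketch of a one-time submission: the endpoint takes SubmitTask definitions, and each task can carry its own cluster spec. The notebook path and node type below are illustrative, and w is the WorkspaceClient created earlier.

from databricks.sdk.service import jobs, compute

run = w.jobs.submit(
    run_name="Onetime_Run",
    tasks=[
        jobs.SubmitTask(
            task_key="onetime_task",
            notebook_task=jobs.NotebookTask(notebook_path="/Workspace/path/to/notebook"),
            new_cluster=compute.ClusterSpec(
                spark_version="16.0.x-scala2.12",
                node_type_id="r3.xlarge",
                num_workers=2,
            ),
        )
    ],
).result()  # wait for the run to finish

print(run.state.result_state)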

2. Create and Delete Jobs

Create a job, run it, and delete it afterward to keep the workspace clean.

import time

# Create a job with a single notebook task on an existing cluster
created_job = w.jobs.create(name=f'sdk-{time.time_ns()}',
                            tasks=[
                                jobs.Task(description="test",
                                          existing_cluster_id=cluster_id,
                                          notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                                          task_key="test",
                                          timeout_seconds=0)
                            ])
w.jobs.cancel_all_runs(job_id=created_job.job_id)
# cleanup
w.jobs.delete(job_id=created_job.job_id)

3. Check for Existing Jobs

  • Before creating a new job, check if a similar job already exists based on the job name.

existing_jobs = w.jobs.list()
for job in existing_jobs:
    if job.settings.name == "Dynamic_Job":
        print(f"Job already exists with ID: {job.job_id}")
        break
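If a matching job is found, it can be updated in place instead of creating a duplicate. A hedged sketch using jobs.reset, which replaces the job's settings with the new definition (tasks and clusters here are the lists built earlier):

# Replace an existing job's settings instead of creating a new job
w.jobs.reset(
    job_id=job.job_id,
    new_settings=jobs.JobSettings(
        name="Dynamic_Job",
        tasks=tasks,
        job_clusters=clusters,
    ),
)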

Note: This framework can be extended to serverless compute as well by modifying properties in the config table (see the sketch below).
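For example, in workspaces where serverless compute for jobs is enabled, a task defined without any cluster reference runs on serverless compute, so a config row could simply leave the cluster columns empty. A minimal, hedged sketch:

# No job_cluster_key / new_cluster on the task: the job uses
# serverless compute for jobs (where enabled for the workspace).
serverless_task = jobs.Task(
    task_key="serverless_example",
    notebook_task=jobs.NotebookTask(notebook_path="/Workspace/path/to/notebook"),
)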

Conclusion

By leveraging the Databricks Python SDK and a configuration-driven approach, we can:

  • Automate workflow creation dynamically
  • Gain fine-grained control over tasks and clusters
  • Eliminate the need for manual workflow management

This method provides greater flexibility than Terraform/DAB (which require predefined tasks) and the Workflows For Each task (which applies the same config to all iterations).

Now, effortlessly generate a vast number of workflows with just a configuration table! 🚀