Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks workflow creation using Databricks SDK programming

Prashanth24
New Contributor III

I am trying to create a Databricks workflow using SDK programming. I can create the workflow, but I am stuck on how to use the whl library files from the YAML file in the task, i.e. which SDK package or code should be used to associate a library whl with the notebook/Python task. The code below reads the YAML file and creates the workflow as per the configuration.

YAML file

job:
  name: model1
  tags: {"env": "dev", "product": "sample"}
  default_cluster_node_type_id: Standard_DS3_v2
  email_notification_alerts: {"no_alert_for_skipped_runs": False}
  tasks:
    - task_key: feature
      description: feature
      python_file: /Workspace/Users/<user email id>/workflows/python_sample.py
      source: WORKSPACE
      parameters: ["feature12345", "abcd"]
      libraries: [{"whl": "/Workspace/Users/<user email id>/workflows/myPackage-0.0.1-py3-none-any.whl"}]

Databricks SDK code

import time
import yaml
from typing import List

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute, jobs
from databricks.sdk.service.compute import AutoScale, DataSecurityMode
def config_parser(config_file: str) -> dict:
    """
    Parses the YAML configuration file containing details about the Databricks job to be executed,
    including source code and runtime parameters.
    """
    try:
        with open(config_file) as f:
            configuration = yaml.safe_load(f)
            output = {"job": {}}
            for k, v in configuration["job"].items():
                output["job"][k] = configuration["job"][k]
            return output
    except Exception as ex:
        raise ValueError(
            "Failed to initialize ApplicationConfiguration, couldn't load YAML config!"
        ) from ex
   
# def create_tasks(tasks_list_input=None, cond_tasks_list_input=None, job_cond_tasks_list_input=None)-> List:
# email_notifications=jobs.JobEmailNotifications(tasks_dict["email_notifications"]),
def create_tasks(tasks_list_input) -> List:
    """
    Dynamically creates task objects based on the provided configurations.

    Args:
    - tasks_list_input (list): List containing task details.

    Returns:
    - list: A list of Databricks task objects.
    """
    print("Enter create_tasks")
    tasks_list_output = []
    for tasks_dict in tasks_list_input:
        print("Libraries are ",tasks_dict["libraries"])
        task = jobs.Task(
            description=tasks_dict["description"],
            job_cluster_key="default_cluster",
            spark_python_task=jobs.SparkPythonTask(
                python_file=tasks_dict["python_file"],
                source=jobs.Source.WORKSPACE,
                parameters=tasks_dict["parameters"],
            ),
            task_key=tasks_dict["task_key"],
            timeout_seconds=0,
            depends_on=[
                jobs.TaskDependency(task_key=i) for i in tasks_dict.get("depends_on", [])
            ],
           
        )
        tasks_list_output.append(task)

    return tasks_list_output
 
w = WorkspaceClient()  # authenticates from the environment / .databrickscfg

yaml_dict = config_parser("config3.yaml")
job = yaml_dict["job"]
tasks = job["tasks"]
job_tasks = create_tasks(tasks)
created_job = w.jobs.create(
    name=f"{job['name']}-{time.time_ns()}",
    job_clusters=[
        jobs.JobCluster(
            job_cluster_key="default_cluster",
            new_cluster=compute.ClusterSpec(
                spark_version=w.clusters.select_spark_version(long_term_support=True),
                node_type_id="Standard_DS3_v2",
                num_workers=2,
                autoscale=AutoScale(min_workers=2, max_workers=6),
                data_security_mode=DataSecurityMode.NONE,            
            ),
        ),
    ],
    tasks=job_tasks,
)
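
For reference, here is a minimal sketch of how the whl entries from the YAML's libraries list could be attached to each task. It assumes the installed databricks-sdk version exposes a libraries argument on jobs.Task and a compute.Library class with a whl field (check the reference for your SDK version before relying on it); create_tasks_with_libraries is a hypothetical variant of the create_tasks function above.

from typing import List
from databricks.sdk.service import compute, jobs

def create_tasks_with_libraries(tasks_list_input) -> List:
    """Variant of create_tasks that also attaches the whl libraries from the YAML."""
    tasks_list_output = []
    for tasks_dict in tasks_list_input:
        # Map each {"whl": "<path>"} entry from the YAML into a compute.Library object.
        libraries = [
            compute.Library(whl=lib["whl"]) for lib in tasks_dict.get("libraries", [])
        ]
        task = jobs.Task(
            task_key=tasks_dict["task_key"],
            description=tasks_dict["description"],
            job_cluster_key="default_cluster",
            spark_python_task=jobs.SparkPythonTask(
                python_file=tasks_dict["python_file"],
                source=jobs.Source.WORKSPACE,
                parameters=tasks_dict["parameters"],
            ),
            timeout_seconds=0,
            depends_on=[
                jobs.TaskDependency(task_key=i) for i in tasks_dict.get("depends_on", [])
            ],
            libraries=libraries,  # assumed keyword, passed analogously to depends_on
        )
        tasks_list_output.append(task)
    return tasks_list_output

Depending on the runtime and workspace configuration, the whl path may need to point to a Unity Catalog volume or cloud storage location rather than a /Workspace path.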
1 REPLY

Thanks for the information. In the create_tasks function, do I need to create an object from the jobs package to associate the whl list with each task? For example, jobs.TaskDependency is used in that same function to add the task dependency; similar to that, do I need to create some other jobs object? If possible, could you please share a code sample for adding libraries to each task.
