05-23-2024 09:06 AM - edited 05-23-2024 09:11 AM
Hello,
I am trying to create a Job via the Databricks SDK. As input, I use the JSON generated via the Workflows UI (Workflows->Jobs->View YAML/JSON->JSON API->Create), saved as pavel_job.json. I then try to call the SDK function jobs.create using a client created as
dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")
This runs into an error:
Error creating job 'pavel_job': 'dict' object has no attribute 'as_dict'
Can you please advise on how to use the generated JSON with the SDK to create a Job?
05-27-2024 04:05 AM
Hey there! I have been using Volumes to get the files. It looks like this:
dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")
05-29-2024 11:53 AM
Hi mhiltner,
Does this job creation work for you? Can you please share sample1.json? Do you run it in a notebook? I am trying to run it via VSCode, and other API calls work for me, so I guess the JSON/dict I pass is the problem, but I already tried the simplest one.
What do you mean by not calling the API function when creating the dict?
05-31-2024 01:27 PM
It does work for me when run directly in a notebook cell (I haven't tried it in VSCode).
This is my sample Json
{
"name": "Agregacao_Gold",
"email_notifications": {},
"webhook_notifications": {},
"timeout_seconds": 0,
"max_concurrent_runs": 1,
"queue": {
"enabled": True
},
"run_as": {
"user_name": "mhiltner@community.com"
}
}
What I meant is that maybe your job_dict variable is not storing your job as a dict type, and this could be causing the error. If you don't mind, feel free to share the result of printing your job_dict.
27m ago
I played around with this until I got reasonable results. You have to write Python parsing code that ingests the JSON generated by Databricks (Workflows -> View JSON -> Create) and produces a proper SDK call with all the SDK objects in place. The call will fail any time the SDK expects an object but gets a string (or a plain dict) instead. Very sensitive!
I am attaching the complete Python program that takes jobs from a config file and creates the jobs in Databricks using the SDK. 100% working 100% free 😄
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, iam
from databricks.sdk.service.compute import ClusterSpec, DbfsStorageInfo, ClusterLogConf, WorkspaceStorageInfo, InitScriptInfo, DataSecurityMode, RuntimeEngine, RCranLibrary, MavenLibrary, PythonPyPiLibrary, Library, AzureAttributes, AzureAvailability
import datetime
import argparse
import logging
import json
import sys
# Helper function to create JobCluster
def create_job_cluster(cluster_spec, pool_id, logger):
    """Translate one ``job_clusters`` entry of a job JSON into a ``jobs.JobCluster``.

    Optional settings that are absent from the JSON are passed to the SDK as
    ``None`` so that they are simply omitted, instead of being sent as empty
    lists or fed into enum constructors that reject them.

    Args:
        cluster_spec: Dict with a ``job_cluster_key`` and a ``new_cluster``
            section, as found in the ``job_clusters`` list of the job JSON.
        pool_id: Instance pool ID substituted for any ``instance_pool_id`` /
            ``driver_instance_pool_id`` present in the JSON.
        logger: Logger used for progress messages.

    Returns:
        The ``jobs.JobCluster`` built from ``cluster_spec``.

    Raises:
        KeyError: If ``job_cluster_key`` or ``new_cluster`` is missing, or an
            init script entry has no ``workspace`` section.
        ValueError: If an enum value in the JSON is rejected by its enum class.
    """
    logger.info(f"...creating cluster definition for {cluster_spec['job_cluster_key']}")
    new_cluster = cluster_spec["new_cluster"]

    # Azure attributes are optional; only wrap the availability in its enum
    # when a value is actually present.
    azure_attributes = None
    azure_spec = new_cluster.get("azure_attributes")
    if azure_spec:
        availability = azure_spec.get("availability")
        azure_attributes = AzureAttributes(
            availability=AzureAvailability(availability) if availability else None,
            first_on_demand=azure_spec.get("first_on_demand"),
            spot_bid_max_price=azure_spec.get("spot_bid_max_price"),
        )

    # Only DBFS log destinations are translated by this helper.
    cluster_log_conf = None
    dbfs_spec = (new_cluster.get("cluster_log_conf") or {}).get("dbfs")
    if dbfs_spec:
        cluster_log_conf = ClusterLogConf(
            dbfs=DbfsStorageInfo(destination=dbfs_spec.get("destination")),
        )

    # Only workspace-hosted init scripts are translated by this helper.
    init_scripts = [
        InitScriptInfo(
            workspace=WorkspaceStorageInfo(
                destination=script["workspace"].get("destination"),
            )
        )
        for script in new_cluster.get("init_scripts", [])
    ]

    runtime_engine = new_cluster.get("runtime_engine")

    # NOTE: data_security_mode is not forwarded; it was disabled in the
    # original version of this script as well.
    return jobs.JobCluster(
        job_cluster_key=cluster_spec["job_cluster_key"],
        new_cluster=ClusterSpec(
            cluster_name=new_cluster.get("cluster_name"),
            spark_version=new_cluster.get("spark_version"),
            spark_conf=new_cluster.get("spark_conf"),
            azure_attributes=azure_attributes,
            custom_tags=new_cluster.get("custom_tags"),
            cluster_log_conf=cluster_log_conf,
            init_scripts=init_scripts,
            enable_elastic_disk=new_cluster.get("enable_elastic_disk"),
            # The pool configured for the environment replaces whatever pool
            # ID the exported JSON carried.
            instance_pool_id=pool_id if "instance_pool_id" in new_cluster else None,
            driver_instance_pool_id=pool_id if "driver_instance_pool_id" in new_cluster else None,
            node_type_id=new_cluster.get("node_type_id"),
            driver_node_type_id=new_cluster.get("driver_node_type_id"),
            runtime_engine=RuntimeEngine(runtime_engine) if runtime_engine else None,
            num_workers=new_cluster.get("num_workers"),
        ),
    )
# Helper function to create Task
def create_job_task(task_spec, logger):
    """Translate one ``tasks`` entry of a job JSON into a ``jobs.Task``.

    Only notebook tasks are supported. Optional settings that are absent from
    the JSON are passed as ``None`` rather than as empty lists, and enum
    wrappers are only applied when a value is present.

    Args:
        task_spec: Dict describing a single task, as found in the ``tasks``
            list of the job JSON.
        logger: Logger used for progress messages.

    Returns:
        The ``jobs.Task`` built from ``task_spec``.

    Raises:
        KeyError: If ``task_key``, ``notebook_task`` or the notebook path is
            missing.
        ValueError: If an enum value in the JSON is rejected by its enum class.
    """
    logger.info(f"...creating task definition for {task_spec['task_key']}")

    # set the libraries if they are specified
    task_libraries = []
    for lib in task_spec.get("libraries", []):
        if "maven" in lib:
            task_libraries.append(Library(maven=MavenLibrary(**lib["maven"])))
        elif "pypi" in lib:
            task_libraries.append(Library(pypi=PythonPyPiLibrary(package=lib["pypi"]["package"], repo=lib["pypi"].get("repo"))))
        elif "cran" in lib:
            task_libraries.append(Library(cran=RCranLibrary(package=lib["cran"]["package"], repo=lib["cran"].get("repo"))))
        elif "egg" in lib:
            task_libraries.append(Library(egg=lib["egg"]))
        elif "jar" in lib:
            task_libraries.append(Library(jar=lib["jar"]))
        elif "whl" in lib:
            task_libraries.append(Library(whl=lib["whl"]))
        elif "requirements" in lib:
            task_libraries.append(Library(requirements=lib["requirements"]))

    notebook_spec = task_spec["notebook_task"]
    notebook_source = notebook_spec.get("source")
    run_if = task_spec.get("run_if")

    # Task-level e-mail settings use the task-specific SDK type.
    email_notifications = None
    email_spec = task_spec.get("email_notifications")
    if email_spec is not None:
        email_notifications = jobs.TaskEmailNotifications(
            on_start=email_spec.get("on_start", []),
            on_success=email_spec.get("on_success", []),
            on_failure=email_spec.get("on_failure", []),
        )

    notification_settings = None
    settings_spec = task_spec.get("notification_settings")
    if settings_spec is not None:
        notification_settings = jobs.TaskNotificationSettings(
            no_alert_for_skipped_runs=settings_spec.get("no_alert_for_skipped_runs"),
            no_alert_for_canceled_runs=settings_spec.get("no_alert_for_canceled_runs"),
            alert_on_last_attempt=settings_spec.get("alert_on_last_attempt", False),
        )

    return jobs.Task(
        task_key=task_spec["task_key"],
        run_if=jobs.RunIf(run_if) if run_if else None,
        libraries=task_libraries,
        depends_on=[
            jobs.TaskDependency(task_key=dep["task_key"]) for dep in task_spec.get("depends_on", [])
        ],
        notebook_task=jobs.NotebookTask(
            notebook_path=notebook_spec["notebook_path"],
            base_parameters=notebook_spec.get("base_parameters", {}),
            source=jobs.Source(notebook_source) if notebook_source else None,
        ),
        job_cluster_key=task_spec.get("job_cluster_key"),
        timeout_seconds=task_spec.get("timeout_seconds"),
        email_notifications=email_notifications,
        notification_settings=notification_settings,
    )
def get_service_principal(logger: logging.Logger, dbk: WorkspaceClient):
    """Return the application ID of the first active service principal.

    Principals are taken in the order ``dbk.service_principals.list()`` yields
    them; ``None`` is returned when none of them is active.
    """
    logger.info(f"...getting principal ID")
    active_ids = (
        principal.application_id
        for principal in dbk.service_principals.list()
        if principal.active
    )
    return next(active_ids, None)
def _to_webhooks(entries):
    """Convert webhook dicts (``{"id": ...}``) from the JSON into ``jobs.Webhook`` objects."""
    return [jobs.Webhook(id=entry["id"]) for entry in entries or []] or None


def create_job_definition(logger: logging.Logger, dbk: WorkspaceClient, job_dict: dict, pool: str, job_pause_status: str) -> dict:
    """Build keyword arguments for ``jobs.create()`` / ``jobs.JobSettings`` from a job JSON.

    Given that jobs.create() or jobs.reset() cannot accept job definitions via
    .json, the nested sections of the dict are replaced with the SDK objects
    they correspond to. Sections handled here that are absent from the JSON
    are set to ``None``; all other keys are passed through unchanged.

    Args:
        logger: Logger used for progress messages.
        dbk: Workspace client used to look up instance pools and service
            principals.
        job_dict: Parsed job JSON. It is shallow-copied, not modified.
        pool: Name of the instance pool whose ID is injected into the job
            clusters.
        job_pause_status: Pause status applied to the schedule / continuous
            trigger. When empty or ``None``, the status found in the JSON
            section itself (if any) is used instead.

    Returns:
        A dict suitable for unpacking into ``jobs.create()`` or
        ``jobs.JobSettings``.

    Raises:
        ValueError: If no instance pool named ``pool`` exists.
    """
    definition = job_dict.copy()
    logger.info(f"...creating job definition for {job_dict['name']}")

    # Validate and fetch pool ID
    pool_id = next(
        (p.instance_pool_id for p in dbk.instance_pools.list() if p.instance_pool_name == pool),
        None,
    )
    if not pool_id:
        logger.info(f"Error fetching pool_id. Pool '{pool}' does not exist")
        raise ValueError(f"Pool '{pool}' does not exist")
    logger.info(f"...fetched pool_id for '{pool}': {pool_id}")

    def resolve_pause_status(section):
        # The configured status wins; otherwise keep what the JSON says.
        # Never hand None to the enum constructor, which would raise.
        status = job_pause_status or (section or {}).get("pause_status")
        return jobs.PauseStatus(status) if status else None

    # Create job clusters
    job_clusters = [create_job_cluster(cluster, pool_id, logger) for cluster in definition.get("job_clusters", [])]

    # Create tasks
    tasks = [create_job_task(task, logger) for task in definition.get("tasks", [])]

    # Define run_as: either user or service principal fetched for the workspace
    user_name = (definition.get("run_as") or {}).get("user_name")
    if user_name:
        run_as = jobs.JobRunAs(user_name=user_name)
    else:
        run_as = jobs.JobRunAs(service_principal_name=get_service_principal(logger, dbk))

    # Populate job settings
    definition["job_clusters"] = job_clusters
    definition["tasks"] = tasks

    if "continuous" in definition:
        definition["continuous"] = jobs.Continuous(
            pause_status=resolve_pause_status(definition["continuous"]),
        )

    email_spec = definition.get("email_notifications") or {}
    definition["email_notifications"] = jobs.JobEmailNotifications(
        on_failure=email_spec.get("on_failure", []),
        on_start=email_spec.get("on_start", []),
        on_success=email_spec.get("on_success", []),
        no_alert_for_skipped_runs=email_spec.get("no_alert_for_skipped_runs", False),
    ) if "email_notifications" in definition else None

    settings_spec = definition.get("notification_settings") or {}
    definition["notification_settings"] = jobs.JobNotificationSettings(
        no_alert_for_skipped_runs=settings_spec.get("no_alert_for_skipped_runs", False),
        no_alert_for_canceled_runs=settings_spec.get("no_alert_for_canceled_runs", False),
    ) if "notification_settings" in definition else None

    # Webhook entries must be SDK objects: plain dicts trigger
    # "'dict' object has no attribute 'as_dict'" when the request is built.
    webhook_spec = definition.get("webhook_notifications") or {}
    definition["webhook_notifications"] = jobs.WebhookNotifications(
        on_start=_to_webhooks(webhook_spec.get("on_start")),
        on_failure=_to_webhooks(webhook_spec.get("on_failure")),
        on_success=_to_webhooks(webhook_spec.get("on_success")),
    ) if "webhook_notifications" in definition else None

    schedule_spec = definition.get("schedule") or {}
    definition["schedule"] = jobs.CronSchedule(
        quartz_cron_expression=schedule_spec.get("quartz_cron_expression", ""),
        timezone_id=schedule_spec.get("timezone_id", "UTC"),
        pause_status=resolve_pause_status(schedule_spec),
    ) if "schedule" in definition else None

    definition["run_as"] = run_as

    queue_spec = definition.get("queue") or {}
    definition["queue"] = jobs.QueueSettings(
        enabled=queue_spec.get("enabled", True),
    ) if "queue" in definition else None

    return definition
def get_existing_job_id(logger: logging.Logger, dbk: WorkspaceClient, job_name: str):
    """Find the job called ``job_name`` and delete any later duplicates.

    The first job yielded by ``dbk.jobs.list()`` whose settings name equals
    ``job_name`` is kept; every further job with the same name is deleted.

    Returns:
        The ID of the kept job, or ``None`` when no job has that name.
    """
    logger.info(f"Checking the existing jobs with name {job_name}")
    kept_id = None
    removed = 0
    for candidate in dbk.jobs.list():
        if candidate.settings.name != job_name:
            continue

        # created_time is converted from milliseconds for the log message.
        created = datetime.datetime.fromtimestamp(int(candidate.created_time)/1000).strftime(format="%d.%m.%Y %H.%M.%S")
        if not kept_id:
            kept_id = candidate.job_id
            logger.info(f"Found an existing job {candidate.job_id} created {created}")
            continue

        logger.info(f"Deleting duplicated job {candidate.job_id} created {created}")
        dbk.jobs.delete(candidate.job_id)
        removed += 1

    if removed > 0:
        logger.info(f"Deleted {removed} jobs with name '{job_name}'")
    return kept_id
def create_or_reset_job(logger: logging.Logger, dbk: WorkspaceClient, json_path: str, pool: str, pause_status: str) -> str:
    """Create the job described by ``json_path``, or reset it if it already exists.

    Args:
        logger: Logger used for progress and error messages.
        dbk: Workspace client used for all job operations.
        json_path: Path of the job JSON file to load.
        pool: Instance pool name forwarded to ``create_job_definition``.
        pause_status: Pause status forwarded to ``create_job_definition``.

    Returns:
        ``"ok"`` when the job was created or reset, ``"error"`` when building
        or submitting the definition raised. The failure is logged with its
        traceback so that the cause is not lost.

    Raises:
        OSError, json.JSONDecodeError, KeyError: If the JSON file cannot be
            read, cannot be parsed, or has no ``name``; these happen before
            the guarded section and are not converted to ``"error"``.
    """
    with open(json_path, "r") as file:
        logger.info(f"Loading json: {json_path}...")
        job_json = json.load(file)

    job_name = job_json['name']
    logger.info(f"Processing job {job_name}")
    try:
        job_settings = create_job_definition(logger, dbk, job_json, pool, pause_status)
        job_id = get_existing_job_id(logger, dbk, job_name)
        if job_id:
            logger.info(f"Resetting existing job {job_id}...")
            dbk.jobs.reset(job_id=job_id, new_settings=jobs.JobSettings(**job_settings))
            logger.info(f"Job {job_name} reset.")
        else:
            logger.info(f"Job '{job_name}' doesn't exist. Creating a new job...")
            job = dbk.jobs.create(**job_settings)
            logger.info(f"Job created. job_id: {job.job_id}")
        return "ok"
    except Exception:
        # Per-job boundary: the caller tallies failures, so report the cause
        # here instead of silently swallowing it.
        logger.exception(f"Failed to create or reset job '{job_name}'")
        return "error"
def main(args):
    """Create or reset every job configured for ``args.environment``.

    Job entries are read from ``config_jobs.json`` in the current working
    directory; each entry supplies ``path``, ``pool`` and optionally
    ``pause_status``. Nothing is processed when ``args.environment`` is
    ``None``.

    Args:
        args: Parsed command-line arguments providing ``host``, ``token`` and
            ``environment``.

    Raises:
        RuntimeError: If at least one job could not be created or reset.
    """
    # logging setup
    logger = logging.getLogger('azure.mgmt.resource')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream=sys.stdout)
    logger.addHandler(handler)

    # connect to workspace
    dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

    # load Job definitions file; the context manager closes the handle
    with open("config_jobs.json", "r") as config_file:
        config = json.load(config_file)

    if args.environment is None:
        return

    # for each job in environment, attempt to create or reset
    ok = 0
    error = 0
    for job in config[args.environment]:  # take all jobs from config file filtered by envr
        status = create_or_reset_job(logger, dbk, job["path"], job["pool"], job.get("pause_status"))
        logger.info(f"STATUS: {status}")
        if status == "error":
            error += 1
        else:
            ok += 1

    logger.info(f"JOBS OK: {ok}, JOBS ERROR: {error}")
    if error > 0:
        raise RuntimeError("Error while creating jobs.")
if __name__ == "__main__":
    # Command-line entry point: collect the options and hand them to main().
    # NOTE: --force is accepted here, but main() above never reads it.
    cli = argparse.ArgumentParser()
    for flag, description in (
        ("--host", "Databricks host"),
        ("--token", "Databricks token"),
        ("--environment", "Used to switch cluster configs to loop over"),
    ):
        cli.add_argument(flag, help=description)
    cli.add_argument("--force", action=argparse.BooleanOptionalAction, help="Force job creation")
    main(cli.parse_args())
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group