cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Databrikcs SDK - create new job using JSON

pavel_merkle
New Contributor II

Hello,

I am trying to create a Job via Databricks SDK. As input, I use the JSON generated via Workflows UI (Worklflows->Jobs->View YAML/JSON->JSON API->Create) generating pavel_job.json. When trying to run SDK function jobs.create as

dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

job_dict = json.load(open("pavel_job.json"))
job = dbk.jobs.create(**job_dict)

This running into an error:

Error creating job 'pavel_job': 'dict' object has no attribute 'as_dict'

Can you please advice on how to use the generated JSON with SDK to create a Job?

4 REPLIES 4

mhiltner
Databricks Employee
Databricks Employee

Hey there! I have been using Volumes to get the files. It looks like this:

dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

file_path = "/Volumes/{{your_catalog}}/{{your_schema}}/json_volumes/sample1.json"
content = dbutils.fs.head(file_path)
job_dict = eval(content)
job = dbk.jobs.create(**job_dict)
 
Your problem is probably when creating the dict, not calling the API function.

Hi mhiltner,

does this job creation work for you? Can you please share sample1.json? Do you run it in notebook? I am trying to run it via VSCode and other API calls work for me so I guess the json/dict I pass is the problem, but Ialready tried the simplest one.

What do you mean by not calling the API function when creating the dict?

mhiltner
Databricks Employee
Databricks Employee

it does work for me, using directly on a notebook cell (havent tried in vscode). 

This is my sample Json 

{
  "name": "Agregacao_Gold",
  "email_notifications": {},
  "webhook_notifications": {},
  "timeout_seconds": 0,
  "max_concurrent_runs": 1,
  "queue": {
    "enabled": True
  },
  "run_as": {
    "user_name": "mhiltner@community.com"
  }
}

 What i meant is that maybe your job_dict variable is not storing your job as a dict type and this could be causing an error. If you dont mind, feel free to share the result of printing your job_dict 

pavel_merkle
New Contributor II

So I did play around to achieve the reasonable results. You have to create a parsing Python code that ingests the JSON generated by the Databricks (Workflows -> View JSON -> Create) and produce a proper SDK call with all the SDK objects in place. The call will fail anytime the SDK expects the object but gets the String instead. Very sensitive! 

I am attaching the complete Python program that takes jobs from confg file and creates the jobs in the Databricks using SDK. 100% working 100% free ๐Ÿ˜„ 

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, iam
from databricks.sdk.service.compute import ClusterSpec, DbfsStorageInfo, ClusterLogConf, WorkspaceStorageInfo, InitScriptInfo, DataSecurityMode, RuntimeEngine, RCranLibrary, MavenLibrary, PythonPyPiLibrary, Library, AzureAttributes, AzureAvailability
import datetime
import argparse
import logging
import json
import sys


# Helper function to create JobCluster
def create_job_cluster(cluster_spec, pool_id, logger):
	logger.info(f"...creating cluster definition for {cluster_spec['job_cluster_key']}")
	job_cluster = jobs.JobCluster(
		job_cluster_key=cluster_spec["job_cluster_key"],
		new_cluster=ClusterSpec(
			cluster_name=cluster_spec["new_cluster"].get("cluster_name", []),
			spark_version=cluster_spec["new_cluster"].get("spark_version", []),
			spark_conf=cluster_spec["new_cluster"].get("spark_conf", []),
			azure_attributes = AzureAttributes(
				availability=AzureAvailability(
					cluster_spec["new_cluster"]["azure_attributes"].get("availability", None),
				),
				first_on_demand=cluster_spec["new_cluster"]["azure_attributes"].get("first_on_demand", []),
				spot_bid_max_price=cluster_spec["new_cluster"]["azure_attributes"].get("spot_bid_max_price", []),
			) if cluster_spec["new_cluster"].get("azure_attributes") else {},
			custom_tags=cluster_spec["new_cluster"].get("custom_tags", []),
			cluster_log_conf=ClusterLogConf(
				dbfs=DbfsStorageInfo(
					destination=cluster_spec["new_cluster"]["cluster_log_conf"].get("dbfs", []).get("destination", []),
				)
			) if "cluster_log_conf" in cluster_spec["new_cluster"] else None,
			init_scripts=[
				InitScriptInfo(
					workspace=WorkspaceStorageInfo(
						destination=script["workspace"].get("destination", [])
					)
				) for script in cluster_spec["new_cluster"].get("init_scripts", [])
			],
			enable_elastic_disk=cluster_spec["new_cluster"].get("enable_elastic_disk", None),
			instance_pool_id=pool_id if "instance_pool_id" in cluster_spec["new_cluster"] else None,
			driver_instance_pool_id=pool_id if "driver_instance_pool_id" in cluster_spec["new_cluster"] else None,
			node_type_id = cluster_spec["new_cluster"].get("node_type_id", None),
			driver_node_type_id = cluster_spec["new_cluster"].get("driver_node_type_id", None),
			#data_security_mode=DataSecurityMode(cluster_spec["new_cluster"].get("data_security_mode", [])),
			runtime_engine=RuntimeEngine(cluster_spec["new_cluster"].get("runtime_engine", [])),
			num_workers=cluster_spec["new_cluster"].get("num_workers", []),
		)
	)

	return job_cluster


# Helper function to create Task
def create_job_task(task_spec, logger):
	logger.info(f"...creating task definition for {task_spec['task_key']}")
	# set the libraries if they are specified
	task_libraries = []
	for lib in task_spec.get("libraries", []):
		if "maven" in lib:
			task_libraries.append(Library(maven=MavenLibrary(**lib["maven"])))
		elif "pypi" in lib:
			task_libraries.append(Library(pypi=PythonPyPiLibrary(package=lib["pypi"]["package"], repo=lib["pypi"].get("repo"))))
		elif "cran" in lib:
			task_libraries.append(Library(cran=RCranLibrary(package=lib["cran"]["package"], repo=lib["cran"].get("repo"))))
		elif "egg" in lib:
			task_libraries.append(Library(egg=lib["egg"]))
		elif "jar" in lib:
			task_libraries.append(Library(jar=lib["jar"]))				
		elif "whl" in lib:
			task_libraries.append(Library(whl=lib["whl"]))		
		elif "requirements" in lib:
			task_libraries.append(Library(requirements=lib["requirements"]))							

	job_task =  jobs.Task(
		task_key=task_spec["task_key"],
		run_if=jobs.RunIf(task_spec.get("run_if", [])),
		libraries=task_libraries,
		depends_on=[
			jobs.TaskDependency(task_key=dep["task_key"]) for dep in task_spec.get("depends_on", [])
		],
		notebook_task=jobs.NotebookTask(
			notebook_path=task_spec["notebook_task"]["notebook_path"],
			base_parameters=task_spec["notebook_task"].get("base_parameters", {}),
			source=jobs.Source(task_spec["notebook_task"]["source"]),
		),
		job_cluster_key=task_spec.get("job_cluster_key", []),
		timeout_seconds=task_spec.get("timeout_seconds", []),
		email_notifications=jobs.JobEmailNotifications(
			on_start=task_spec["email_notifications"].get("on_start", []),
			on_success=task_spec["email_notifications"].get("on_success", []),
			on_failure=task_spec["email_notifications"].get("on_failure", [])
		) if "email_notifications" in task_spec else None,
		notification_settings=jobs.TaskNotificationSettings(
			no_alert_for_skipped_runs=task_spec["notification_settings"].get("no_alert_for_skipped_runs", []),
			no_alert_for_canceled_runs=task_spec["notification_settings"].get("no_alert_for_canceled_runs", []),
			alert_on_last_attempt=task_spec["notification_settings"].get("alert_on_last_attempt", False)
		) if "notification_settings" in task_spec else None,    
	)

	return job_task


def get_service_principal(logger: logging.Logger, dbk: WorkspaceClient):
	logger.info(f"...getting principal ID")
	principals = dbk.service_principals.list()
	principal_id = None
	for p in principals:
		if p.active:
			principal_id = p.application_id
			break
	return principal_id


def create_job_definition(logger: logging.Logger, dbk: WorkspaceClient, job_dict: dict, pool: str, job_pause_status: str) -> dict:
	"""
	given that jobs.create() or jobs.reset() cannot accept job definitions via .json
	we can unpack a dict with all necessary objects 
	"""	
	definition = job_dict.copy()
	logger.info(f"...creating job definition for {job_dict['name']}")
	# Validate and fetch pool ID
	pool_id = None
	pool_list = dbk.instance_pools.list()
	for p in pool_list:
		if p.instance_pool_name == pool:
			pool_id = p.instance_pool_id
			logger.info(f"...fetched pool_id for '{pool}': {pool_id}")
			break
	if not pool_id :
		logger.info(f"Error fetching pool_id. Pool '{pool}' does not exist")
		raise ValueError(f"Pool '{pool}' does not exist")
	
	# Create job clusters
	job_clusters = [create_job_cluster(cluster, pool_id, logger) for cluster in definition.get("job_clusters", [])]

	# Create tasks
	tasks = [create_job_task(task, logger) for task in definition.get("tasks", [])]

	# Define run_as: either user or service principal fetched for the workspace
	if "user_name" in definition["run_as"] and definition["run_as"]["user_name"]:
		run_as = jobs.JobRunAs(user_name = definition["run_as"]["user_name"])
	else:
		service_principal_envr = get_service_principal(logger, dbk)
		run_as = jobs.JobRunAs(service_principal_name = service_principal_envr)
	

	# Populate job settings
	definition["job_clusters"] = job_clusters
	definition["tasks"] = tasks
	if "continuous" in definition:
		definition["continuous"] = jobs.Continuous(
			pause_status = jobs.PauseStatus(job_pause_status) if "continuous" in definition else None	
		)	
	definition["email_notifications"] = jobs.JobEmailNotifications(
		on_failure=definition.get("email_notifications", {}).get("on_failure", []),
		on_start=definition.get("email_notifications", {}).get("on_start", []),
		on_success=definition.get("email_notifications", {}).get("on_success", []),
		no_alert_for_skipped_runs=definition.get("email_notifications", {}).get("no_alert_for_skipped_runs", False)
	) if "email_notifications" in definition else None
	definition["notification_settings"] = jobs.JobNotificationSettings(
		no_alert_for_skipped_runs=definition.get("notification_settings", {}).get("no_alert_for_skipped_runs", False),
		no_alert_for_canceled_runs=definition.get("notification_settings", {}).get("no_alert_for_canceled_runs", False),
	) if "notification_settings" in definition else None
	definition["webhook_notifications"] = jobs.WebhookNotifications(
		on_start=definition.get("webhook_notifications", {}).get("on_start", False),
		on_failure=definition.get("webhook_notifications", {}).get("on_failure", False),
		on_success=definition.get("webhook_notifications", {}).get("on_success", False),
	) if "webhook_notifications" in definition else None	
	definition["schedule"] = jobs.CronSchedule(
		quartz_cron_expression=definition.get("schedule", {}).get("quartz_cron_expression", ""),
		timezone_id=definition.get("schedule", {}).get("timezone_id", "UTC"),
		pause_status = jobs.PauseStatus(job_pause_status) 
	) if "schedule" in definition else None
	definition["run_as"] = run_as
	definition["queue"] = jobs.QueueSettings(enabled=definition.get("queue", {}).get("enabled", True)) if "queue" in definition else None
	
	
	return definition

def get_existing_job_id(logger: logging.Logger, dbk: WorkspaceClient, job_name: str):
	logger.info(f"Checking the existing jobs with name {job_name}")
	job_list = dbk.jobs.list()
	job_count = 0
	job_id = None
	for j in job_list:
		base_job_name = j.settings.name
		if base_job_name == job_name:
			created = datetime.datetime.fromtimestamp(int(j.created_time)/1000).strftime(format="%d.%m.%Y %H.%M.%S")
			if job_id: 
				logger.info(f"Deleting duplicated job {j.job_id} created {created}")	
				dbk.jobs.delete(j.job_id)
				job_count += 1
			else:
				job_id = j.job_id
				logger.info(f"Found an existing job {j.job_id} created {created}")	
	if job_count > 0:
		logger.info(f"Deleted {job_count} jobs with name '{job_name}'")
		
	return job_id
		

def create_or_reset_job(logger: logging.Logger, dbk: WorkspaceClient, json_path: str, pool: str, pause_status: str) -> str:
	with open(json_path, "r") as file:
		logger.info(f"Loading json: {json_path}...")
		job_json = json.load(file)
		job_name = job_json['name']
		logger.info(f"Processing job {job_name}")
	try:
		job_settings = create_job_definition(logger, dbk, job_json, pool, pause_status)
		job_id = get_existing_job_id(logger, dbk, job_name)
		if job_id:
			logger.info(f"Resetting existing job {job_id}...")	
			job_reset = jobs.JobSettings(**job_settings)
			job = dbk.jobs.reset(job_id=job_id, new_settings=job_reset)	
			logger.info(f"Job {job_name} reset.")
		else:
			logger.info(f"Job '{job_name}' doesn't exist. Creating a new job...")
			job = dbk.jobs.create(**job_settings)    
			logger.info(f"Job created. job_id: {job.job_id}")
					
		return "ok"
	except Exception as e:
		return "error"

def main(args):
	
	# logging setup
	logger = logging.getLogger('azure.mgmt.resource')
	logger.setLevel(logging.INFO)
	handler = logging.StreamHandler(stream=sys.stdout)
	logger.addHandler(handler)
	
	# connect to workspace
	dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

	# load Job definitions file
	config = json.load(open("config_jobs.json"))
	
	# for each job in environment, attempt to create or reset
	if args.environment is not None:
		ok = 0
		error = 0
		for job in config[args.environment]: # take all jobs from config file filtered by envr
			if "pause_status" in job:
				pause_status = job["pause_status"]
			else:
				pause_status = None			
			status = create_or_reset_job(logger, dbk, job["path"], job["pool"], pause_status)	
			logger.info(f"STATUS: {status}")

			if status == "error":
				error+=1
			else:
				ok +=1
		logger.info(f"JOBS OK: {ok}, JOBS ERROR: {error}")
		if error > 0:	
			raise RuntimeError(f"Error while creating jobs.")


if __name__ == "__main__":
	parser = argparse.ArgumentParser()
	parser.add_argument("--host", help="Databricks host")
	parser.add_argument("--token", help="Databricks token")
	parser.add_argument("--environment", help="Used to switch cluster configs to loop over")
	parser.add_argument("--force", action= argparse.BooleanOptionalAction, help="Force job creation")
	args = parser.parse_args()
	main(args)

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group