pavel_merkle
Databricks Partner

I experimented until I got reasonable results. You have to write Python parsing code that ingests the JSON generated by Databricks (Workflows -> View JSON -> Create) and produces a proper SDK call with all the SDK objects in place. The call fails whenever the SDK expects an object but gets a string instead. Very sensitive!

I am attaching the complete Python program that takes jobs from a config file and creates them in Databricks using the SDK. 100% working 100% free 😄

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, iam
from databricks.sdk.service.compute import ClusterSpec, DbfsStorageInfo, ClusterLogConf, WorkspaceStorageInfo, InitScriptInfo, DataSecurityMode, RuntimeEngine, RCranLibrary, MavenLibrary, PythonPyPiLibrary, Library, AzureAttributes, AzureAvailability
import datetime
import argparse
import logging
import json
import sys


# Helper function to create JobCluster
def create_job_cluster(cluster_spec, pool_id, logger):
	"""Build a ``jobs.JobCluster`` from one ``job_clusters`` entry of the job JSON.

	Args:
		cluster_spec: dict holding ``job_cluster_key`` and a ``new_cluster`` dict.
		pool_id: instance pool ID used for ``instance_pool_id`` and
			``driver_instance_pool_id`` whenever those keys are present in
			``new_cluster``; the values stored in the JSON are ignored.
		logger: logger used for progress messages.

	Returns:
		The populated ``jobs.JobCluster``.

	Raises:
		KeyError: if ``job_cluster_key`` or ``new_cluster`` is missing, or an
			init script entry has no ``workspace`` key.
		ValueError: if ``availability`` or ``runtime_engine`` holds a value the
			corresponding SDK enum does not define.
	"""
	logger.info(f"...creating cluster definition for {cluster_spec['job_cluster_key']}")
	new_cluster = cluster_spec["new_cluster"]

	# Settings missing from the JSON are passed as None ("not set").
	# An empty list is not a valid value for string, int or enum fields, and
	# enum constructors raise ValueError when handed [] or None.
	azure_attributes = None
	azure_spec = new_cluster.get("azure_attributes")
	if azure_spec:
		availability = azure_spec.get("availability")
		azure_attributes = AzureAttributes(
			availability=AzureAvailability(availability) if availability else None,
			first_on_demand=azure_spec.get("first_on_demand"),
			spot_bid_max_price=azure_spec.get("spot_bid_max_price"),
		)

	cluster_log_conf = None
	if "cluster_log_conf" in new_cluster:
		# Only a DBFS log destination is translated; tolerate its absence.
		dbfs_spec = (new_cluster["cluster_log_conf"] or {}).get("dbfs")
		cluster_log_conf = ClusterLogConf(
			dbfs=DbfsStorageInfo(destination=dbfs_spec.get("destination")) if dbfs_spec else None,
		)

	# Only workspace-hosted init scripts are translated; any other kind of
	# entry raises KeyError on the "workspace" lookup.
	init_scripts = [
		InitScriptInfo(
			workspace=WorkspaceStorageInfo(destination=script["workspace"].get("destination"))
		)
		for script in new_cluster.get("init_scripts", [])
	]

	runtime_engine = new_cluster.get("runtime_engine")

	# NOTE(review): data_security_mode is not forwarded; the original call was
	# commented out. Confirm whether dropping it is intentional.
	return jobs.JobCluster(
		job_cluster_key=cluster_spec["job_cluster_key"],
		new_cluster=ClusterSpec(
			cluster_name=new_cluster.get("cluster_name"),
			spark_version=new_cluster.get("spark_version"),
			spark_conf=new_cluster.get("spark_conf"),
			azure_attributes=azure_attributes,
			custom_tags=new_cluster.get("custom_tags"),
			cluster_log_conf=cluster_log_conf,
			init_scripts=init_scripts,
			enable_elastic_disk=new_cluster.get("enable_elastic_disk"),
			instance_pool_id=pool_id if "instance_pool_id" in new_cluster else None,
			driver_instance_pool_id=pool_id if "driver_instance_pool_id" in new_cluster else None,
			node_type_id=new_cluster.get("node_type_id"),
			driver_node_type_id=new_cluster.get("driver_node_type_id"),
			runtime_engine=RuntimeEngine(runtime_engine) if runtime_engine else None,
			num_workers=new_cluster.get("num_workers"),
		),
	)


# Helper function to create Task
def create_job_task(task_spec, logger):
	"""Build a ``jobs.Task`` for a notebook task from one ``tasks`` entry of the job JSON.

	Args:
		task_spec: dict describing the task. ``task_key`` and ``notebook_task``
			(with ``notebook_path``) are required; ``libraries``, ``depends_on``,
			``run_if``, ``job_cluster_key``, ``timeout_seconds``,
			``email_notifications`` and ``notification_settings`` are optional.
		logger: logger used for progress messages.

	Returns:
		The populated ``jobs.Task``.

	Raises:
		KeyError: if ``task_key``, ``notebook_task`` or ``notebook_path`` is
			missing (only notebook tasks are supported).
		ValueError: if ``run_if`` or ``source`` holds a value the corresponding
			SDK enum does not define.
	"""
	logger.info(f"...creating task definition for {task_spec['task_key']}")

	# Translate library entries; the first recognised key of each entry wins.
	task_libraries = []
	for lib in task_spec.get("libraries", []):
		if "maven" in lib:
			task_libraries.append(Library(maven=MavenLibrary(**lib["maven"])))
		elif "pypi" in lib:
			task_libraries.append(Library(pypi=PythonPyPiLibrary(package=lib["pypi"]["package"], repo=lib["pypi"].get("repo"))))
		elif "cran" in lib:
			task_libraries.append(Library(cran=RCranLibrary(package=lib["cran"]["package"], repo=lib["cran"].get("repo"))))
		else:
			# These kinds carry a plain string value.
			for kind in ("egg", "jar", "whl", "requirements"):
				if kind in lib:
					task_libraries.append(Library(**{kind: lib[kind]}))
					break

	# Enum constructors raise ValueError on a missing value, so build them
	# only when the JSON actually provides one.
	run_if = task_spec.get("run_if")
	notebook_spec = task_spec["notebook_task"]
	source = notebook_spec.get("source")

	email_notifications = None
	if "email_notifications" in task_spec:
		email_spec = task_spec["email_notifications"] or {}
		email_notifications = jobs.JobEmailNotifications(
			on_start=email_spec.get("on_start", []),
			on_success=email_spec.get("on_success", []),
			on_failure=email_spec.get("on_failure", []),
		)

	notification_settings = None
	if "notification_settings" in task_spec:
		settings_spec = task_spec["notification_settings"] or {}
		notification_settings = jobs.TaskNotificationSettings(
			no_alert_for_skipped_runs=settings_spec.get("no_alert_for_skipped_runs"),
			no_alert_for_canceled_runs=settings_spec.get("no_alert_for_canceled_runs"),
			alert_on_last_attempt=settings_spec.get("alert_on_last_attempt", False),
		)

	return jobs.Task(
		task_key=task_spec["task_key"],
		run_if=jobs.RunIf(run_if) if run_if else None,
		libraries=task_libraries,
		depends_on=[
			jobs.TaskDependency(task_key=dep["task_key"]) for dep in task_spec.get("depends_on", [])
		],
		notebook_task=jobs.NotebookTask(
			notebook_path=notebook_spec["notebook_path"],
			base_parameters=notebook_spec.get("base_parameters", {}),
			source=jobs.Source(source) if source else None,
		),
		# Scalars that are not set must be None, not an empty list.
		job_cluster_key=task_spec.get("job_cluster_key"),
		timeout_seconds=task_spec.get("timeout_seconds"),
		email_notifications=email_notifications,
		notification_settings=notification_settings,
	)


def get_service_principal(logger: logging.Logger, dbk: WorkspaceClient):
	"""Return the application ID of the first active service principal.

	Iterates over ``dbk.service_principals.list()`` and stops at the first
	entry whose ``active`` attribute is truthy. Returns ``None`` when no
	active principal is listed.
	"""
	logger.info("...getting principal ID")
	active_ids = (sp.application_id for sp in dbk.service_principals.list() if sp.active)
	return next(active_ids, None)


def create_job_definition(logger: logging.Logger, dbk: WorkspaceClient, job_dict: dict, pool: str, job_pause_status: str) -> dict:
	"""
	Turn a job definition loaded from JSON into keyword arguments made of SDK objects.

	jobs.create() and jobs.reset() cannot accept a raw JSON definition, so the
	nested sections are replaced by their SDK counterparts. Keys that are not
	handled here are passed through unchanged from a shallow copy of job_dict.

	Args:
		logger: logger used for progress messages.
		dbk: workspace client used to look up instance pools and service principals.
		job_dict: job definition as loaded from JSON; must contain "name".
		pool: name of the instance pool whose ID is injected into the job clusters.
		job_pause_status: pause status applied to "schedule" and "continuous".
			When falsy, the pause_status found in the JSON section is kept.

	Returns:
		dict that can be unpacked into jobs.create() or jobs.JobSettings().

	Raises:
		ValueError: if the pool does not exist, or a pause status is not a
			valid jobs.PauseStatus value.
	"""
	definition = job_dict.copy()
	logger.info(f"...creating job definition for {job_dict['name']}")

	# Validate and fetch pool ID
	pool_id = None
	for p in dbk.instance_pools.list():
		if p.instance_pool_name == pool:
			pool_id = p.instance_pool_id
			logger.info(f"...fetched pool_id for '{pool}': {pool_id}")
			break
	if not pool_id:
		logger.error(f"Error fetching pool_id. Pool '{pool}' does not exist")
		raise ValueError(f"Pool '{pool}' does not exist")

	def resolve_pause_status(section):
		# jobs.PauseStatus(None) raises ValueError, so fall back to the value
		# in the JSON section and leave the field unset if neither is given.
		value = job_pause_status or (section or {}).get("pause_status")
		return jobs.PauseStatus(value) if value else None

	def to_webhooks(entries):
		# The SDK expects Webhook objects, not the raw {"id": ...} dicts.
		return [jobs.Webhook(id=entry["id"]) for entry in entries or []]

	# Create job clusters and tasks
	definition["job_clusters"] = [
		create_job_cluster(cluster, pool_id, logger) for cluster in definition.get("job_clusters", [])
	]
	definition["tasks"] = [create_job_task(task, logger) for task in definition.get("tasks", [])]

	# Define run_as: either the user named in the JSON or the service principal
	# fetched for the workspace. A missing "run_as" section selects the latter.
	run_as_spec = definition.get("run_as") or {}
	if run_as_spec.get("user_name"):
		run_as = jobs.JobRunAs(user_name=run_as_spec["user_name"])
	else:
		run_as = jobs.JobRunAs(service_principal_name=get_service_principal(logger, dbk))
	definition["run_as"] = run_as

	# Populate job settings. Except for "continuous", a section missing from
	# the JSON is set explicitly to None.
	if "continuous" in definition:
		definition["continuous"] = jobs.Continuous(
			pause_status=resolve_pause_status(definition["continuous"])
		)

	if "email_notifications" in definition:
		email_spec = definition["email_notifications"] or {}
		definition["email_notifications"] = jobs.JobEmailNotifications(
			on_failure=email_spec.get("on_failure", []),
			on_start=email_spec.get("on_start", []),
			on_success=email_spec.get("on_success", []),
			no_alert_for_skipped_runs=email_spec.get("no_alert_for_skipped_runs", False),
		)
	else:
		definition["email_notifications"] = None

	if "notification_settings" in definition:
		settings_spec = definition["notification_settings"] or {}
		definition["notification_settings"] = jobs.JobNotificationSettings(
			no_alert_for_skipped_runs=settings_spec.get("no_alert_for_skipped_runs", False),
			no_alert_for_canceled_runs=settings_spec.get("no_alert_for_canceled_runs", False),
		)
	else:
		definition["notification_settings"] = None

	if "webhook_notifications" in definition:
		webhook_spec = definition["webhook_notifications"] or {}
		definition["webhook_notifications"] = jobs.WebhookNotifications(
			on_start=to_webhooks(webhook_spec.get("on_start")),
			on_failure=to_webhooks(webhook_spec.get("on_failure")),
			on_success=to_webhooks(webhook_spec.get("on_success")),
		)
	else:
		definition["webhook_notifications"] = None

	if "schedule" in definition:
		schedule_spec = definition["schedule"] or {}
		definition["schedule"] = jobs.CronSchedule(
			quartz_cron_expression=schedule_spec.get("quartz_cron_expression", ""),
			timezone_id=schedule_spec.get("timezone_id", "UTC"),
			pause_status=resolve_pause_status(schedule_spec),
		)
	else:
		definition["schedule"] = None

	if "queue" in definition:
		queue_spec = definition["queue"] or {}
		definition["queue"] = jobs.QueueSettings(enabled=queue_spec.get("enabled", True))
	else:
		definition["queue"] = None

	return definition

def get_existing_job_id(logger: logging.Logger, dbk: WorkspaceClient, job_name: str):
	"""Find the job named ``job_name`` and delete any further jobs sharing that name.

	The first job returned by ``dbk.jobs.list()`` whose settings name equals
	``job_name`` is kept; every later match is deleted through
	``dbk.jobs.delete``. Returns the kept job's ID, or ``None`` when no job
	has that name.
	"""
	logger.info(f"Checking the existing jobs with name {job_name}")
	kept_id = None
	removed = 0
	for candidate in dbk.jobs.list():
		if candidate.settings.name != job_name:
			continue
		# created_time is divided by 1000 before conversion, i.e. it is
		# treated as milliseconds.
		stamp = datetime.datetime.fromtimestamp(int(candidate.created_time) / 1000)
		created = stamp.strftime("%d.%m.%Y %H.%M.%S")
		if not kept_id:
			kept_id = candidate.job_id
			logger.info(f"Found an existing job {candidate.job_id} created {created}")
			continue
		logger.info(f"Deleting duplicated job {candidate.job_id} created {created}")
		dbk.jobs.delete(candidate.job_id)
		removed += 1
	if removed > 0:
		logger.info(f"Deleted {removed} jobs with name '{job_name}'")

	return kept_id
		

def create_or_reset_job(logger: logging.Logger, dbk: WorkspaceClient, json_path: str, pool: str, pause_status: str) -> str:
	"""Create the job described in ``json_path``, or reset it if a job with that name exists.

	Args:
		logger: logger used for progress and error messages.
		dbk: workspace client used for all Databricks calls.
		json_path: path of the JSON file holding the job definition (needs "name").
		pool: instance pool name forwarded to ``create_job_definition``.
		pause_status: pause status forwarded to ``create_job_definition``.

	Returns:
		"ok" when the job was created or reset, "error" when building or
		submitting the job raised an exception. The exception is logged with
		its traceback.

	Raises:
		OSError, json.JSONDecodeError, KeyError: if the file cannot be read or
			parsed or has no "name". Loading happens outside the guarded section.
	"""
	with open(json_path, "r", encoding="utf-8") as file:
		logger.info(f"Loading json: {json_path}...")
		job_json = json.load(file)
	job_name = job_json['name']
	logger.info(f"Processing job {job_name}")
	try:
		job_settings = create_job_definition(logger, dbk, job_json, pool, pause_status)
		job_id = get_existing_job_id(logger, dbk, job_name)
		if job_id:
			logger.info(f"Resetting existing job {job_id}...")
			dbk.jobs.reset(job_id=job_id, new_settings=jobs.JobSettings(**job_settings))
			logger.info(f"Job {job_name} reset.")
		else:
			logger.info(f"Job '{job_name}' doesn't exist. Creating a new job...")
			job = dbk.jobs.create(**job_settings)
			logger.info(f"Job created. job_id: {job.job_id}")
		return "ok"
	except Exception:
		# Top-level boundary for one job: report the failure with its
		# traceback instead of swallowing it, then let the caller continue.
		logger.exception(f"Failed to create or reset job '{job_name}'")
		return "error"

def main(args):
	"""Create or reset every job configured for ``args.environment``.

	Reads ``config_jobs.json`` from the current working directory. The file
	maps an environment name to a list of job entries, each with "path",
	"pool" and an optional "pause_status".

	Args:
		args: parsed command-line arguments providing ``host``, ``token`` and
			``environment``.

	Raises:
		KeyError: if ``args.environment`` is not a key of the config file.
		RuntimeError: if at least one job could not be created or reset.
	"""
	# logging setup
	logger = logging.getLogger('azure.mgmt.resource')
	logger.setLevel(logging.INFO)
	handler = logging.StreamHandler(stream=sys.stdout)
	logger.addHandler(handler)

	# connect to workspace
	dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

	# load Job definitions file; the context manager closes the handle
	with open("config_jobs.json", encoding="utf-8") as config_file:
		config = json.load(config_file)

	if args.environment is None:
		logger.warning("No --environment given; nothing to do.")
		return

	# for each job in environment, attempt to create or reset
	ok = 0
	error = 0
	for job in config[args.environment]:  # take all jobs from config file filtered by envr
		status = create_or_reset_job(logger, dbk, job["path"], job["pool"], job.get("pause_status"))
		logger.info(f"STATUS: {status}")
		if status == "error":
			error += 1
		else:
			ok += 1
	logger.info(f"JOBS OK: {ok}, JOBS ERROR: {error}")
	if error > 0:
		raise RuntimeError("Error while creating jobs.")


if __name__ == "__main__":
	# Script entry point: parse the command line and hand the result to main().
	parser = argparse.ArgumentParser()
	for option, description in (
		("--host", "Databricks host"),
		("--token", "Databricks token"),
		("--environment", "Used to switch cluster configs to loop over"),
	):
		parser.add_argument(option, help=description)
	# BooleanOptionalAction also registers the negated --no-force flag.
	parser.add_argument("--force", action=argparse.BooleanOptionalAction, help="Force job creation")
	args = parser.parse_args()
	main(args)