<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Databricks SDK - create new job using JSON in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/70706#M34131</link>
    <description>&lt;P&gt;Hey there! I have been using Volumes to get the files. It looks like this:&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")&lt;/SPAN&gt;&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;file_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/Volumes/{{your_catalog}}/{{your_schema}}/json_volumes/sample1.json"&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;content &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dbutils.fs.&lt;/SPAN&gt;&lt;SPAN&gt;head&lt;/SPAN&gt;&lt;SPAN&gt;(file_path)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;job_dict &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;eval&lt;/SPAN&gt;&lt;SPAN&gt;(content)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;job = dbk.jobs.create(**job_dict)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;Your problem is probably in how the dict is created, not in the call to the API function.&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;</description>
    <pubDate>Mon, 27 May 2024 11:05:32 GMT</pubDate>
    <dc:creator>mhiltner</dc:creator>
    <dc:date>2024-05-27T11:05:32Z</dc:date>
    <item>
      <title>Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/70521#M34086</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I am trying to create a job via the Databricks SDK. As input, I use the JSON generated by the Workflows UI (Workflows-&amp;gt;Jobs-&amp;gt;View YAML/JSON-&amp;gt;JSON API-&amp;gt;Create), saved as&amp;nbsp;&lt;STRONG&gt;pavel_job.json&lt;/STRONG&gt;. I call the SDK function jobs.create as follows:&lt;/P&gt;&lt;DIV&gt;&lt;P&gt;dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;job_dict&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&amp;nbsp;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;open&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;STRONG&gt;pavel_job.json&lt;/STRONG&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;job = dbk.jobs.create(**&lt;SPAN&gt;job_dict&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;This runs into an error:&lt;/P&gt;&lt;P&gt;Error creating job 'pavel_job': 'dict' object has no attribute 'as_dict'&lt;/P&gt;&lt;P&gt;Can you please advise on how to use the generated JSON with the SDK to create a job?&lt;/P&gt;</description>
      <pubDate>Thu, 23 May 2024 16:11:58 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/70521#M34086</guid>
      <dc:creator>pavel_merkle</dc:creator>
      <dc:date>2024-05-23T16:11:58Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/70706#M34131</link>
      <description>&lt;P&gt;Hey there! I have been using Volumes to get the files. It looks like this:&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")&lt;/SPAN&gt;&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;file_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/Volumes/{{your_catalog}}/{{your_schema}}/json_volumes/sample1.json"&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;content &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dbutils.fs.&lt;/SPAN&gt;&lt;SPAN&gt;head&lt;/SPAN&gt;&lt;SPAN&gt;(file_path)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;job_dict &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;eval&lt;/SPAN&gt;&lt;SPAN&gt;(content)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;job = dbk.jobs.create(**job_dict)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;Your problem is probably in how the dict is created, not in the call to the API function.&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
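&lt;P&gt;For context on the error quoted in the question: the message 'dict' object has no attribute 'as_dict' is what Python raises whenever code calls as_dict() on a plain dict. The sketch below reproduces that message with a hypothetical stand-in class (QueueSettingsStandIn) and a hypothetical helper (build_body). Neither is the SDK's actual code; they only illustrate how a nested dict could trigger this error while a typed object would not.&lt;/P&gt;

```python
from dataclasses import dataclass


@dataclass
class QueueSettingsStandIn:
    """Hypothetical stand-in for an SDK settings object; not a real SDK class."""
    enabled: bool

    def as_dict(self):
        # Serialize this object into a plain dict for a request body
        return {"enabled": self.enabled}


def build_body(name, queue=None):
    """Hypothetical helper that serializes nested arguments via as_dict()."""
    body = {"name": name}
    if queue is not None:
        body["queue"] = queue.as_dict()
    return body


# A typed object serializes cleanly
ok_body = build_body("demo_job", queue=QueueSettingsStandIn(enabled=True))
print(ok_body)

# A plain dict, such as one produced by json.load, has no as_dict() method
try:
    build_body("demo_job", queue={"enabled": True})
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
print(error_message)
```

&lt;P&gt;If the same thing happens in your case, printing type() for each nested value of job_dict should show which entries are still plain dicts.&lt;/P&gt;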
&lt;/DIV&gt;</description>
      <pubDate>Mon, 27 May 2024 11:05:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/70706#M34131</guid>
      <dc:creator>mhiltner</dc:creator>
      <dc:date>2024-05-27T11:05:32Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/71053#M34228</link>
      <description>&lt;P&gt;Hi&amp;nbsp;mhiltner,&lt;/P&gt;&lt;P&gt;Does this job creation work for you? Can you please share sample1.json? Do you run it in a notebook? I am trying to run it from VS Code, and other API calls work for me, so I guess the JSON/dict I pass is the problem, but I already tried the simplest one.&lt;/P&gt;&lt;P&gt;What do you mean by the problem being in creating the dict rather than in calling the API function?&lt;/P&gt;</description>
      <pubDate>Wed, 29 May 2024 18:53:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/71053#M34228</guid>
      <dc:creator>pavel_merkle</dc:creator>
      <dc:date>2024-05-29T18:53:34Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/71284#M34280</link>
      <description>&lt;P&gt;It does work for me, running directly in a notebook cell (I haven't tried it in VS Code).&lt;/P&gt;
&lt;P&gt;This is my sample JSON:&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;{
  "name": "Agregacao_Gold",
  "email_notifications": {},
  "webhook_notifications": {},
  "timeout_seconds": 0,
  "max_concurrent_runs": 1,
  "queue": {
    "enabled": True
  },
  "run_as": {
    "user_name": "mhiltner@community.com"
  }
}&lt;/LI-CODE&gt;
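&lt;P&gt;A side note on the sample above: it writes True with a capital T, which is a Python literal rather than strict JSON. That is why it parses with eval, while json.loads would reject it. Below is a minimal sketch, assuming the file content is already available as a string (sample_text is a hypothetical shortened copy of the sample), that checks this and uses ast.literal_eval as a narrower alternative to eval.&lt;/P&gt;

```python
import ast
import json

# Text shaped like the sample above: Python literal True, not JSON true
sample_text = '{"name": "Agregacao_Gold", "queue": {"enabled": True}}'

# Strict JSON parsing rejects the Python literal
try:
    json.loads(sample_text)
    strict_json_ok = True
except json.JSONDecodeError:
    strict_json_ok = False

# ast.literal_eval accepts Python literals only, without running arbitrary code
job_dict = ast.literal_eval(sample_text)

print(strict_json_ok)
print(type(job_dict).__name__, job_dict["queue"]["enabled"])
```

&lt;P&gt;A file exported from the Workflows UI uses lowercase true, so for that file json.load is the matching parser and literal_eval would fail instead.&lt;/P&gt;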
&lt;P&gt;What I meant is that your job_dict variable may not actually hold your job as a dict, which could be causing the error. If you don't mind, please share the output of printing your job_dict.&lt;/P&gt;</description>
      <pubDate>Fri, 31 May 2024 20:27:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/71284#M34280</guid>
      <dc:creator>mhiltner</dc:creator>
      <dc:date>2024-05-31T20:27:03Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/89657#M37881</link>
      <description>&lt;P&gt;I experimented until I got reasonable results. You have to write Python code that parses the JSON generated by Databricks (Workflows -&amp;gt; View JSON -&amp;gt; Create) and builds a proper SDK call with all the SDK objects in place. The call fails whenever the SDK expects an SDK object but gets a plain dict or string instead. Very sensitive!&lt;/P&gt;&lt;P&gt;I am attaching the complete Python program that reads jobs from a config file and creates them in Databricks using the SDK. 100% working, 100% free &lt;span class="lia-unicode-emoji" title=":grinning_face_with_smiling_eyes:"&gt;😄&lt;/span&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs, iam
from databricks.sdk.service.compute import ClusterSpec, DbfsStorageInfo, ClusterLogConf, WorkspaceStorageInfo, InitScriptInfo, DataSecurityMode, RuntimeEngine, RCranLibrary, MavenLibrary, PythonPyPiLibrary, Library, AzureAttributes, AzureAvailability
import datetime
import argparse
import logging
import json
import sys


# Helper function to create JobCluster
def create_job_cluster(cluster_spec, pool_id, logger):
	logger.info(f"...creating cluster definition for {cluster_spec['job_cluster_key']}")
	job_cluster = jobs.JobCluster(
		job_cluster_key=cluster_spec["job_cluster_key"],
		new_cluster=ClusterSpec(
			# Missing keys default to None so that unset fields are not sent as empty lists
			cluster_name=cluster_spec["new_cluster"].get("cluster_name"),
			spark_version=cluster_spec["new_cluster"].get("spark_version"),
			spark_conf=cluster_spec["new_cluster"].get("spark_conf"),
			azure_attributes=AzureAttributes(
				availability=AzureAvailability(
					cluster_spec["new_cluster"]["azure_attributes"]["availability"]
				) if "availability" in cluster_spec["new_cluster"]["azure_attributes"] else None,
				first_on_demand=cluster_spec["new_cluster"]["azure_attributes"].get("first_on_demand"),
				spot_bid_max_price=cluster_spec["new_cluster"]["azure_attributes"].get("spot_bid_max_price"),
			) if cluster_spec["new_cluster"].get("azure_attributes") else None,
			custom_tags=cluster_spec["new_cluster"].get("custom_tags"),
			cluster_log_conf=ClusterLogConf(
				dbfs=DbfsStorageInfo(
					destination=cluster_spec["new_cluster"]["cluster_log_conf"].get("dbfs", {}).get("destination"),
				)
			) if "cluster_log_conf" in cluster_spec["new_cluster"] else None,
			init_scripts=[
				InitScriptInfo(
					workspace=WorkspaceStorageInfo(
						destination=script["workspace"].get("destination")
					)
				) for script in cluster_spec["new_cluster"].get("init_scripts", [])
			],
			enable_elastic_disk=cluster_spec["new_cluster"].get("enable_elastic_disk", None),
			instance_pool_id=pool_id if "instance_pool_id" in cluster_spec["new_cluster"] else None,
			driver_instance_pool_id=pool_id if "driver_instance_pool_id" in cluster_spec["new_cluster"] else None,
			node_type_id = cluster_spec["new_cluster"].get("node_type_id", None),
			driver_node_type_id = cluster_spec["new_cluster"].get("driver_node_type_id", None),
			#data_security_mode=DataSecurityMode(cluster_spec["new_cluster"].get("data_security_mode", [])),
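			# RuntimeEngine([]) would raise ValueError, so convert only when the key is present
			runtime_engine=RuntimeEngine(cluster_spec["new_cluster"]["runtime_engine"]) if "runtime_engine" in cluster_spec["new_cluster"] else None,
			num_workers=cluster_spec["new_cluster"].get("num_workers"),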
		)
	)

	return job_cluster


# Helper function to create Task
def create_job_task(task_spec, logger):
	logger.info(f"...creating task definition for {task_spec['task_key']}")
	# set the libraries if they are specified
	task_libraries = []
	for lib in task_spec.get("libraries", []):
		if "maven" in lib:
			task_libraries.append(Library(maven=MavenLibrary(**lib["maven"])))
		elif "pypi" in lib:
			task_libraries.append(Library(pypi=PythonPyPiLibrary(package=lib["pypi"]["package"], repo=lib["pypi"].get("repo"))))
		elif "cran" in lib:
			task_libraries.append(Library(cran=RCranLibrary(package=lib["cran"]["package"], repo=lib["cran"].get("repo"))))
		elif "egg" in lib:
			task_libraries.append(Library(egg=lib["egg"]))
		elif "jar" in lib:
			task_libraries.append(Library(jar=lib["jar"]))				
		elif "whl" in lib:
			task_libraries.append(Library(whl=lib["whl"]))		
		elif "requirements" in lib:
			task_libraries.append(Library(requirements=lib["requirements"]))							

	job_task =  jobs.Task(
		task_key=task_spec["task_key"],
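		# RunIf([]) would raise ValueError, so convert only when the key is present
		run_if=jobs.RunIf(task_spec["run_if"]) if "run_if" in task_spec else None,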
		libraries=task_libraries,
		depends_on=[
			jobs.TaskDependency(task_key=dep["task_key"]) for dep in task_spec.get("depends_on", [])
		],
		notebook_task=jobs.NotebookTask(
			notebook_path=task_spec["notebook_task"]["notebook_path"],
			base_parameters=task_spec["notebook_task"].get("base_parameters", {}),
			source=jobs.Source(task_spec["notebook_task"]["source"]),
		),
		job_cluster_key=task_spec.get("job_cluster_key"),
		timeout_seconds=task_spec.get("timeout_seconds"),
		# Task-level notifications use the task-specific class
		email_notifications=jobs.TaskEmailNotifications(
			on_start=task_spec["email_notifications"].get("on_start", []),
			on_success=task_spec["email_notifications"].get("on_success", []),
			on_failure=task_spec["email_notifications"].get("on_failure", [])
		) if "email_notifications" in task_spec else None,
		notification_settings=jobs.TaskNotificationSettings(
			no_alert_for_skipped_runs=task_spec["notification_settings"].get("no_alert_for_skipped_runs", False),
			no_alert_for_canceled_runs=task_spec["notification_settings"].get("no_alert_for_canceled_runs", False),
			alert_on_last_attempt=task_spec["notification_settings"].get("alert_on_last_attempt", False)
		) if "notification_settings" in task_spec else None,
	)

	return job_task


def get_service_principal(logger: logging.Logger, dbk: WorkspaceClient):
	logger.info(f"...getting principal ID")
	principals = dbk.service_principals.list()
	principal_id = None
	for p in principals:
		if p.active:
			principal_id = p.application_id
			break
	return principal_id


def create_job_definition(logger: logging.Logger, dbk: WorkspaceClient, job_dict: dict, pool: str, job_pause_status: str) -&amp;gt; dict:
	"""
	jobs.create() and jobs.reset() do not accept the raw JSON job definition,
	so build a dict of keyword arguments with nested values converted to SDK objects.
	"""	
	definition = job_dict.copy()
	logger.info(f"...creating job definition for {job_dict['name']}")
	# Validate and fetch pool ID
	pool_id = None
	pool_list = dbk.instance_pools.list()
	for p in pool_list:
		if p.instance_pool_name == pool:
			pool_id = p.instance_pool_id
			logger.info(f"...fetched pool_id for '{pool}': {pool_id}")
			break
	if not pool_id :
		logger.info(f"Error fetching pool_id. Pool '{pool}' does not exist")
		raise ValueError(f"Pool '{pool}' does not exist")
	
	# Create job clusters
	job_clusters = [create_job_cluster(cluster, pool_id, logger) for cluster in definition.get("job_clusters", [])]

	# Create tasks
	tasks = [create_job_task(task, logger) for task in definition.get("tasks", [])]

	# Define run_as: either user or service principal fetched for the workspace
	if definition.get("run_as", {}).get("user_name"):
		run_as = jobs.JobRunAs(user_name = definition["run_as"]["user_name"])
	else:
		service_principal_envr = get_service_principal(logger, dbk)
		run_as = jobs.JobRunAs(service_principal_name = service_principal_envr)
	

	# Populate job settings
	definition["job_clusters"] = job_clusters
	definition["tasks"] = tasks
	if "continuous" in definition:
		definition["continuous"] = jobs.Continuous(
			# PauseStatus(None) raises ValueError, so convert only when a pause status is configured
			pause_status=jobs.PauseStatus(job_pause_status) if job_pause_status else None
		)
	definition["email_notifications"] = jobs.JobEmailNotifications(
		on_failure=definition.get("email_notifications", {}).get("on_failure", []),
		on_start=definition.get("email_notifications", {}).get("on_start", []),
		on_success=definition.get("email_notifications", {}).get("on_success", []),
		no_alert_for_skipped_runs=definition.get("email_notifications", {}).get("no_alert_for_skipped_runs", False)
	) if "email_notifications" in definition else None
	definition["notification_settings"] = jobs.JobNotificationSettings(
		no_alert_for_skipped_runs=definition.get("notification_settings", {}).get("no_alert_for_skipped_runs", False),
		no_alert_for_canceled_runs=definition.get("notification_settings", {}).get("no_alert_for_canceled_runs", False),
	) if "notification_settings" in definition else None
	definition["webhook_notifications"] = jobs.WebhookNotifications(
		# These fields take lists of jobs.Webhook objects, not booleans or raw dicts
		on_start=[jobs.Webhook(id=w["id"]) for w in definition.get("webhook_notifications", {}).get("on_start", [])],
		on_failure=[jobs.Webhook(id=w["id"]) for w in definition.get("webhook_notifications", {}).get("on_failure", [])],
		on_success=[jobs.Webhook(id=w["id"]) for w in definition.get("webhook_notifications", {}).get("on_success", [])],
	) if "webhook_notifications" in definition else None
	definition["schedule"] = jobs.CronSchedule(
		quartz_cron_expression=definition.get("schedule", {}).get("quartz_cron_expression", ""),
		timezone_id=definition.get("schedule", {}).get("timezone_id", "UTC"),
		pause_status=jobs.PauseStatus(job_pause_status) if job_pause_status else None
	) if "schedule" in definition else None
	definition["run_as"] = run_as
	definition["queue"] = jobs.QueueSettings(enabled=definition.get("queue", {}).get("enabled", True)) if "queue" in definition else None
	
	
	return definition

def get_existing_job_id(logger: logging.Logger, dbk: WorkspaceClient, job_name: str):
	logger.info(f"Checking the existing jobs with name {job_name}")
	job_list = dbk.jobs.list()
	job_count = 0
	job_id = None
	for j in job_list:
		base_job_name = j.settings.name
		if base_job_name == job_name:
			created = datetime.datetime.fromtimestamp(int(j.created_time)/1000).strftime(format="%d.%m.%Y %H.%M.%S")
			if job_id: 
				logger.info(f"Deleting duplicated job {j.job_id} created {created}")	
				dbk.jobs.delete(j.job_id)
				job_count += 1
			else:
				job_id = j.job_id
				logger.info(f"Found an existing job {j.job_id} created {created}")	
	if job_count &amp;gt; 0:
		logger.info(f"Deleted {job_count} jobs with name '{job_name}'")
		
	return job_id
		

def create_or_reset_job(logger: logging.Logger, dbk: WorkspaceClient, json_path: str, pool: str, pause_status: str) -&amp;gt; str:
	with open(json_path, "r") as file:
		logger.info(f"Loading json: {json_path}...")
		job_json = json.load(file)
		job_name = job_json['name']
		logger.info(f"Processing job {job_name}")
	try:
		job_settings = create_job_definition(logger, dbk, job_json, pool, pause_status)
		job_id = get_existing_job_id(logger, dbk, job_name)
		if job_id:
			logger.info(f"Resetting existing job {job_id}...")	
			job_reset = jobs.JobSettings(**job_settings)
			job = dbk.jobs.reset(job_id=job_id, new_settings=job_reset)	
			logger.info(f"Job {job_name} reset.")
		else:
			logger.info(f"Job '{job_name}' doesn't exist. Creating a new job...")
			job = dbk.jobs.create(**job_settings)    
			logger.info(f"Job created. job_id: {job.job_id}")
					
		return "ok"
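	except Exception as e:
		# Log the failure instead of silently swallowing it, so the cause is visible
		logger.error(f"Error creating job '{job_name}': {e}")
		return "error"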

def main(args):
	
	# logging setup
	logger = logging.getLogger('azure.mgmt.resource')
	logger.setLevel(logging.INFO)
	handler = logging.StreamHandler(stream=sys.stdout)
	logger.addHandler(handler)
	
	# connect to workspace
	dbk = WorkspaceClient(host=args.host, token=args.token, auth_type="pat")

	# load Job definitions file
	with open("config_jobs.json") as config_file:
		config = json.load(config_file)
	
	# for each job in environment, attempt to create or reset
	if args.environment is not None:
		ok = 0
		error = 0
		for job in config[args.environment]: # take all jobs from config file filtered by envr
			if "pause_status" in job:
				pause_status = job["pause_status"]
			else:
				pause_status = None			
			status = create_or_reset_job(logger, dbk, job["path"], job["pool"], pause_status)	
			logger.info(f"STATUS: {status}")

			if status == "error":
				error+=1
			else:
				ok +=1
		logger.info(f"JOBS OK: {ok}, JOBS ERROR: {error}")
		if error &amp;gt; 0:	
			raise RuntimeError("Error while creating jobs.")


if __name__ == "__main__":
	parser = argparse.ArgumentParser()
	parser.add_argument("--host", help="Databricks host")
	parser.add_argument("--token", help="Databricks token")
	parser.add_argument("--environment", help="Used to switch cluster configs to loop over")
	parser.add_argument("--force", action= argparse.BooleanOptionalAction, help="Force job creation")
	args = parser.parse_args()
	main(args)&lt;/LI-CODE&gt;</description>
      <pubDate>Thu, 12 Sep 2024 15:43:38 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/89657#M37881</guid>
      <dc:creator>pavel_merkle</dc:creator>
      <dc:date>2024-09-12T15:43:38Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/119962#M46007</link>
      <description>&lt;P&gt;It may work, but it defeats the purpose: at this point it is much easier to call the Databricks API directly, since you can pass it the JSON as-is. The entire point of the SDK is to make the API simpler to use, not to require a few hundred lines of additional code to make it work.&lt;/P&gt;</description>
      <pubDate>Thu, 22 May 2025 12:03:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/119962#M46007</guid>
      <dc:creator>mike933</dc:creator>
      <dc:date>2025-05-22T12:03:51Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks SDK - create new job using JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/119970#M46010</link>
      <description>&lt;P&gt;This is probably the easiest way to create a job from JSON:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import json
import os

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import CreateJob

# WORKSPACE_DICT and WORKSPACE_NAME are your own config: they map a workspace
# to its host and to the name of the environment variable holding its token
client = WorkspaceClient(
    host=WORKSPACE_DICT[WORKSPACE_NAME]["host_name"], token=os.getenv(WORKSPACE_DICT[WORKSPACE_NAME]["token_name"])
)

with open("test_job.json") as f:
    job_spec = json.load(f)

job_object = CreateJob.from_dict(job_spec)
new_job = client.jobs.create(**job_object.as_shallow_dict())&lt;/LI-CODE&gt;</description>
      <pubDate>Thu, 22 May 2025 13:27:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databrikcs-sdk-create-new-job-using-json/m-p/119970#M46010</guid>
      <dc:creator>mike933</dc:creator>
      <dc:date>2025-05-22T13:27:15Z</dc:date>
    </item>
  </channel>
</rss>

