Create a one-off job run using the Databricks SDK
02-07-2025 01:44 AM
I'm trying to build the job spec using objects. When I try to execute the job I get the following error.
I'm somewhat new to Python and not sure what I'm doing wrong here. Is anyone able to help?
Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 220, in <module>
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8110, in submit
    body['access_control_list'] = [v.as_dict() for v in access_control_list]
                                   ^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 1844, in as_dict
    if self.permission_level is not None: body['permission_level'] = self.permission_level.value
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'value'
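In case it helps narrow things down, the snippet below seems to reproduce the same AttributeError on its own, with permission_level passed as a plain string the same way my CSV parsing does (the e-mail address here is just a placeholder):

from databricks.sdk.service import jobs

# Isolated repro attempt: permission_level is a plain string here, and
# as_dict() then calls .value on it, which is where the traceback above fails.
acl = jobs.JobAccessControlRequest(user_name="someone@example.com",
                                   permission_level="CAN_MANAGE")
acl.as_dict()  # AttributeError: 'str' object has no attribute 'value'

The full script: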
import datetime
import logging
import sys
import csv
from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient
# the vars will be parameters for the script
# job settings
aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
'''
# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None
# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"
counter = 0
# callback that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
        statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
        logging.info(f'workflow intermediate status: {", ".join(statuses)}')
# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
):
    cluster_spec = compute.ClusterSpec()
    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = data_security_mode
    cluster_spec.spark_version = spark_version
    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers > min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes
    return cluster_spec
def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
):
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    notebook_task.source = source
    return notebook_task
def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
):
    task = jobs.Task(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    return task
def create_access_control_list(aclstring:str):
    access_controls = []
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        if acl['usertypename'] == 'user_name':
            access_controls.append(jobs.JobAccessControlRequest(user_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'group_name':
            access_controls.append(jobs.JobAccessControlRequest(group_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'service_principal_name':
            access_controls.append(jobs.JobAccessControlRequest(service_principal_name=acl['value'], permission_level=acl['permissionlevel']))
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
    return access_controls
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )
    w = WorkspaceClient()
    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))
    # names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
    )
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
    )
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
    )
    access_control_list = create_access_control_list(aclstring)
    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    logging.info(f'starting to poll: {waiter.run_id}')
    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                        callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')
2 REPLIES
02-07-2025 05:09 AM
Hi @BriGuy,
Can you try importing this module first?
from databricks.sdk.service.jobs import PermissionLevel
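Then the ACL entries can be built with the enum member rather than the raw CSV string, since as_dict() calls .value on permission_level. A minimal sketch (depending on the SDK version the enum may be exposed as PermissionLevel or JobPermissionLevel; the e-mail address is a placeholder):

from databricks.sdk.service import jobs

# Use the enum member instead of the string "CAN_MANAGE", so that
# as_dict() can call .value on it when the request is serialized.
acl = jobs.JobAccessControlRequest(
    user_name="someone@example.com",
    permission_level=jobs.JobPermissionLevel.CAN_MANAGE,
)

# Mapping a CSV value such as "CAN_MANAGE" onto the enum could look like:
# permission_level=jobs.JobPermissionLevel[row['permissionlevel']]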
02-07-2025 06:46 AM
I've updated the code as follows, trying to make sure I use the correct enums.
import datetime
import logging
import sys
import csv
from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient
# the vars will be parameters for the script
# job settings
aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
group_name,USRS-BusinessIntelligenceSystems,CAN_VIEW
service_principal_name,e28796de-c433-4fd6-85e6-c06b7f9d7e02,CAN_MANAGE
'''
# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None
# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"
counter = 0
# callback that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
        statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
        logging.info(f'workflow intermediate status: {", ".join(statuses)}')
# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
):
    cluster_spec = compute.ClusterSpec()
    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = compute.DataSecurityMode(data_security_mode)
    cluster_spec.spark_version = spark_version
    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers > min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes
    return cluster_spec
def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
):
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    return notebook_task
def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
) -> jobs.SubmitTask:
    task = jobs.SubmitTask(task_key=task_key,
                           notebook_task=notebook_task,
                           new_cluster=new_cluster)
    return task
def create_access_control_list(aclstring:str):
    access_controls = []
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        request = jobs.JobAccessControlRequest()
        if acl['usertypename'] == 'user_name':
            request.user_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'group_name':
            request.group_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'service_principal_name':
            request.service_principal_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
        access_controls.append(request)
    return access_controls
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )
    w = WorkspaceClient()
    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))
    # names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
    )
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
    )
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
    )
    access_control_list = create_access_control_list(aclstring)
    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    logging.info(f'starting to poll: {waiter.run_id}')
    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                        callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')
test = jobs.JobAccessControlRequest(user_name="test", permission_level="CAN_MANAGE")
Unfortunately I now get the error below, and there isn't much detail to determine the issue.
2025-02-07 14:13:46,145 [databricks.sdk][INFO] loading DEFAULT profile from ~/.databrickscfg: host, client_id, client_secret
Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 224, in <module>
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8126, in submit
    op_response = self._api.do('POST', '/api/2.1/jobs/runs/submit', body=body, headers=headers)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\core.py", line 77, in do
    return self._api_client.do(method=method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 186, in do
    response = call(method,
               ^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 55, in wrapper
    raise err
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 34, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 278, in _perform
    raise error from None
databricks.sdk.errors.platform.InternalError: None
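For what it's worth, one way to get more detail than InternalError: None is to turn on the SDK's DEBUG logging, which prints the serialized request body sent to /api/2.1/jobs/runs/submit along with the raw response. A sketch (the debug_truncate_bytes setting is an assumption about the available config options and is only there to keep long bodies readable):

import logging
import sys

from databricks.sdk import WorkspaceClient

# At DEBUG level the SDK logs each HTTP request and response it makes,
# including the JSON body of the runs/submit call that is being rejected.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger("databricks.sdk").setLevel(logging.DEBUG)

w = WorkspaceClient(debug_truncate_bytes=4096)  # assumption: raises the log truncation limit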