02-07-2025 01:44 AM
I'm trying to build the job spec using objects. When I try to submit the job, I get the following error.
I'm somewhat new to Python and not sure what I'm doing wrong here. Is anyone able to help?
Traceback (most recent call last):
File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 220, in <module>
waiter = w.jobs.submit(run_name=run_name,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8110, in submit
body['access_control_list'] = [v.as_dict() for v in access_control_list]
^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 1844, in as_dict
if self.permission_level is not None: body['permission_level'] = self.permission_level.value
^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'value'
import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient

# the vars will be parameters for the script
# job settings
aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0


# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
        statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
        logging.info(f'workflow intermediate status: {", ".join(statuses)}')


# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers: int,
        node_type_id: str,
        spark_version: str,
        driver_node_type_id: str = None,
        instance_pool_id: str = None,
        driver_instance_pool_id: str = None,
        data_security_mode: str = 'DATA_SECURITY_MODE_STANDARD',
        cluster_name: str = None,
        autoscale: bool = True,
        min_workers: int = None,
        max_workers: int = None,
        spark_conf: dict = None,
        custom_tags: dict = None,
        init_scripts: list = None,
        spark_env_vars: dict = None,
        policy_id: str = None,
        apply_policy_default_values: bool = False,
        azure_attributes: dict = None
):
    cluster_spec = compute.ClusterSpec()
    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = data_security_mode
    cluster_spec.spark_version = spark_version
    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers > min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes
    return cluster_spec


def create_notebook_task(
        notebookpath: str,
        base_parameters: dict = None,
        source: str = "WORKSPACE"
):
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    notebook_task.source = source
    return notebook_task


def create_task(
        task_key: str,
        notebook_task: jobs.NotebookTask,
        new_cluster: compute.ClusterSpec
):
    task = jobs.Task(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    return task


def create_access_control_list(aclstring: str):
    access_controls = []
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        if acl['usertypename'] == 'user_name':
            access_controls.append(jobs.JobAccessControlRequest(user_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'group_name':
            access_controls.append(jobs.JobAccessControlRequest(group_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'service_principal_name':
            access_controls.append(jobs.JobAccessControlRequest(service_principal_name=acl['value'], permission_level=acl['permissionlevel']))
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
    return access_controls


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )
    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")

    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    # names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
    )
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
    )
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
    )
    access_control_list = create_access_control_list(aclstring)

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    logging.info(f'starting to poll: {waiter.run_id}')
    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                        callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')
02-07-2025 05:09 AM
Hi @BriGuy,
Can you try importing this module first?
from databricks.sdk.service.jobs import PermissionLevel
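For context, the AttributeError comes from passing the plain string "CAN_MANAGE" as permission_level: as_dict() then calls .value on it, and only the enum member has that attribute. A minimal sketch of the enum-based construction (assuming the jobs module exposes the JobPermissionLevel enum, which is what the updated code below uses):

# Hedged sketch: build the ACL entry with an enum member rather than a raw string,
# so that JobAccessControlRequest.as_dict() can read .value on permission_level.
from databricks.sdk.service import jobs

acl_entry = jobs.JobAccessControlRequest(
    user_name="brian_hartman@next.co.uk",
    permission_level=jobs.JobPermissionLevel.CAN_MANAGE,  # enum member, not "CAN_MANAGE"
)
print(acl_entry.as_dict())  # serialises without the 'str' object has no attribute 'value' error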
02-07-2025 06:46 AM
I've updated the code as follows, trying to make sure I use the correct enums.
import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient

# the vars will be parameters for the script
# job settings
aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
group_name,USRS-BusinessIntelligenceSystems,CAN_VIEW
service_principal_name,e28796de-c433-4fd6-85e6-c06b7f9d7e02,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0


# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
        statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
        logging.info(f'workflow intermediate status: {", ".join(statuses)}')


# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers: int,
        node_type_id: str,
        spark_version: str,
        driver_node_type_id: str = None,
        instance_pool_id: str = None,
        driver_instance_pool_id: str = None,
        data_security_mode: str = 'DATA_SECURITY_MODE_STANDARD',
        cluster_name: str = None,
        autoscale: bool = True,
        min_workers: int = None,
        max_workers: int = None,
        spark_conf: dict = None,
        custom_tags: dict = None,
        init_scripts: list = None,
        spark_env_vars: dict = None,
        policy_id: str = None,
        apply_policy_default_values: bool = False,
        azure_attributes: dict = None
):
    cluster_spec = compute.ClusterSpec()
    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = compute.DataSecurityMode(data_security_mode)
    cluster_spec.spark_version = spark_version
    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers > min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes
    return cluster_spec


def create_notebook_task(
        notebookpath: str,
        base_parameters: dict = None,
        source: str = "WORKSPACE"
):
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    return notebook_task


def create_task(
        task_key: str,
        notebook_task: jobs.NotebookTask,
        new_cluster: compute.ClusterSpec
) -> jobs.SubmitTask:
    task = jobs.SubmitTask(task_key=task_key,
                           notebook_task=notebook_task,
                           new_cluster=new_cluster)
    return task


def create_access_control_list(aclstring: str):
    access_controls = []
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        request = jobs.JobAccessControlRequest()
        if acl['usertypename'] == 'user_name':
            request.user_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'group_name':
            request.group_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'service_principal_name':
            request.service_principal_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
        access_controls.append(request)
    return access_controls


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )
    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")

    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    # names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
    )
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
    )
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
    )
    access_control_list = create_access_control_list(aclstring)

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    logging.info(f'starting to poll: {waiter.run_id}')
    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                        callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')

test = jobs.JobAccessControlRequest(user_name="test", permission_level="CAN_MANAGE")
Unfortunately I now get the error below, and there isn't much detail to determine the issue.
2025-02-07 14:13:46,145 [databricks.sdk][INFO] loading DEFAULT profile from ~/.databrickscfg: host, client_id, client_secret
Traceback (most recent call last):
File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 224, in <module>
waiter = w.jobs.submit(run_name=run_name,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8126, in submit
op_response = self._api.do('POST', '/api/2.1/jobs/runs/submit', body=body, headers=headers)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\core.py", line 77, in do
return self._api_client.do(method=method,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 186, in do
response = call(method,
^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 55, in wrapper
raise err
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 34, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 278, in _perform
raise error from None
databricks.sdk.errors.platform.InternalError: None
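One way to get more detail on the bare InternalError is to turn up the SDK's logging before calling submit; the client logs its HTTP traffic on the 'databricks.sdk' logger at DEBUG level, which should show the JSON body that was sent to /api/2.1/jobs/runs/submit. A minimal sketch (standard library logging only; the logger name is an assumption based on how the SDK logs above):

import logging
import sys

# Hedged sketch: raise the log level so the databricks-sdk client prints the
# request/response pair behind the "InternalError: None" message.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger('databricks.sdk').setLevel(logging.DEBUG)

Comparing the logged request body against the runs/submit payload of a run created from the UI is usually the quickest way to spot which field the backend is rejecting.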