Data Engineering

Create a one-off job run using the Databricks SDK

BriGuy
New Contributor II

I'm trying to build the job spec using objects. When I try to submit the job, I get the following error.

I'm somewhat new to Python and not sure what I'm doing wrong here. Is anyone able to help?

Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 220, in <module>
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8110, in submit
    body['access_control_list'] = [v.as_dict() for v in access_control_list]
                                   ^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 1844, in as_dict
    if self.permission_level is not None: body['permission_level'] = self.permission_level.value
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'value'
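
The traceback seems to point at the ACL entry being serialised. A minimal repro sketch (assuming the same SDK version as in the traceback; the user name is just a placeholder):

from databricks.sdk.service import jobs

# Passing the permission level as a plain string means as_dict() ends up calling
# .value on a str, which is the AttributeError above.
acl = jobs.JobAccessControlRequest(user_name="someone@example.com",
                                   permission_level="CAN_MANAGE")
acl.as_dict()  # AttributeError: 'str' object has no attribute 'value'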

 

 

 

import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient


# the vars will be parameters for the script
# job settings

aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0
# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')



# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
        ):
    
    cluster_spec = compute.ClusterSpec()

    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = data_security_mode
    cluster_spec.spark_version = spark_version

    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        # ensure the autoscale range is valid (min <= max)
        if min_workers > max_workers:
            max_workers = min_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes

    return cluster_spec

def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
        ):
    
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    notebook_task.source = source

    return notebook_task

def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
        ):
    
    task = jobs.Task(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    
    return task

def create_access_control_list(aclstring:str):
    access_controls = []
    
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        if acl['usertypename'] == 'user_name':
            access_controls.append(jobs.JobAccessControlRequest(user_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'group_name':
            access_controls.append(jobs.JobAccessControlRequest(group_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'service_principal_name':
            access_controls.append(jobs.JobAccessControlRequest(service_principal_name=acl['value'], permission_level=acl['permissionlevel']))
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
    return access_controls



if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )

    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    #names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
       

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
        )
    
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
        )
    
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
        )

    access_control_list = create_access_control_list(aclstring)
    
    

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    
    logging.info(f'starting to poll: {waiter.run_id}')

    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')

 

2 REPLIES

Alberto_Umana
Databricks Employee

Hi @BriGuy,

Can you try importing the permission-level enum first?

from databricks.sdk.service.jobs import JobPermissionLevel
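
For example, a single ACL entry built with the enum member instead of the raw CSV string (a minimal sketch; the lookup by name assumes the values in the CSV match the enum member names exactly):

from databricks.sdk.service import jobs

# Look the member up by name: "CAN_MANAGE" -> JobPermissionLevel.CAN_MANAGE.
acl = jobs.JobAccessControlRequest(
    user_name="brian_hartman@next.co.uk",
    permission_level=jobs.JobPermissionLevel["CAN_MANAGE"],
)
print(acl.as_dict())  # serialises cleanly once permission_level is an enum member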

I've updated the code as follows, trying to make sure the correct enums are used.

import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient


# the vars will be parameters for the script
# job settings

aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
group_name,USRS-BusinessIntelligenceSystems,CAN_VIEW
service_principal_name,e28796de-c433-4fd6-85e6-c06b7f9d7e02,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0
# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')



# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
        ):
    
    cluster_spec = compute.ClusterSpec()

    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = compute.DataSecurityMode(data_security_mode)
    cluster_spec.spark_version = spark_version

    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        # ensure the autoscale range is valid (min <= max)
        if min_workers > max_workers:
            max_workers = min_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes

    return cluster_spec

def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
        ):
    
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters

    return notebook_task

def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
        ) -> jobs.SubmitTask:
    
    task = jobs.SubmitTask(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    
    return task

def create_access_control_list(aclstring:str):
    access_controls = []
    
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        request = jobs.JobAccessControlRequest()
        if acl['usertypename'] == 'user_name':
            request.user_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]            
        elif acl['usertypename'] == 'group_name':
            request.group_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'service_principal_name':
            request.service_principal_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
        access_controls.append(request)
    return access_controls



if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )

    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    #names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
       

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
        )
    
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
        )
    
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
        )

    access_control_list = create_access_control_list(aclstring)
    
    

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    
    logging.info(f'starting to poll: {waiter.run_id}')

    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')


    test = jobs.JobAccessControlRequest(user_name="test", permission_level="CAN_MANAGE")


    

Unfortunately I now get the error below, and there isn't much detail to determine the issue.

 

2025-02-07 14:13:46,145 [databricks.sdk][INFO] loading DEFAULT profile from ~/.databrickscfg: host, client_id, client_secret
Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 224, in <module>
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8126, in submit
    op_response = self._api.do('POST', '/api/2.1/jobs/runs/submit', body=body, headers=headers)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\core.py", line 77, in do
    return self._api_client.do(method=method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 186, in do
    response = call(method,
               ^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 55, in wrapper
    raise err
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 34, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 278, in _perform
    raise error from None
databricks.sdk.errors.platform.InternalError: None
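
To get more detail than the opaque InternalError: None, one option is to turn on the SDK's debug logging so the outgoing request and the error response are printed (a sketch using the standard logging setup already in the script; nothing here is specific to this job):

import logging
import sys

from databricks.sdk import WorkspaceClient

# At DEBUG level the 'databricks.sdk' logger prints each HTTP request and
# response (truncated), including the /api/2.1/jobs/runs/submit payload that fails here.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger("databricks.sdk").setLevel(logging.DEBUG)

w = WorkspaceClient()  # still picks up the DEFAULT profile from ~/.databrickscfg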
