<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: create a one off job run using databricks SDK. in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109398#M43299</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/93881"&gt;@BriGuy&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Can you try importing this module first?&lt;/P&gt;
&lt;P&gt;from databricks.sdk.service.jobs import PermissionLevel&lt;/P&gt;</description>
    <pubDate>Fri, 07 Feb 2025 13:09:45 GMT</pubDate>
    <dc:creator>Alberto_Umana</dc:creator>
    <dc:date>2025-02-07T13:09:45Z</dc:date>
    <item>
      <title>create a one off job run using databricks SDK.</title>
      <link>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109372#M43291</link>
      <description>&lt;P&gt;I'm trying to build the job spec using objects.&amp;nbsp; When I try to call execute the job I get the following error.&lt;/P&gt;&lt;P&gt;I'm somewhat new to python and not sure what I'm doing wrong here.&amp;nbsp; Is anyone able to help?&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 220, in &amp;lt;module&amp;gt;
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8110, in submit
    body['access_control_list'] = [v.as_dict() for v in access_control_list]
                                   ^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 1844, in as_dict
    if self.permission_level is not None: body['permission_level'] = self.permission_level.value
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'value'&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient


# the vars will be parameters for the script
# job settings

aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0
# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')



# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
        ):
    
    cluster_spec = compute.ClusterSpec()

    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = data_security_mode
    cluster_spec.spark_version = spark_version

    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers &amp;gt; 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers &amp;gt; min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes

    return cluster_spec

def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
        ):
    
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters
    notebook_task.source = source

    return notebook_task

def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
        ):
    
    task = jobs.Task(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    
    return task

def create_access_control_list(aclstring:str):
    access_controls = []
    
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        if acl['usertypename'] == 'user_name':
            access_controls.append(jobs.JobAccessControlRequest(user_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'group_name':
            access_controls.append(jobs.JobAccessControlRequest(group_name=acl['value'], permission_level=acl['permissionlevel']))
        elif acl['usertypename'] == 'service_principal_name':
            access_controls.append(jobs.JobAccessControlRequest(service_principal_name=acl['value'], permission_level=acl['permissionlevel']))
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
    return access_controls



if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )

    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    #names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
       

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
        )
    
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
        )
    
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
        )

    access_control_list = create_access_control_list(aclstring)
    
    

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    
    logging.info(f'starting to poll: {waiter.run_id}')

    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 07 Feb 2025 09:44:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109372#M43291</guid>
      <dc:creator>BriGuy</dc:creator>
      <dc:date>2025-02-07T09:44:07Z</dc:date>
    </item>
    <item>
      <title>Re: create a one off job run using databricks SDK.</title>
      <link>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109398#M43299</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/93881"&gt;@BriGuy&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Can you try importing this module first?&lt;/P&gt;
&lt;P&gt;from databricks.sdk.service.jobs import PermissionLevel&lt;/P&gt;</description>
      <pubDate>Fri, 07 Feb 2025 13:09:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109398#M43299</guid>
      <dc:creator>Alberto_Umana</dc:creator>
      <dc:date>2025-02-07T13:09:45Z</dc:date>
    </item>
    <item>
      <title>Re: create a one off job run using databricks SDK.</title>
      <link>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109416#M43311</link>
      <description>&lt;P&gt;I've updated the code as follows trying to make sure to use correct enums.&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient


# the vars will be parameters for the script
# job settings

aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
group_name,USRS-BusinessIntelligenceSystems,CAN_VIEW
service_principal_name,e28796de-c433-4fd6-85e6-c06b7f9d7e02,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0
# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')



# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
        ):
    
    cluster_spec = compute.ClusterSpec()

    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = compute.DataSecurityMode(data_security_mode)
    cluster_spec.spark_version = spark_version

    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers &amp;gt; 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers &amp;gt; min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes

    return cluster_spec

def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
        ):
    
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters

    return notebook_task

def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
        ) -&amp;gt; jobs.SubmitTask:
    
    task = jobs.SubmitTask(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    
    return task

def create_access_control_list(aclstring:str):
    access_controls = []
    
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        request = jobs.JobAccessControlRequest()
        if acl['usertypename'] == 'user_name':
            request.user_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]            
        elif acl['usertypename'] == 'group_name':
            request.group_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'service_principal_name':
            request.service_principal_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
        access_controls.append(request)
    return access_controls



if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )

    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    #names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
       

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
        )
    
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
        )
    
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
        )

    access_control_list = create_access_control_list(aclstring)
    
    

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    
    logging.info(f'starting to poll: {waiter.run_id}')

    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')


    test = jobs.JobAccessControlRequest(user_name="test", permission_level="CAN_MANAGE")


    &lt;/LI-CODE&gt;&lt;P&gt;Unfortunately I now get these errors and there isn't much detail to determine the issue.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;2025-02-07 14:13:46,145 [databricks.sdk][INFO] loading DEFAULT profile from ~/.databrickscfg: host, client_id, client_secret
Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 224, in &amp;lt;module&amp;gt;
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8126, in submit
    op_response = self._api.do('POST', '/api/2.1/jobs/runs/submit', body=body, headers=headers)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\core.py", line 77, in do
    return self._api_client.do(method=method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 186, in do
    response = call(method,
               ^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 55, in wrapper
    raise err
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 34, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 278, in _perform
    raise error from None
databricks.sdk.errors.platform.InternalError: None&lt;/LI-CODE&gt;</description>
      <pubDate>Fri, 07 Feb 2025 14:46:58 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/create-a-one-off-job-run-using-databricks-sdk/m-p/109416#M43311</guid>
      <dc:creator>BriGuy</dc:creator>
      <dc:date>2025-02-07T14:46:58Z</dc:date>
    </item>
  </channel>
</rss>

