I've updated the code as follows trying to make sure to use correct enums.

import datetime
import logging
import sys
import csv

from databricks.sdk.service import compute, jobs
from databricks.sdk import WorkspaceClient


# the vars will be parameters for the script
# job settings

aclstring = '''\
usertypename,value,permissionlevel
user_name,brian_hartman@next.co.uk,CAN_MANAGE
group_name,USRS-BusinessIntelligenceSystems,CAN_VIEW
service_principal_name,e28796de-c433-4fd6-85e6-c06b7f9d7e02,CAN_MANAGE
'''

# cluster settings
num_workers = 2
node_type_id = "Standard_E4ds_v5"
spark_version = "LatestLTS"
driver_node_type_id = "Standard_D4ds_v5"
driver_instance_pool_id = None
data_security_mode = 'DATA_SECURITY_MODE_STANDARD'
instance_pool_id = None
autoscale = True
min_workers = 1
max_workers = 2
spark_conf = "spark.databricks.sql.initial.catalog.name: businessintelligencesystems_dev, PYSPARK_PYTHON: /databricks/python3/bin/python3"
custom_tags = "businessarea: BIS, squad: mysquad"
init_scripts = None
spark_env_vars = None

# task settings
notebookpath = "/Workspace/Users/brian_hartman@next.co.uk/jobtestbrh"
base_parameters = "Age: 44, Name: Brian"

counter = 0
# callback, that receives a polled entity between state updates
def print_status(run: jobs.Run):
    global counter
    counter += 1
    if counter % 10 == 0 or counter == 1:
        logging.info(f'{run.run_page_url}')
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')



# Define a function to create a cluster spec
def create_cluster_spec(
        num_workers:int,
        node_type_id:str,
        spark_version:str,
        driver_node_type_id:str=None,
        instance_pool_id:str=None,
        driver_instance_pool_id:str=None,
        data_security_mode:str='DATA_SECURITY_MODE_STANDARD',
        cluster_name:str=None,
        autoscale:bool=True,
        min_workers:int=None,
        max_workers:int=None,
        spark_conf:dict=None,
        custom_tags:dict=None,
        init_scripts:list=None,
        spark_env_vars:dict=None,
        policy_id:str=None,
        apply_policy_default_values:bool=False,
        azure_attributes:dict=None
        ):
    
    cluster_spec = compute.ClusterSpec()

    cluster_spec.num_workers = num_workers
    cluster_spec.node_type_id = node_type_id
    cluster_spec.data_security_mode = compute.DataSecurityMode(data_security_mode)
    cluster_spec.spark_version = spark_version

    if cluster_name:
        cluster_spec.cluster_name = cluster_name
    if driver_node_type_id:
        cluster_spec.driver_node_type_id = driver_node_type_id
    if autoscale:
        if min_workers is None:
            if num_workers > 0:
                min_workers = 1
        if max_workers is None:
            max_workers = num_workers
        if max_workers > min_workers:
            min_workers = max_workers
        cluster_spec.autoscale = compute.AutoScale(min_workers, max_workers)
    if spark_conf:
        cluster_spec.spark_conf = spark_conf
    if custom_tags:
        cluster_spec.custom_tags = custom_tags
    if init_scripts:
        cluster_spec.init_scripts = init_scripts
    if spark_env_vars:
        cluster_spec.spark_env_vars = spark_env_vars
    if instance_pool_id:
        cluster_spec.instance_pool_id = instance_pool_id
    if driver_instance_pool_id:
        cluster_spec.driver_instance_pool_id = driver_instance_pool_id
    if policy_id:
        cluster_spec.policy_id = policy_id
        cluster_spec.apply_policy_default_values = apply_policy_default_values
    if azure_attributes:
        cluster_spec.azure_attributes = azure_attributes

    return cluster_spec

def create_notebook_task(
        notebookpath:str,
        base_parameters:dict=None,
        source:str="WORKSPACE"
        ):
    
    notebook_task = jobs.NotebookTask(notebook_path=notebookpath)
    if base_parameters:
        notebook_task.base_parameters = base_parameters

    return notebook_task

def create_task(
        task_key:str,
        notebook_task:jobs.NotebookTask,
        new_cluster:compute.ClusterSpec
        ) -> jobs.SubmitTask:
    
    task = jobs.SubmitTask(task_key=task_key,
                     notebook_task=notebook_task,
                     new_cluster=new_cluster)
    
    return task

def create_access_control_list(aclstring:str):
    access_controls = []
    
    acls = csv.DictReader(aclstring.splitlines(), delimiter=',')
    for acl in acls:
        request = jobs.JobAccessControlRequest()
        if acl['usertypename'] == 'user_name':
            request.user_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]            
        elif acl['usertypename'] == 'group_name':
            request.group_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        elif acl['usertypename'] == 'service_principal_name':
            request.service_principal_name = acl['value']
            request.permission_level = jobs.JobPermissionLevel[acl['permissionlevel']]
        else:
            raise ValueError(f"Unknown usertypename: {acl['usertypename']}")
            exit(1)
        access_controls.append(request)
    return access_controls



if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,
                        format="%(asctime)s [%(name)s][%(levelname)s] %(message)s",
                        )

    w = WorkspaceClient()

    # create time var
    now = datetime.datetime.now()
    timestr = now.strftime("%Y%m%d-%H%M%S")
    # create dictionaries/objects as required
    spark_conf_dict = dict(conf.split(": ") for conf in spark_conf.split(", "))
    custom_tags_dict = dict(tag.split(": ") for tag in custom_tags.split(", "))
    base_parameters_dict = dict(param.split(": ") for param in base_parameters.split(", "))

    #names
    notebookname = notebookpath.split("/")[-1]
    cluster_name = f"jobcluster-{notebookname}-{timestr}"
    run_name = f"{notebookname}-{timestr}"
    

    if spark_version == "LatestLTS":
        spark_version = w.clusters.select_spark_version(latest=True, long_term_support=True)
    else:
        spark_version = w.clusters.select_spark_version(version=spark_version)
       

    cluster_spec = create_cluster_spec(
        num_workers=num_workers,
        node_type_id=node_type_id,
        spark_version=spark_version,
        driver_node_type_id=driver_node_type_id,
        instance_pool_id=instance_pool_id,
        driver_instance_pool_id=driver_instance_pool_id,
        data_security_mode=data_security_mode,
        cluster_name=cluster_name,
        autoscale=autoscale,
        min_workers=min_workers,
        max_workers=max_workers,
        spark_conf=spark_conf_dict,
        custom_tags=custom_tags_dict,
        init_scripts=init_scripts,
        spark_env_vars=spark_env_vars,
        )
    
    notebook_task = create_notebook_task(
        notebookpath=notebookpath,
        base_parameters=base_parameters_dict
        )
    
    task = create_task(
        task_key="notebook_task",
        notebook_task=notebook_task,
        new_cluster=cluster_spec
        )

    access_control_list = create_access_control_list(aclstring)
    
    

    waiter = w.jobs.submit(run_name=run_name,
                           tasks=[task],
                           access_control_list=access_control_list
                           )
    
    logging.info(f'starting to poll: {waiter.run_id}')

    run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
    logging.info(f'job finished: {run.run_page_url}')


    test = jobs.JobAccessControlRequest(user_name="test", permission_level="CAN_MANAGE")


    

Unfortunately I now get these errors and there isn't much detail to determine the issue.

 

2025-02-07 14:13:46,145 [databricks.sdk][INFO] loading DEFAULT profile from ~/.databrickscfg: host, client_id, client_secret
Traceback (most recent call last):
  File "y:\My Drive\Source\TempDev\ABtoDatabricks\startjob.py", line 224, in <module>
    waiter = w.jobs.submit(run_name=run_name,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\service\jobs.py", line 8126, in submit
    op_response = self._api.do('POST', '/api/2.1/jobs/runs/submit', body=body, headers=headers)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\core.py", line 77, in do
    return self._api_client.do(method=method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 186, in do
    response = call(method,
               ^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 55, in wrapper
    raise err
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\retries.py", line 34, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\BHARTMAN\AppData\Local\Programs\Python\Python312\Lib\site-packages\databricks\sdk\_base_client.py", line 278, in _perform
    raise error from None
databricks.sdk.errors.platform.InternalError: None