cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Create a Job via SKD with JobSettings Object

dbx-user7354
New Contributor III

Hey, I want to create a Job via the Python SDK with a JobSettings object.

import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'

cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name":f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                existing_cluster_id=cluster_id,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                task_key="test",
                timeout_seconds=0)],
    'email_notifications': jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True, 
        on_failure= ['some@email.com']
        ),
}
settings = JobSettings(
    **params
)
created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not

the last create call leads to this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File <command-884331343802555>, line 48
     33 params = {
     34     "name":f'sdk-{time.time_ns()}',
     35     "tasks": [jobs.Task(description="test",
   (...)
     43         ),
     44 }
     45 settings = JobSettings(
     46     **params
     47 )
---> 48 created_job = w.jobs.create(**settings.as_dict())
     49 # cleanup
     50 #w.jobs.delete(job_id=created_job.job_id)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:4739, in JobsAPI.create(self, access_control_list, compute, continuous, deployment, description, edit_mode, email_notifications, format, git_source, health, job_clusters, max_concurrent_runs, name, notification_settings, parameters, queue, run_as, schedule, tags, tasks, timeout_seconds, trigger, webhook_notifications)
   4737 if description is not None: body['description'] = description
   4738 if edit_mode is not None: body['edit_mode'] = edit_mode.value
-> 4739 if email_notifications is not None: body['email_notifications'] = email_notifications.as_dict()
   4740 if format is not None: body['format'] = format.value
   4741 if git_source is not None: body['git_source'] = git_source.as_dict()

AttributeError: 'dict' object has no attribute 'as_dict'

 Do I do something wrong or is this a bug?

3 REPLIES 3

dbx-user7354
New Contributor III

I also tried to first create the job and then reset the settings. But this did not work either.

My Code:

created_job = w.jobs.create(name=f'sdk-{time.time_ns()}',
                            tasks=[
                                jobs.Task(description="test",
                                          existing_cluster_id=cluster_id,
                                          notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                                          task_key="test",
                                          timeout_seconds=0)
                            ],
                            )



params = {
    "name":f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                existing_cluster_id=cluster_id,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                task_key="test",
                timeout_seconds=0)],
    'email_notifications': jobs.JobEmailNotifications(
        no_alert_for_skipped_runs= True, 
        on_failure= ['some@email.de']
        ),
}
settings = JobSettings(
    **params
)
w.jobs.reset(created_job, settings)

the error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-884331343802555>, line 47
     32 params = {
     33     "name":f'sdk-{time.time_ns()}',
     34     "tasks": [jobs.Task(description="test",
   (...)
     42         ),
     43 }
     44 settings = JobSettings(
     45     **params
     46 )
---> 47 w.jobs.reset(created_job, settings)
     49 #BaseJob.from_dict(settings.as_dict())
     50 #created_job = w.jobs.create(**settings.as_dict())
     51 # cleanup
     52 #w.jobs.delete(job_id=created_job.job_id)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:5206, in JobsAPI.reset(self, job_id, new_settings)
   5204 if new_settings is not None: body['new_settings'] = new_settings.as_dict()
   5205 headers = {'Accept': 'application/json', 'Content-Type': 'application/json', }
-> 5206 self._api.do('POST', '/api/2.1/jobs/reset', body=body, headers=headers)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1144, in ApiClient.do(self, method, path, query, headers, body, raw, files, data)
   1141 headers['User-Agent'] = self._user_agent_base
   1142 retryable = retried(timeout=timedelta(seconds=self._retry_timeout_seconds),
   1143                     is_retryable=self._is_retryable)
-> 1144 return retryable(self._perform)(method,
   1145                                 path,
   1146                                 query=query,
   1147                                 headers=headers,
   1148                                 body=body,
   1149                                 raw=raw,
   1150                                 files=files,
   1151                                 data=data)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:50, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
     46         retry_reason = f'{type(err).__name__} is allowed to retry'
     48 if retry_reason is None:
     49     # raise if exception is not retryable
---> 50     raise err
     52 logger.debug(f'Retrying: {retry_reason} (sleeping ~{sleep}s)')
     53 time.sleep(sleep + random())

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:29, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
     27 while time.time() < deadline:
     28     try:
---> 29         return func(*args, **kwargs)
     30     except Exception as err:
     31         last_err = err

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1222, in ApiClient._perform(self, method, path, query, headers, body, raw, files, data)
   1213 def _perform(self,
   1214              method: str,
   1215              path: str,
   (...)
   1220              files=None,
   1221              data=None):
-> 1222     response = self._session.request(method,
   1223                                      f"{self._cfg.host}{path}",
   1224                                      params=self._fix_query_string(query),
   1225                                      json=body,
   1226                                      headers=headers,
   1227                                      files=files,
   1228                                      data=data,
   1229                                      stream=raw)
   1230     try:
   1231         self._record_request_log(response, raw=raw or data is not None or files is not None)

File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:573, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    560 # Create the Request.
    561 req = Request(
    562     method=method.upper(),
    563     url=url,
   (...)
    571     hooks=hooks,
    572 )
--> 573 prep = self.prepare_request(req)
    575 proxies = proxies or {}
    577 settings = self.merge_environment_settings(
    578     prep.url, proxies, stream, verify, cert
    579 )

File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:484, in Session.prepare_request(self, request)
    481     auth = get_netrc_auth(request.url)
    483 p = PreparedRequest()
--> 484 p.prepare(
    485     method=request.method.upper(),
    486     url=request.url,
    487     files=request.files,
    488     data=request.data,
    489     json=request.json,
    490     headers=merge_setting(
    491         request.headers, self.headers, dict_class=CaseInsensitiveDict
    492     ),
    493     params=merge_setting(request.params, self.params),
    494     auth=merge_setting(auth, self.auth),
    495     cookies=merged_cookies,
    496     hooks=merge_hooks(request.hooks, self.hooks),
    497 )
    498 return p

File /databricks/python/lib/python3.10/site-packages/requests/models.py:371, in PreparedRequest.prepare(self, method, url, headers, files, data, params, auth, cookies, hooks, json)
    369 self.prepare_headers(headers)
    370 self.prepare_cookies(cookies)
--> 371 self.prepare_body(data, files, json)
    372 self.prepare_auth(auth, url)
    374 # Note that prepare_auth must be last to enable authentication schemes
    375 # such as OAuth to work on a fully prepared request.
    376 
    377 # This MUST go after prepare_auth. Authenticators could add a hook

File /databricks/python/lib/python3.10/site-packages/requests/models.py:511, in PreparedRequest.prepare_body(self, data, files, json)
    508 content_type = "application/json"
    510 try:
--> 511     body = complexjson.dumps(json, allow_nan=False)
    512 except ValueError as ve:
    513     raise InvalidJSONError(ve, request=self)

File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File /usr/lib/python3.10/json/encoder.py:199, in JSONEncoder.encode(self, o)
    195         return encode_basestring(o)
    196 # This doesn't pass the iterator directly to ''.join() because the
    197 # exceptions aren't as detailed.  The list call should be roughly
    198 # equivalent to the PySequence_Fast that ''.join() would do.
--> 199 chunks = self.iterencode(o, _one_shot=True)
    200 if not isinstance(chunks, (list, tuple)):
    201     chunks = list(chunks)

File /usr/lib/python3.10/json/encoder.py:257, in JSONEncoder.iterencode(self, o, _one_shot)
    252 else:
    253     _iterencode = _make_iterencode(
    254         markers, self.default, _encoder, self.indent, floatstr,
    255         self.key_separator, self.item_separator, self.sort_keys,
    256         self.skipkeys, _one_shot)
--> 257 return _iterencode(o, 0)

File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
    160 def default(self, o):
    161     """Implement this method in a subclass such that it returns
    162     a serializable object for ``o``, or calls the base implementation
    163     (to raise a ``TypeError``).
   (...)
    177 
    178     """
--> 179     raise TypeError(f'Object of type {o.__class__.__name__} '
    180                     f'is not JSON serializable')

TypeError: Object of type CreateResponse is not JSON serializable

dbx-user7354
New Contributor III

any help on this topic?

nenetto
New Contributor II

I just faced the same problem. The issue is that the when you do 

JobSettings.as_dict()

the settings are parsed to a dict where all the values are also parsed recursively. When you pass the parameters as **params, the create method again tries to parse each parameter to a dict since it needs to create a full json body to make the API request. 
The trick is to parse only the first level of JobSettings to dict (not recursively) and create the params dict.

Try this:

import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'

cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name":f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                existing_cluster_id=cluster_id,
                notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                task_key="test",
                timeout_seconds=0)],
    'email_notifications': jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True, 
        on_failure= ['some@email.com']
        ),
}
settings = JobSettings(
    **params
)

# This only creates a dictionary but preserving the types of the values.
# Go to https://github.com/databricks/databricks-sdk-py/blob/33b209bdbaca4a956c7375fd65cbf8d6c3b4d95d/databricks/sdk/service/jobs.py and check that some settings has it's own classes (types) with its own method .to_dict()
create_params = {k:getattr(settings,k) for k in settings.as_dict().keys()}


created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not
created_job = w.jobs.create(**create_params) # this does