Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Create a Job via SDK with JobSettings Object

dbx-user7354
New Contributor III

Hey, I want to create a Job via the Python SDK with a JobSettings object.

import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'

cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}
settings = JobSettings(
    **params
)
created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not

The last create call leads to this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File <command-884331343802555>, line 48
     33 params = {
     34     "name":f'sdk-{time.time_ns()}',
     35     "tasks": [jobs.Task(description="test",
   (...)
     43         ),
     44 }
     45 settings = JobSettings(
     46     **params
     47 )
---> 48 created_job = w.jobs.create(**settings.as_dict())
     49 # cleanup
     50 #w.jobs.delete(job_id=created_job.job_id)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:4739, in JobsAPI.create(self, access_control_list, compute, continuous, deployment, description, edit_mode, email_notifications, format, git_source, health, job_clusters, max_concurrent_runs, name, notification_settings, parameters, queue, run_as, schedule, tags, tasks, timeout_seconds, trigger, webhook_notifications)
   4737 if description is not None: body['description'] = description
   4738 if edit_mode is not None: body['edit_mode'] = edit_mode.value
-> 4739 if email_notifications is not None: body['email_notifications'] = email_notifications.as_dict()
   4740 if format is not None: body['format'] = format.value
   4741 if git_source is not None: body['git_source'] = git_source.as_dict()

AttributeError: 'dict' object has no attribute 'as_dict'

Am I doing something wrong, or is this a bug?

3 REPLIES

dbx-user7354
New Contributor III

I also tried creating the job first and then resetting the settings, but this did not work either.

My Code:

created_job = w.jobs.create(name=f'sdk-{time.time_ns()}',
                            tasks=[
                                jobs.Task(description="test",
                                          existing_cluster_id=cluster_id,
                                          notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                                          task_key="test",
                                          timeout_seconds=0)
                            ],
                            )



params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.de']
    ),
}
settings = JobSettings(
    **params
)
w.jobs.reset(created_job, settings)

The error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-884331343802555>, line 47
     32 params = {
     33     "name":f'sdk-{time.time_ns()}',
     34     "tasks": [jobs.Task(description="test",
   (...)
     42         ),
     43 }
     44 settings = JobSettings(
     45     **params
     46 )
---> 47 w.jobs.reset(created_job, settings)
     49 #BaseJob.from_dict(settings.as_dict())
     50 #created_job = w.jobs.create(**settings.as_dict())
     51 # cleanup
     52 #w.jobs.delete(job_id=created_job.job_id)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:5206, in JobsAPI.reset(self, job_id, new_settings)
   5204 if new_settings is not None: body['new_settings'] = new_settings.as_dict()
   5205 headers = {'Accept': 'application/json', 'Content-Type': 'application/json', }
-> 5206 self._api.do('POST', '/api/2.1/jobs/reset', body=body, headers=headers)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1144, in ApiClient.do(self, method, path, query, headers, body, raw, files, data)
   1141 headers['User-Agent'] = self._user_agent_base
   1142 retryable = retried(timeout=timedelta(seconds=self._retry_timeout_seconds),
   1143                     is_retryable=self._is_retryable)
-> 1144 return retryable(self._perform)(method,
   1145                                 path,
   1146                                 query=query,
   1147                                 headers=headers,
   1148                                 body=body,
   1149                                 raw=raw,
   1150                                 files=files,
   1151                                 data=data)

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:50, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
     46         retry_reason = f'{type(err).__name__} is allowed to retry'
     48 if retry_reason is None:
     49     # raise if exception is not retryable
---> 50     raise err
     52 logger.debug(f'Retrying: {retry_reason} (sleeping ~{sleep}s)')
     53 time.sleep(sleep + random())

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:29, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
     27 while time.time() < deadline:
     28     try:
---> 29         return func(*args, **kwargs)
     30     except Exception as err:
     31         last_err = err

File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1222, in ApiClient._perform(self, method, path, query, headers, body, raw, files, data)
   1213 def _perform(self,
   1214              method: str,
   1215              path: str,
   (...)
   1220              files=None,
   1221              data=None):
-> 1222     response = self._session.request(method,
   1223                                      f"{self._cfg.host}{path}",
   1224                                      params=self._fix_query_string(query),
   1225                                      json=body,
   1226                                      headers=headers,
   1227                                      files=files,
   1228                                      data=data,
   1229                                      stream=raw)
   1230     try:
   1231         self._record_request_log(response, raw=raw or data is not None or files is not None)

File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:573, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    560 # Create the Request.
    561 req = Request(
    562     method=method.upper(),
    563     url=url,
   (...)
    571     hooks=hooks,
    572 )
--> 573 prep = self.prepare_request(req)
    575 proxies = proxies or {}
    577 settings = self.merge_environment_settings(
    578     prep.url, proxies, stream, verify, cert
    579 )

File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:484, in Session.prepare_request(self, request)
    481     auth = get_netrc_auth(request.url)
    483 p = PreparedRequest()
--> 484 p.prepare(
    485     method=request.method.upper(),
    486     url=request.url,
    487     files=request.files,
    488     data=request.data,
    489     json=request.json,
    490     headers=merge_setting(
    491         request.headers, self.headers, dict_class=CaseInsensitiveDict
    492     ),
    493     params=merge_setting(request.params, self.params),
    494     auth=merge_setting(auth, self.auth),
    495     cookies=merged_cookies,
    496     hooks=merge_hooks(request.hooks, self.hooks),
    497 )
    498 return p

File /databricks/python/lib/python3.10/site-packages/requests/models.py:371, in PreparedRequest.prepare(self, method, url, headers, files, data, params, auth, cookies, hooks, json)
    369 self.prepare_headers(headers)
    370 self.prepare_cookies(cookies)
--> 371 self.prepare_body(data, files, json)
    372 self.prepare_auth(auth, url)
    374 # Note that prepare_auth must be last to enable authentication schemes
    375 # such as OAuth to work on a fully prepared request.
    376 
    377 # This MUST go after prepare_auth. Authenticators could add a hook

File /databricks/python/lib/python3.10/site-packages/requests/models.py:511, in PreparedRequest.prepare_body(self, data, files, json)
    508 content_type = "application/json"
    510 try:
--> 511     body = complexjson.dumps(json, allow_nan=False)
    512 except ValueError as ve:
    513     raise InvalidJSONError(ve, request=self)

File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File /usr/lib/python3.10/json/encoder.py:199, in JSONEncoder.encode(self, o)
    195         return encode_basestring(o)
    196 # This doesn't pass the iterator directly to ''.join() because the
    197 # exceptions aren't as detailed.  The list call should be roughly
    198 # equivalent to the PySequence_Fast that ''.join() would do.
--> 199 chunks = self.iterencode(o, _one_shot=True)
    200 if not isinstance(chunks, (list, tuple)):
    201     chunks = list(chunks)

File /usr/lib/python3.10/json/encoder.py:257, in JSONEncoder.iterencode(self, o, _one_shot)
    252 else:
    253     _iterencode = _make_iterencode(
    254         markers, self.default, _encoder, self.indent, floatstr,
    255         self.key_separator, self.item_separator, self.sort_keys,
    256         self.skipkeys, _one_shot)
--> 257 return _iterencode(o, 0)

File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
    160 def default(self, o):
    161     """Implement this method in a subclass such that it returns
    162     a serializable object for ``o``, or calls the base implementation
    163     (to raise a ``TypeError``).
   (...)
    177 
    178     """
--> 179     raise TypeError(f'Object of type {o.__class__.__name__} '
    180                     f'is not JSON serializable')

TypeError: Object of type CreateResponse is not JSON serializable
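Note: judging from the reset signature in the traceback, JobsAPI.reset(self, job_id, new_settings), this particular TypeError comes from passing the CreateResponse object itself as job_id. A sketch of the corrected call, using the id from the create response:

# Pass the numeric id from the create response, not the response object itself:
w.jobs.reset(job_id=created_job.job_id, new_settings=settings)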

dbx-user7354
New Contributor III

Any help on this topic?

nenetto
New Contributor II

I just faced the same problem. The issue is that when you call

JobSettings.as_dict()

the settings are parsed into a dict whose values are also parsed recursively. When you then pass the parameters as **params, the create method tries to parse each parameter into a dict again, since it needs to build a full JSON body for the API request.
The trick is to convert only the first level of JobSettings to a dict (not recursively) and build the params dict from that.
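You can see this directly (a quick check, reusing the settings object from the original snippet):

d = settings.as_dict()
# Nested objects have already been serialized to plain dicts at this point:
print(type(d["email_notifications"]))  # <class 'dict'>, not jobs.JobEmailNotifications
# create() then calls .as_dict() on that plain dict, which raises the AttributeError above.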

Try this:

import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'

cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}
settings = JobSettings(
    **params
)

# This creates a dictionary at the first level only, preserving the SDK types of the values.
# See https://github.com/databricks/databricks-sdk-py/blob/33b209bdbaca4a956c7375fd65cbf8d6c3b4d95d/databricks/sdk/service/jobs.py
# and note that some settings have their own classes (types) with their own .as_dict() method.
create_params = {k: getattr(settings, k) for k in settings.as_dict().keys()}


created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not
created_job = w.jobs.create(**create_params) # this does
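If you prefer to skip the as_dict() round-trip entirely, the same shallow conversion can be written with the dataclasses module (a sketch, assuming JobSettings remains a plain dataclass in the SDK):

import dataclasses

# Shallow, type-preserving conversion; skip unset fields so create() only
# receives the parameters that were actually provided:
create_params = {
    f.name: getattr(settings, f.name)
    for f in dataclasses.fields(settings)
    if getattr(settings, f.name) is not None
}
created_job = w.jobs.create(**create_params)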
