Create a Job via SDK with JobSettings Object
01-03-2024 01:18 AM
Hey, I want to create a Job via the Python SDK with a JobSettings object.
import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'
cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}

settings = JobSettings(**params)

created_job = w.jobs.create(**params)              # this works
created_job = w.jobs.create(**settings.as_dict())  # this does not
The last create call leads to this error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File <command-884331343802555>, line 48
33 params = {
34 "name":f'sdk-{time.time_ns()}',
35 "tasks": [jobs.Task(description="test",
(...)
43 ),
44 }
45 settings = JobSettings(
46 **params
47 )
---> 48 created_job = w.jobs.create(**settings.as_dict())
49 # cleanup
50 #w.jobs.delete(job_id=created_job.job_id)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:4739, in JobsAPI.create(self, access_control_list, compute, continuous, deployment, description, edit_mode, email_notifications, format, git_source, health, job_clusters, max_concurrent_runs, name, notification_settings, parameters, queue, run_as, schedule, tags, tasks, timeout_seconds, trigger, webhook_notifications)
4737 if description is not None: body['description'] = description
4738 if edit_mode is not None: body['edit_mode'] = edit_mode.value
-> 4739 if email_notifications is not None: body['email_notifications'] = email_notifications.as_dict()
4740 if format is not None: body['format'] = format.value
4741 if git_source is not None: body['git_source'] = git_source.as_dict()
AttributeError: 'dict' object has no attribute 'as_dict'
Am I doing something wrong, or is this a bug?
01-03-2024 02:02 AM
I also tried creating the job first and then resetting its settings, but that did not work either.
My code:
created_job = w.jobs.create(name=f'sdk-{time.time_ns()}',
                            tasks=[
                                jobs.Task(description="test",
                                          existing_cluster_id=cluster_id,
                                          notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                                          task_key="test",
                                          timeout_seconds=0)
                            ])

params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.de']
    ),
}

settings = JobSettings(**params)

w.jobs.reset(created_job, settings)
The error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-884331343802555>, line 47
32 params = {
33 "name":f'sdk-{time.time_ns()}',
34 "tasks": [jobs.Task(description="test",
(...)
42 ),
43 }
44 settings = JobSettings(
45 **params
46 )
---> 47 w.jobs.reset(created_job, settings)
49 #BaseJob.from_dict(settings.as_dict())
50 #created_job = w.jobs.create(**settings.as_dict())
51 # cleanup
52 #w.jobs.delete(job_id=created_job.job_id)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:5206, in JobsAPI.reset(self, job_id, new_settings)
5204 if new_settings is not None: body['new_settings'] = new_settings.as_dict()
5205 headers = {'Accept': 'application/json', 'Content-Type': 'application/json', }
-> 5206 self._api.do('POST', '/api/2.1/jobs/reset', body=body, headers=headers)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1144, in ApiClient.do(self, method, path, query, headers, body, raw, files, data)
1141 headers['User-Agent'] = self._user_agent_base
1142 retryable = retried(timeout=timedelta(seconds=self._retry_timeout_seconds),
1143 is_retryable=self._is_retryable)
-> 1144 return retryable(self._perform)(method,
1145 path,
1146 query=query,
1147 headers=headers,
1148 body=body,
1149 raw=raw,
1150 files=files,
1151 data=data)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:50, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
46 retry_reason = f'{type(err).__name__} is allowed to retry'
48 if retry_reason is None:
49 # raise if exception is not retryable
---> 50 raise err
52 logger.debug(f'Retrying: {retry_reason} (sleeping ~{sleep}s)')
53 time.sleep(sleep + random())
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:29, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
27 while time.time() < deadline:
28 try:
---> 29 return func(*args, **kwargs)
30 except Exception as err:
31 last_err = err
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1222, in ApiClient._perform(self, method, path, query, headers, body, raw, files, data)
1213 def _perform(self,
1214 method: str,
1215 path: str,
(...)
1220 files=None,
1221 data=None):
-> 1222 response = self._session.request(method,
1223 f"{self._cfg.host}{path}",
1224 params=self._fix_query_string(query),
1225 json=body,
1226 headers=headers,
1227 files=files,
1228 data=data,
1229 stream=raw)
1230 try:
1231 self._record_request_log(response, raw=raw or data is not None or files is not None)
File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:573, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
560 # Create the Request.
561 req = Request(
562 method=method.upper(),
563 url=url,
(...)
571 hooks=hooks,
572 )
--> 573 prep = self.prepare_request(req)
575 proxies = proxies or {}
577 settings = self.merge_environment_settings(
578 prep.url, proxies, stream, verify, cert
579 )
File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:484, in Session.prepare_request(self, request)
481 auth = get_netrc_auth(request.url)
483 p = PreparedRequest()
--> 484 p.prepare(
485 method=request.method.upper(),
486 url=request.url,
487 files=request.files,
488 data=request.data,
489 json=request.json,
490 headers=merge_setting(
491 request.headers, self.headers, dict_class=CaseInsensitiveDict
492 ),
493 params=merge_setting(request.params, self.params),
494 auth=merge_setting(auth, self.auth),
495 cookies=merged_cookies,
496 hooks=merge_hooks(request.hooks, self.hooks),
497 )
498 return p
File /databricks/python/lib/python3.10/site-packages/requests/models.py:371, in PreparedRequest.prepare(self, method, url, headers, files, data, params, auth, cookies, hooks, json)
369 self.prepare_headers(headers)
370 self.prepare_cookies(cookies)
--> 371 self.prepare_body(data, files, json)
372 self.prepare_auth(auth, url)
374 # Note that prepare_auth must be last to enable authentication schemes
375 # such as OAuth to work on a fully prepared request.
376
377 # This MUST go after prepare_auth. Authenticators could add a hook
File /databricks/python/lib/python3.10/site-packages/requests/models.py:511, in PreparedRequest.prepare_body(self, data, files, json)
508 content_type = "application/json"
510 try:
--> 511 body = complexjson.dumps(json, allow_nan=False)
512 except ValueError as ve:
513 raise InvalidJSONError(ve, request=self)
File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
232 if cls is None:
233 cls = JSONEncoder
234 return cls(
235 skipkeys=skipkeys, ensure_ascii=ensure_ascii,
236 check_circular=check_circular, allow_nan=allow_nan, indent=indent,
237 separators=separators, default=default, sort_keys=sort_keys,
--> 238 **kw).encode(obj)
File /usr/lib/python3.10/json/encoder.py:199, in JSONEncoder.encode(self, o)
195 return encode_basestring(o)
196 # This doesn't pass the iterator directly to ''.join() because the
197 # exceptions aren't as detailed. The list call should be roughly
198 # equivalent to the PySequence_Fast that ''.join() would do.
--> 199 chunks = self.iterencode(o, _one_shot=True)
200 if not isinstance(chunks, (list, tuple)):
201 chunks = list(chunks)
File /usr/lib/python3.10/json/encoder.py:257, in JSONEncoder.iterencode(self, o, _one_shot)
252 else:
253 _iterencode = _make_iterencode(
254 markers, self.default, _encoder, self.indent, floatstr,
255 self.key_separator, self.item_separator, self.sort_keys,
256 self.skipkeys, _one_shot)
--> 257 return _iterencode(o, 0)
File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
160 def default(self, o):
161 """Implement this method in a subclass such that it returns
162 a serializable object for ``o``, or calls the base implementation
163 (to raise a ``TypeError``).
(...)
177
178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
180 f'is not JSON serializable')
TypeError: Object of type CreateResponse is not JSON serializable
01-08-2024 12:36 AM
Any help on this topic?
03-15-2024 07:19 AM
I just faced the same problem. The issue is that when you call
JobSettings.as_dict()
the settings are converted to a dict whose values are all converted recursively as well. When you then pass the result as **params, the create method tries to call .as_dict() on each parameter again, since it needs to build a full JSON body for the API request, and that fails because the values are already plain dicts.
The trick is to convert only the first level of JobSettings to a dict (not recursively) and build the params dict from that.
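In other words, the difference looks roughly like this (illustrated with the email_notifications field from the example below):

settings.as_dict()["email_notifications"]  # plain dict -> create() calls .as_dict() on it and raises AttributeError
getattr(settings, "email_notifications")   # JobEmailNotifications object -> create() can serialize this itself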
Try this:
import os
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings

w = WorkspaceClient()

notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'
cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]

params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}

settings = JobSettings(**params)

# This builds a dictionary while preserving the types of the values.
# See https://github.com/databricks/databricks-sdk-py/blob/33b209bdbaca4a956c7375fd65cbf8d6c3b4d95d/databricks/sdk/service/jobs.py
# and note that some settings have their own classes (types) with their own .as_dict() method.
create_params = {k: getattr(settings, k) for k in settings.as_dict().keys()}

created_job = w.jobs.create(**params)               # this works
created_job = w.jobs.create(**settings.as_dict())   # this does not
created_job = w.jobs.create(**create_params)        # this does
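As a side note on the reset attempt from the second post: judging by the traceback, reset() puts whatever it receives as job_id straight into the JSON request body, and its signature (visible in the traceback, JobsAPI.reset(self, job_id, new_settings)) expects the numeric job id, not the CreateResponse object returned by create(). That is where the "Object of type CreateResponse is not JSON serializable" error comes from. A minimal sketch of what should work instead, reusing the settings object from above:

# Pass the job id from the CreateResponse, not the response object itself.
created_job = w.jobs.create(**create_params)
w.jobs.reset(job_id=created_job.job_id, new_settings=settings)

# cleanup
w.jobs.delete(job_id=created_job.job_id)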

