01-03-2024 01:18 AM
Hey, I want to create a Job via the Python SDK with a JobSettings object.
import os
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings
w = WorkspaceClient()
notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'
cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]
params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}
settings = JobSettings(**params)
created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not
The last create call leads to this error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File <command-884331343802555>, line 48
33 params = {
34 "name":f'sdk-{time.time_ns()}',
35 "tasks": [jobs.Task(description="test",
(...)
43 ),
44 }
45 settings = JobSettings(
46 **params
47 )
---> 48 created_job = w.jobs.create(**settings.as_dict())
49 # cleanup
50 #w.jobs.delete(job_id=created_job.job_id)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:4739, in JobsAPI.create(self, access_control_list, compute, continuous, deployment, description, edit_mode, email_notifications, format, git_source, health, job_clusters, max_concurrent_runs, name, notification_settings, parameters, queue, run_as, schedule, tags, tasks, timeout_seconds, trigger, webhook_notifications)
4737 if description is not None: body['description'] = description
4738 if edit_mode is not None: body['edit_mode'] = edit_mode.value
-> 4739 if email_notifications is not None: body['email_notifications'] = email_notifications.as_dict()
4740 if format is not None: body['format'] = format.value
4741 if git_source is not None: body['git_source'] = git_source.as_dict()
AttributeError: 'dict' object has no attribute 'as_dict'
Am I doing something wrong, or is this a bug?
01-03-2024 02:02 AM
I also tried creating the job first and then resetting its settings, but this did not work either.
My code:
created_job = w.jobs.create(name=f'sdk-{time.time_ns()}',
                            tasks=[
                                jobs.Task(description="test",
                                          existing_cluster_id=cluster_id,
                                          notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                                          task_key="test",
                                          timeout_seconds=0)
                            ],
                            )
params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.de']
    ),
}
settings = JobSettings(**params)
w.jobs.reset(created_job, settings)
The error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-884331343802555>, line 47
32 params = {
33 "name":f'sdk-{time.time_ns()}',
34 "tasks": [jobs.Task(description="test",
(...)
42 ),
43 }
44 settings = JobSettings(
45 **params
46 )
---> 47 w.jobs.reset(created_job, settings)
49 #BaseJob.from_dict(settings.as_dict())
50 #created_job = w.jobs.create(**settings.as_dict())
51 # cleanup
52 #w.jobs.delete(job_id=created_job.job_id)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/service/jobs.py:5206, in JobsAPI.reset(self, job_id, new_settings)
5204 if new_settings is not None: body['new_settings'] = new_settings.as_dict()
5205 headers = {'Accept': 'application/json', 'Content-Type': 'application/json', }
-> 5206 self._api.do('POST', '/api/2.1/jobs/reset', body=body, headers=headers)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1144, in ApiClient.do(self, method, path, query, headers, body, raw, files, data)
1141 headers['User-Agent'] = self._user_agent_base
1142 retryable = retried(timeout=timedelta(seconds=self._retry_timeout_seconds),
1143 is_retryable=self._is_retryable)
-> 1144 return retryable(self._perform)(method,
1145 path,
1146 query=query,
1147 headers=headers,
1148 body=body,
1149 raw=raw,
1150 files=files,
1151 data=data)
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:50, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
46 retry_reason = f'{type(err).__name__} is allowed to retry'
48 if retry_reason is None:
49 # raise if exception is not retryable
---> 50 raise err
52 logger.debug(f'Retrying: {retry_reason} (sleeping ~{sleep}s)')
53 time.sleep(sleep + random())
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/retries.py:29, in retried.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
27 while time.time() < deadline:
28 try:
---> 29 return func(*args, **kwargs)
30 except Exception as err:
31 last_err = err
File /databricks/python/lib/python3.10/site-packages/databricks/sdk/core.py:1222, in ApiClient._perform(self, method, path, query, headers, body, raw, files, data)
1213 def _perform(self,
1214 method: str,
1215 path: str,
(...)
1220 files=None,
1221 data=None):
-> 1222 response = self._session.request(method,
1223 f"{self._cfg.host}{path}",
1224 params=self._fix_query_string(query),
1225 json=body,
1226 headers=headers,
1227 files=files,
1228 data=data,
1229 stream=raw)
1230 try:
1231 self._record_request_log(response, raw=raw or data is not None or files is not None)
File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:573, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
560 # Create the Request.
561 req = Request(
562 method=method.upper(),
563 url=url,
(...)
571 hooks=hooks,
572 )
--> 573 prep = self.prepare_request(req)
575 proxies = proxies or {}
577 settings = self.merge_environment_settings(
578 prep.url, proxies, stream, verify, cert
579 )
File /databricks/python/lib/python3.10/site-packages/requests/sessions.py:484, in Session.prepare_request(self, request)
481 auth = get_netrc_auth(request.url)
483 p = PreparedRequest()
--> 484 p.prepare(
485 method=request.method.upper(),
486 url=request.url,
487 files=request.files,
488 data=request.data,
489 json=request.json,
490 headers=merge_setting(
491 request.headers, self.headers, dict_class=CaseInsensitiveDict
492 ),
493 params=merge_setting(request.params, self.params),
494 auth=merge_setting(auth, self.auth),
495 cookies=merged_cookies,
496 hooks=merge_hooks(request.hooks, self.hooks),
497 )
498 return p
File /databricks/python/lib/python3.10/site-packages/requests/models.py:371, in PreparedRequest.prepare(self, method, url, headers, files, data, params, auth, cookies, hooks, json)
369 self.prepare_headers(headers)
370 self.prepare_cookies(cookies)
--> 371 self.prepare_body(data, files, json)
372 self.prepare_auth(auth, url)
374 # Note that prepare_auth must be last to enable authentication schemes
375 # such as OAuth to work on a fully prepared request.
376
377 # This MUST go after prepare_auth. Authenticators could add a hook
File /databricks/python/lib/python3.10/site-packages/requests/models.py:511, in PreparedRequest.prepare_body(self, data, files, json)
508 content_type = "application/json"
510 try:
--> 511 body = complexjson.dumps(json, allow_nan=False)
512 except ValueError as ve:
513 raise InvalidJSONError(ve, request=self)
File /usr/lib/python3.10/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
232 if cls is None:
233 cls = JSONEncoder
234 return cls(
235 skipkeys=skipkeys, ensure_ascii=ensure_ascii,
236 check_circular=check_circular, allow_nan=allow_nan, indent=indent,
237 separators=separators, default=default, sort_keys=sort_keys,
--> 238 **kw).encode(obj)
File /usr/lib/python3.10/json/encoder.py:199, in JSONEncoder.encode(self, o)
195 return encode_basestring(o)
196 # This doesn't pass the iterator directly to ''.join() because the
197 # exceptions aren't as detailed. The list call should be roughly
198 # equivalent to the PySequence_Fast that ''.join() would do.
--> 199 chunks = self.iterencode(o, _one_shot=True)
200 if not isinstance(chunks, (list, tuple)):
201 chunks = list(chunks)
File /usr/lib/python3.10/json/encoder.py:257, in JSONEncoder.iterencode(self, o, _one_shot)
252 else:
253 _iterencode = _make_iterencode(
254 markers, self.default, _encoder, self.indent, floatstr,
255 self.key_separator, self.item_separator, self.sort_keys,
256 self.skipkeys, _one_shot)
--> 257 return _iterencode(o, 0)
File /usr/lib/python3.10/json/encoder.py:179, in JSONEncoder.default(self, o)
160 def default(self, o):
161 """Implement this method in a subclass such that it returns
162 a serializable object for ``o``, or calls the base implementation
163 (to raise a ``TypeError``).
(...)
177
178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
180 f'is not JSON serializable')
TypeError: Object of type CreateResponse is not JSON serializable
01-08-2024 12:36 AM
Any help on this topic?
03-15-2024 07:19 AM
I just faced the same problem. The issue is that when you do
JobSettings.as_dict()
the settings are converted to a dict, and all nested values are converted recursively as well. When you then pass that dict as **params, the create method tries to call .as_dict() on each parameter again, since it needs to build the full JSON body for the API request; a plain dict has no .as_dict() method, hence the AttributeError.
The trick is to convert only the first level of JobSettings to a dict (not recursively) and build the params dict from that.
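To make the failure mode concrete, here is a small check (an illustrative sketch, using the settings object built in the code below):
# After as_dict(), nested objects such as email_notifications are already plain dicts,
# so JobsAPI.create cannot call .as_dict() on them again.
print(type(settings.email_notifications))               # jobs.JobEmailNotifications
print(type(settings.as_dict()['email_notifications']))  # dict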
Try this:
import os
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings
w = WorkspaceClient()
notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}'
cluster_id = w.clusters.ensure_cluster_is_running(
    os.environ["DATABRICKS_CLUSTER_ID"]) and os.environ["DATABRICKS_CLUSTER_ID"]
params = {
    "name": f'sdk-{time.time_ns()}',
    "tasks": [jobs.Task(description="test",
                        existing_cluster_id=cluster_id,
                        notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
                        task_key="test",
                        timeout_seconds=0)],
    "email_notifications": jobs.JobEmailNotifications(
        no_alert_for_skipped_runs=True,
        on_failure=['some@email.com']
    ),
}
settings = JobSettings(**params)
# This only creates a dictionary of the top-level fields, preserving the types of the values.
# See https://github.com/databricks/databricks-sdk-py/blob/33b209bdbaca4a956c7375fd65cbf8d6c3b4d95d/databricks/sdk/service/jobs.py and note that some settings have their own classes (types), each with its own .as_dict() method.
create_params = {k: getattr(settings, k) for k in settings.as_dict().keys()}
created_job = w.jobs.create(**params) # this works
created_job = w.jobs.create(**settings.as_dict()) # this does not
created_job = w.jobs.create(**create_params) # this does
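One more note on the reset attempt from the second post: judging by the traceback, the TypeError comes from passing the CreateResponse returned by w.jobs.create directly as job_id; reset serializes its arguments into the request body, and a CreateResponse is not JSON serializable. Passing the numeric id should work, and new_settings can stay a JobSettings object, since reset calls .as_dict() on it internally. A hedged sketch (untested):
# Pass the job id, not the CreateResponse object itself; new_settings accepts a JobSettings.
w.jobs.reset(job_id=created_job.job_id, new_settings=settings)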