REST API for Pipeline Events does not return all records
05-30-2024 12:14 PM
I'm using the REST API to retrieve Pipeline Events per the documentation:
https://docs.databricks.com/api/workspace/pipelines/listpipelineevents
I can retrieve some records, but the API stops after a call or two. I verified the number of rows using the "event_log" TVF, which returns over 300 records. The API consistently returns 34-35 records before stopping. I also tried the same thing with the Databricks SDK, and the result is the same (34-35 records).
https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
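For anyone trying to reproduce this, here is a minimal sketch of the pagination loop described above. It assumes the endpoint path is /api/2.0/pipelines/{pipeline_id}/events and that the response carries "events" and "next_page_token" keys. The helper names (iter_pipeline_events, make_http_fetcher) are hypothetical and not part of any Databricks library. My reading of the API reference is that page_token should be sent with no other fields except max_results, so the loop drops everything else on follow-up calls; treat that as an assumption to verify.

```python
import json
import urllib.parse
import urllib.request


def iter_pipeline_events(fetch_page, max_results=100):
    """Yield events page by page until no next_page_token comes back."""
    params = {"max_results": max_results}
    while True:
        page = fetch_page(params)
        yield from page.get("events", [])
        token = page.get("next_page_token")
        if not token:
            break
        # Follow-up calls carry only the token and the page size.
        params = {"max_results": max_results, "page_token": token}


def make_http_fetcher(host, pipeline_id, token):
    """Hypothetical helper: build a fetch_page callable using urllib."""
    base = f"{host}/api/2.0/pipelines/{pipeline_id}/events"  # assumed path

    def fetch_page(params):
        url = base + "?" + urllib.parse.urlencode(params)
        req = urllib.request.Request(
            url, headers={"Authorization": f"Bearer {token}"}
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    return fetch_page
```

Usage would look like list(iter_pipeline_events(make_http_fetcher(host, pipeline_id, token))), where host, pipeline_id and token are your own values. Passing the fetcher in as a callable keeps the loop easy to test without a workspace.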
06-03-2024 11:12 AM
Thanks for responding,
I've investigated your suggestions. Here are my findings:
Check the max_results parameter: Ensure that you’re not inadvertently limiting the number of results returned. The default value is 1000, but you can adjust it as needed. -- I've adjusted this over several runs. The results get very erratic when I hard-code a value. For example, if I set "max_results=1000", I get an error message stating that the maximum value is 250. If I set it to 100 (for example), the "display()" statements sometimes stop working altogether, and I have to detach and reattach the compute cluster for them to start working again. If I set it anywhere from 10 to 25, the calls consistently retrieve 35 rows.
Inspect the filter criteria: If you’re using any filters (such as level='INFO' or timestamp > 'TIMESTAMP'), review them to make sure they’re not unintentionally restricting the results. -- Yes, I've tried the filters, and they don't seem to make a difference. As a suggestion, I would strongly encourage adding a filter on "update_id".
Pagination: The API response includes pagination tokens (next_page_token and prev_page_token). Make sure you’re handling these tokens correctly to retrieve all available events. If you’re not using them, you might be getting only the first page of results. -- Yes, I use "next_page_token" in my subsequent API calls. The behavior depends on how I set "max_results". For example, with "max_results=25" I get the initial data pull of 25 rows, then I use the "next_page_token" to get the next set, which has 10 rows. The second set doesn't have a "next_page_token".
Rate Limiting: Check if there’s any rate limiting or throttling applied to your API requests. Some APIs limit the number of requests per minute or hour. -- I don't receive any rate limiting error. The code keeps calling the API until it receives no further results, and I can reproduce the same behavior manually, so I don't believe this is the issue.
Error Handling: Inspect the response for any error messages or status codes. It’s possible that an error is occurring during the API call. -- I've checked all the error messages and status codes that are returned, and I do not receive any errors.
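As a stopgap for two of the points above, here is a small sketch that caps the page size at the 250 limit reported by the error message and filters events by update ID on the client side. It assumes each event is a dict with an "origin" object containing "update_id"; that shape is my assumption about the response, and both function names are hypothetical.

```python
MAX_PAGE_SIZE = 250  # limit reported by the error message in this thread


def clamp_max_results(requested):
    """Keep the requested page size within 1..MAX_PAGE_SIZE."""
    return max(1, min(int(requested), MAX_PAGE_SIZE))


def events_for_update(events, update_id):
    """Keep only events whose origin.update_id matches update_id.

    Assumes each event is a dict shaped like
    {"origin": {"update_id": "..."}, ...}; events without an
    origin are skipped.
    """
    return [
        event
        for event in events
        if (event.get("origin") or {}).get("update_id") == update_id
    ]
```

Filtering after retrieval only helps once all pages actually come back, so it does not work around the truncated results themselves.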
Currently, I'm trying to set up a very simple example of the issue with both the API call and the SDK so that I can upload it.
06-05-2024 09:39 AM
I've attached some screenshots of the API call. The first shows 59 records retrieved and a populated "next_page_token" (Event Log API1.png). However, when I pull the next set of data using that "next_page_token", the result set is empty (Event Log API2.png). Meanwhile, the SQL result from "event_log()" shows 322 records (SQL event_log results.png).
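To capture this behavior as text rather than screenshots, a small tracing helper can record what each page returns. This is only a sketch: trace_pages is a hypothetical name, and fetch_page stands for any callable that takes a dict of query parameters and returns the decoded JSON response, assumed to contain "events" and "next_page_token".

```python
def trace_pages(fetch_page, max_results=100, max_pages=50):
    """Return one (event_count, has_next_token) tuple per page fetched.

    Stops when a page has no next_page_token, when a page is empty,
    or after max_pages requests, whichever comes first.
    """
    trace = []
    params = {"max_results": max_results}
    for _ in range(max_pages):
        page = fetch_page(params)
        events = page.get("events", [])
        token = page.get("next_page_token")
        trace.append((len(events), bool(token)))
        if not token or not events:
            break
        params = {"max_results": max_results, "page_token": token}
    return trace
```

Summing the first element of each tuple gives the total the API returned, which can then be compared with the row count from "event_log()".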
09-16-2024 04:09 AM
You can use the code below as a starting point. It pages through the results as expected using the "next_page_token" value:
Don't forget to mark this solution as correct if this helped you 🙃
import requests

token = 'your token'
url = 'your URL'
params = {'expand_tasks': 'true'}
header = {'Authorization': f'Bearer {token}'}

# Keep requesting pages until the response no longer has a next_page_token.
while True:
    response = requests.get(url, headers=header, params=params)
    response.raise_for_status()  # fail loudly on HTTP errors
    response_data = response.json()
    jobs = response_data.get("jobs", [])
    for job in jobs:
        settings = job.get('settings') or {}
        task = settings.get('tasks')
        # Report jobs whose first task runs on an existing cluster.
        if task and task[0].get('existing_cluster_id'):
            job_name = settings.get('name')
            job_creator = job.get('creator_user_name')
            print(f'job creator name= {job_creator} & job name= {job_name}')
        else:
            print(f"{settings.get('name')} not running on ACL")
    next_page_token = response_data.get('next_page_token')
    if not next_page_token:
        break
    # Pass the token back so the next request returns the following page.
    params['page_token'] = next_page_token

