Data Engineering

REST API for Pipeline Events does not return all records

JUPin
New Contributor II

I'm using the REST API to retrieve Pipeline Events per the documentation:
https://docs.databricks.com/api/workspace/pipelines/listpipelineevents

I am able to retrieve some records, but the API stops after a call or two. I verified the number of rows using the TVF "event_log", which returns over 300 records, yet the API consistently returns 34-35 records before stopping. I also attempted the same thing with the Databricks SDK, but the results are identical (34-35 records).

https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
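
For reference, this is roughly what I'm running via the SDK (the pipeline ID is a placeholder); the iterator is supposed to handle paging on its own:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # authenticates from the environment / .databrickscfg

# list_pipeline_events returns an iterator that should page
# through all events automatically
events = list(w.pipelines.list_pipeline_events(pipeline_id='<my-pipeline-id>'))
print(len(events))  # consistently 34-35, while event_log() shows 300+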

3 REPLIES

JUPin
New Contributor II

Thanks for responding,

I've investigated your suggestions; here are my findings:

  1. Check the max_results parameter: Ensure that you’re not inadvertently limiting the number of results returned. The default value is 1000, but you can adjust it as needed. -- I've adjusted this over several runs, and the results get very erratic with a hard-set value. For example, if I set "max_results=1000", I get an error message stating the maximum value can only be 250. If I set it to 100 (for example), sometimes the "display()" statements stop working altogether, and I have to detach and reattach the compute cluster for them to start working again. If I set it anywhere from 10 to 25, the calls consistently retrieve 35 rows.

  2. Inspect the filter criteria: If you’re using any filters (such as level='INFO' or timestamp > 'TIMESTAMP'), review them to make sure they’re not unintentionally restricting the results. -- Yes, I've tried the filters; they don't seem to make a difference. As a suggestion, I would strongly encourage adding a filter on "update_id".

  3. Pagination: The API response includes pagination tokens (next_page_token and prev_page_token). Make sure you’re handling these tokens correctly to retrieve all available events. If you’re not using them, you might be getting only the first page of results. -- Yes, I use "next_page_token" in my subsequent API calls. Depending on how I set "max_results", for example "max_results=25", I get the original data pull, then I use the "next_page_token" to get the next set, which is another 10 records (35 total). That second set doesn't have a "next_page_token" (a stripped-down version of my loop is at the end of this reply).

  4. Rate Limiting: Check if there’s any rate limiting or throttling applied to your API requests. Some APIs limit the number of requests per minute or hour. -- I don't receive any rate-limiting errors. My loop keeps calling the API until it gets no further pages, and I can even step through the calls manually, so I don't believe this is the issue.

  5. Error Handling: Inspect the response for any error messages or status codes. It’s possible that an error is occurring during the API call. -- I've checked all the error messages and status codes that come back; I do not receive any errors.

Currently, I'm trying to set up a very simple example of the issue, for both the API call and the SDK, to upload.
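
In the meantime, here is a stripped-down version of the REST loop I'm running (host, token, and pipeline ID are placeholders; the endpoint and response fields are per the listpipelineevents doc linked above):

import requests

host = 'https://<workspace-host>'
pipeline_id = '<my-pipeline-id>'
url = f'{host}/api/2.0/pipelines/{pipeline_id}/events'
headers = {'Authorization': 'Bearer <my-token>'}

params = {'max_results': 25}
total = 0
while True:
    resp = requests.get(url, headers=headers, params=params)
    resp.raise_for_status()
    data = resp.json()
    total += len(data.get('events', []))

    # Follow next_page_token until the response stops returning one
    next_page_token = data.get('next_page_token')
    if not next_page_token:
        break
    params = {'page_token': next_page_token}

print(total)  # stops around 35 even though event_log() reports 300+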

JUPin
New Contributor II

I've attached some screenshots of the API call. They show 59 records retrieved (Event Log API1.png) and a populated "next_page_token"; however, when I pull the next set of data using that "next_page_token", the result set is empty (Event Log API2.png). Meanwhile, the SQL result from "event_log()" shows over 322 records (SQL event_log results.png).

wise_owl
New Contributor III

You can leverage this code; it paginates as expected using the "next_page_token" parameter. (This example hits the Jobs API rather than pipeline events, but the pagination pattern is the same.)

Don't forget to mark this solution as correct if this helped you 🙃

import requests

token = 'your token'  # PAT for the workspace
url = 'your URL'      # e.g. the Jobs list endpoint for your workspace

params = {'expand_tasks': 'true'}
headers = {'Authorization': f'Bearer {token}'}

while True:
    response = requests.get(url, headers=headers, params=params)
    response_data = response.json()
    jobs = response_data.get('jobs', [])

    for job in jobs:
        settings = job.get('settings', {})
        tasks = settings.get('tasks')

        # Report jobs whose first task runs on an existing (all-purpose) cluster
        if tasks and tasks[0].get('existing_cluster_id'):
            job_name = settings.get('name')
            job_creator = job.get('creator_user_name')
            print(f'job creator name= {job_creator} & job name= {job_name}')
        else:
            print(f"{settings.get('name')} not running on ACL")

    # Keep requesting pages until the response no longer carries a token
    next_page_token = response_data.get('next_page_token')
    if not next_page_token:
        break

    params['page_token'] = next_page_token
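
The same loop should carry over to pipeline events: point the URL at /api/2.0/pipelines/<pipeline_id>/events, read the "events" array instead of "jobs", and keep passing "page_token" until "next_page_token" disappears from the response.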
