Clarification Needed: Ensuring Correct Pagination with Offset and Limit in PySpark
04-04-2024 09:42 AM
Hi community,
I hope you're all doing well. I'm currently working on a PySpark project where I'm implementing pagination-like functionality using the offset and limit functions. My aim is to retrieve the rows between a specified starting_index and ending_index without materializing the entire dataset in memory.
Here's how I'm currently using these functions:
sliced_df = df.offset(starting_index).limit(ending_index - starting_index)
However, I'm uncertain whether this approach provides reliable results, especially considering partitioned DataFrames. The documentation doesn't offer clear guidance on how these functions behave under such circumstances.
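My working assumption, which I'd like confirmed, is that without an explicit orderBy the rows returned by offset and limit aren't guaranteed to be stable across runs on a partitioned DataFrame. A rough sketch of the ordered variant I have in mind, where "id" is only a placeholder for whatever unique, orderable key the data actually has:
page_size = ending_index - starting_index
sliced_df = (
    df.orderBy("id")           # placeholder sort key to make the slice deterministic
      .offset(starting_index)  # skip the rows before the requested page
      .limit(page_size)        # keep only the rows for this page
)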
Could someone kindly address the following questions:
- Can I trust that the offset and limit functions will consistently return data between the specified starting_index and ending_index?
- How do these functions behave when applied to partitioned DataFrames?
- Are there any best practices or considerations to ensure correct pagination when using offset and limit, particularly with partitioned DataFrames?
- Is there a recommended approach that balances speed and resource usage without materializing the complete dataset in memory?
Additionally, I'd like to mention that I'm using a Databricks Connect (db-connect) Spark session for this project.
09-16-2024 04:08 AM
You can leverage the code below; it paginates as expected using the "next_page_token" parameter:
Don't forget to mark this solution as correct if it helped you 🙃
import requests

token = 'your token'
url = 'your URL'  # paginated list endpoint (this example expects a 'jobs' list in the response)
params = {'expand_tasks': 'true'}
header = {'Authorization': f'Bearer {token}'}

while True:
    response = requests.get(url, headers=header, params=params)
    response_data = response.json()
    jobs = response_data.get('jobs', [])
    for job in jobs:
        settings = job.get('settings')
        tasks = settings.get('tasks')
        # report jobs whose first task references an existing cluster
        if tasks and tasks[0].get('existing_cluster_id'):
            job_name = settings.get('name')
            job_creator = job.get('creator_user_name')
            print(f'job creator name= {job_creator} & job name= {job_name}')
        else:
            print(f"{settings.get('name')} not running on ACL")
    # fetch the next page until the response no longer carries a next_page_token
    next_page_token = response_data.get('next_page_token')
    if not next_page_token:
        break
    params['page_token'] = next_page_token

