Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Asynchronous API calls from Databricks Workflow job

pjv
New Contributor II

Hi all,

I have many API calls to run in a Python Databricks notebook, which I then run regularly as a Databricks Workflow job. When I test the following code interactively on an all-purpose cluster, i.e. not via a job, it runs perfectly fine. However, when I run the same notebook as a job, it no longer works: the calls run sequentially instead of in parallel. Does anyone know why, and what I can do to fix it?

Thank you!

Here is my code:
import asyncio
import requests
import nest_asyncio

# Patch the notebook's already-running event loop so asyncio.run() works
nest_asyncio.apply()

async def with_threads():
    def make_request():
        response = requests.get('https://www.google.com')
        return response

    # Run each blocking request on its own thread and await them concurrently
    reqs = [asyncio.to_thread(make_request) for _ in range(20)]
    responses = await asyncio.gather(*reqs)
    return responses

async_result = asyncio.run(with_threads())
PS: The request and loop are different in my original code; they are only used here to illustrate the problem.
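For reference, since requests is blocking, asyncio.to_thread ultimately runs each call on a thread pool anyway; a minimal sketch of the same 20 GET requests with plain concurrent.futures (not from the original post) behaves equivalently and does not depend on an event loop at all:

from concurrent.futures import ThreadPoolExecutor

import requests

def make_request():
    # Same blocking GET as in the asyncio version above
    return requests.get('https://www.google.com')

with ThreadPoolExecutor(max_workers=20) as pool:
    # Submit all 20 requests at once; they run concurrently on worker threads
    responses = list(pool.map(lambda _: make_request(), range(20)))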
2 REPLIES

mhiltner
New Contributor III

Would you mind sharing the cluster setup for both cases? I'd make sure the Databricks Runtime version is the same for both, and check the number of workers allocated in each cluster.
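A minimal sketch like the following, run in both the all-purpose cluster and the job cluster, makes the comparison concrete. It assumes a Databricks notebook, where spark is predefined and the DATABRICKS_RUNTIME_VERSION environment variable is set on the cluster:

import os

# Compare these values between the interactive run and the job run
print("Runtime:", os.environ.get("DATABRICKS_RUNTIME_VERSION"))
print("Spark version:", spark.version)
print("Default parallelism:", spark.sparkContext.defaultParallelism)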

pjv
New Contributor II

I actually got it to work, though I do see that if I run two jobs with the same code in parallel, the async execution time slows down. Does the number of workers on the cluster that the parallel jobs run on affect the execution time of the jobs' async calls?

Here is the code that I got to run:
import asyncio
import aiohttp

# Asynchronous function to fetch data from a given URL using aiohttp
async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

# Asynchronous main function
async def get_url_data(input_args):
    # List of URLs to fetch data from
    urls = [get_api_url(input_arg) for input_arg in input_args]

    headers = {'X-API-KEY': "<API_KEY>"}

    # Create an aiohttp ClientSession for making asynchronous HTTP requests
    async with aiohttp.ClientSession(headers=headers) as session:
        # Create a list of tasks, where each task is a call to 'fetch_data' with a specific URL
        tasks = [fetch_data(session, url) for url in urls]

        # Use 'asyncio.gather()' to run the tasks concurrently and gather their results
        results = await asyncio.gather(*tasks, return_exceptions=False)

        # Return the results obtained from fetching data from each URL
        return results
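For completeness, a hypothetical driver call for the snippet above; get_api_url and the input arguments are the poster's own helper and placeholders, not real endpoints:

import nest_asyncio

# Patch the notebook's running event loop so asyncio.run() can be used here
nest_asyncio.apply()

input_args = ["arg1", "arg2"]  # placeholder inputs for get_api_url
results = asyncio.run(get_url_data(input_args))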