Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Asynchronous API calls from Databricks Workflow job

pjv
New Contributor III

Hi all,

I have many API calls to make from a Python Databricks notebook, which I run regularly as a Databricks Workflow job. When I test the following code interactively on an all-purpose cluster (i.e. not via a job), it runs perfectly fine. However, when I run the same notebook as a job, it no longer works as expected: the calls run sequentially instead of in parallel. Does anyone know why, and what I can do to fix it?

Thank you!

Here is my code:

 

 

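import asyncio
import requests

import nest_asyncio

# Allow asyncio.run() to be called from inside the notebook's running event loop
nest_asyncio.apply()

async def with_threads():
    def make_request():
        response = requests.get('https://www.google.com')
        return response

    # Wrap each blocking request in a coroutine that runs it in a worker thread
    reqs = [asyncio.to_thread(make_request) for _ in range(0, 20)]

    # Wait for all requests to finish and collect the responses in order
    responses = await asyncio.gather(*reqs)

    return responses

async_result = asyncio.run(with_threads())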
 
PS: The request and the loop are different in my original code; they are only used here to illustrate the problem.
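One thing that may differ between clusters: `asyncio.to_thread` submits work to the event loop's default thread pool, whose size in recent CPython versions depends on the CPU count of the machine (roughly `min(32, cpu_count + 4)`). Whether that explains the behaviour on the job cluster is only a guess. The sketch below shows how the pool size could be set explicitly instead. `blocking_call` and `with_explicit_pool` are hypothetical names, and `time.sleep` stands in for the real `requests.get` call.

```python
import asyncio
import os
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_call(i):
    # Hypothetical stand-in for requests.get(...): sleeps to simulate blocking I/O
    time.sleep(0.1)
    return i

async def with_explicit_pool(n=20):
    loop = asyncio.get_running_loop()
    # Size the pool to the number of calls instead of relying on the default pool
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [loop.run_in_executor(pool, blocking_call, i) for i in range(n)]
        # gather() preserves the order in which the futures were passed in
        return await asyncio.gather(*futures)

print("CPU count on this machine:", os.cpu_count())
start = time.perf_counter()
responses = asyncio.run(with_explicit_pool())
elapsed = time.perf_counter() - start
print(f"{len(responses)} calls finished in {elapsed:.2f}s")
```

If the elapsed time stays close to the duration of a single call, the calls overlapped. In a notebook, `nest_asyncio.apply()` (as in the snippet above) or a plain `await with_explicit_pool()` would likely be needed in place of a bare `asyncio.run`.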
2 REPLIES

mhiltner
Databricks Employee

Would you mind sharing the cluster setup for both cases? I'd make sure that the Databricks Runtime version is the same for both and check the number of workers allocated in each cluster.

pjv
New Contributor III

I actually got it to work, though I do see that if I run two jobs with the same code in parallel, the async execution slows down. Does the number of workers on the cluster where the parallel jobs run affect the execution time of the jobs' async calls?

Here is the code that I got to run:

 

 

import asyncio
import aiohttp

# Asynchronous function to fetch data from a given URL using aiohttp
async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

# Asynchronous main function
async def get_url_data(input_args):
    # List of URLs to fetch data from (get_api_url is not shown here)
    urls = [get_api_url(input_arg) for input_arg in input_args]

    headers = {'X-API-KEY': "<API_KEY>"}

    # Create an aiohttp ClientSession for making asynchronous HTTP requests
    async with aiohttp.ClientSession(headers=headers) as session:
        # Create a list of coroutines, one 'fetch_data' call per URL
        tasks = [fetch_data(session, url) for url in urls]

        # Use 'asyncio.gather()' to run the tasks concurrently and gather their results
        results = await asyncio.gather(*tasks, return_exceptions=False)

    # Return the results obtained from fetching data from each URL
    return results
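On the worker question: coroutines passed to `asyncio.gather` all run on one thread in a single Python process, and plain Python code in a notebook normally executes on the driver node, so the number of Spark workers would not be expected to speed up or slow down these calls directly. The sketch below illustrates the single-thread point only. `fake_fetch` and `run_all` are hypothetical names, and `asyncio.sleep` stands in for the real `fetch_data(session, url)` call.

```python
import asyncio
import threading
import time

async def fake_fetch(i, delay=0.1):
    # Hypothetical stand-in for fetch_data(session, url): waits instead of doing network I/O
    await asyncio.sleep(delay)
    # Report which OS thread executed this coroutine
    return i, threading.get_ident()

async def run_all(n=20):
    start = time.perf_counter()
    # All coroutines are scheduled on the same event loop and overlap while waiting
    results = await asyncio.gather(*(fake_fetch(i) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(run_all())
thread_ids = {tid for _, tid in results}
print(f"{len(results)} calls in {elapsed:.2f}s on {len(thread_ids)} thread(s)")
```

Two jobs sharing one cluster would still compete for the same driver's CPU and network, which is one possible (unconfirmed) reason for the slowdown described above.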
