Can i send multiple post requests to an API endpoint and get the info if all succeeded ?

Layer
New Contributor

Hello I am trying to send multiple post requests to an endpoint, i have a spark dataframe and each column of this dataframe is sent through the payload of the post request.

However when i run this in my notebook, no exception is raised. I'm guessing it is because the requests are executed on spark workers

That's why i wonder how can i get the info if a post request returned a HTTP error 422 or 502 ?

Here's something i tried but with no result

url = "string"
headers = {
    'header': 'XXXXX'
}

def process_partition(partition):
    partition_errors = [] 
    
    for row in partition:
        payload = {
            "col1": str(row["col1"]),
        }
        try:
            response = requests.post(url, json=payload, headers=headers)
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            partition_errors.append({"error": str(err), "data": row.asDict()})
        except Exception as e:
            partition_errors.append({"error": str(e), "data": row.asDict()})
    
    return partition_errors

def log_errors(errors):
    if errors:
        for error in errors:
            print(f"Error: {error['error']}, Data: {error['data']}")
    else:
        print("All data has been sent.")

all_errors = []

def collect_partition_errors(partition):
    errors = process_partition(partition)
    all_errors.extend(errors) 

df_websocket_data.foreachPartition(collect_partition_errors)

log_errors(all_errors)

 

cgrant
Databricks Employee
Databricks Employee

The return type for foreachPartition is None, so this is expected. If you're looking to do arbitrary code execution and return a result, mapInPandas or Pandas UDFs are good choices - you'd want to combine those with something like a .toLocalIterator call to interact with results in Python