Get job run output through a REST API call

Alex79
New Contributor II

I have a simple notebook that reads a DataFrame as input and returns another DataFrame:


from io import StringIO
import json

import pandas as pd
from pyspark.sql import SparkSession

# On Databricks `spark` already exists; getOrCreate() simply returns it
spark = SparkSession.builder \
    .appName("Pandas to Spark DataFrame Conversion") \
    .getOrCreate()

# The job parameter "dataframe_json" arrives through this widget
dbutils.widgets.text("dataframe_json", "")
dataframe_json = dbutils.widgets.get("dataframe_json")

res = None

def process_dataframe(df):
    # Example transformation: double the first column
    spark_df = spark.createDataFrame(df)
    spark_df = spark_df.withColumn("new_column", spark_df[spark_df.columns[0]] * 2)
    return spark_df

try:
    # Parse inside the try so a malformed payload is reported, not raised.
    # StringIO: passing literal JSON to read_json is deprecated in pandas >= 2.1
    user_dataframe = pd.read_json(StringIO(dataframe_json), orient='split')
    print("DataFrame reconstructed successfully")
except ValueError as e:
    print("Failed to reconstruct DataFrame:", e)
    # dbutils.notebook.exit() takes a string, so serialize the error as well
    res = json.dumps({"error": str(e)})

if res is not None:
    dbutils.notebook.exit(res)

try:
    result = process_dataframe(user_dataframe)
    result_json = [row.asDict() for row in result.collect()]
    res = json.dumps({"result": result_json})
except Exception as e:
    res = json.dumps({"error": str(e)})

# This string is what jobs/runs/get-output reports as notebook_output.result
# for this task run
dbutils.notebook.exit(res)
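The notebook's contract with its caller is the JSON string passed to dbutils.notebook.exit: either {"result": [...]} with one dict per row, or {"error": "..."}. A minimal client-side sketch for turning that string back into rows; the function name `decode_notebook_exit` is hypothetical, and it uses only the standard library so it runs outside Databricks.

```python
import json
from typing import Any, Dict, List


def decode_notebook_exit(value: str) -> List[Dict[str, Any]]:
    """Turn the notebook's exit string back into a list of row dicts.

    Hypothetical helper: assumes the notebook exits with
    {"result": [...]} on success or {"error": "..."} on failure.
    """
    payload = json.loads(value)
    if "error" in payload:
        # The notebook caught an exception and reported it instead of rows
        raise RuntimeError(f"notebook reported an error: {payload['error']}")
    return payload["result"]
```

If pandas is available on the client, pd.DataFrame(decode_notebook_exit(value)) rebuilds a DataFrame from the returned rows.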

The notebook is run by a job that takes a DataFrame as input and passes it to the notebook (no problem so far). I trigger the job through an API call as follows:

def run_job():
    import requests
    import json
    import pandas as pd, time

    df = pd.DataFrame({
        'Column1': [1, 2, 3],
        'Column2': ['D', 'E', 'C']})

    # Serialize the DataFrame to JSON
    df_json = df.to_json(orient='split')

    # Configuration
    DATABRICKS_INSTANCE = "https://adb-my_session.azuredatabricks.net"
    TOKEN = "my_token"
    CLUSTER_ID = "my_cluster"

    # Headers for authentication
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json"
    }

    job_payload = {
        "run_name": "Run Notebook via API",
        "existing_cluster_id": CLUSTER_ID,
        'job_id': 'my_job_id',
        "job_parameters": {'dataframe_json': df_json}
    }

    response = requests.post(
        f"{DATABRICKS_INSTANCE}/api/2.2/jobs/run-now",
        headers=headers,
        data=json.dumps(job_payload),
        verify=False
    )

    if response.status_code == 200:
        run_id = response.json()["run_id"]
        # Wait for the run to complete
        while True:
            response = requests.get(
                f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get",
                headers=headers,
                params={"run_id": run_id, 'include_output': 'true'},
                verify=False)

            output_format = 'JSON'
            body = {
                "run_id": run_id,
                "output_format": output_format
            }

            response = requests.get(
                f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get-output",
                headers=headers,
                json=body,
                verify=False
            )
            if response.status_code == 200:
                state = response.json()["state"]["life_cycle_state"]
                if state == "TERMINATED":
                    output = response.json()['output']
                    if "result" in output:
                        print("Result:", output["result"])
                    elif "error" in output:
                        print("Error:", output["error"])
                    break
                elif state == "INTERNAL_ERROR":
                    print("Internal error:", response.json()["state"]["state_message"])
                    break
            time.sleep(10)
    else:
        print("Error submitting job:", response.text)


The run-now endpoint returns HTTP 200 and works.
The runs/get endpoint returns HTTP 200, works, and returns, among other fields: 'status': {'state': 'TERMINATED', 'termination_details': {'code': 'SUCCESS', 'type': 'SUCCESS', 'message': ''}}
The runs/get-output endpoint returns HTTP 400 with the following content:
{'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.', 'details': [{'@type': 'type.googleapis.com/google.rpc.RequestInfo', 'request_id': '512da231-ef1b-4d82-bd7e-8dae326a6356', 'serving_data': ''}]}


Vidhi_Khaitan
Databricks Employee

Hi team,

{
"error_code": "INVALID_PARAMETER_VALUE",
"message": "Retrieving the output of runs with multiple tasks is not supported..."
}
means the job you're triggering (job_id = 'my_job_id') is a multi-task job (even if it has only one task). In such cases, jobs/runs/get-output only works on task-level run IDs, not the top-level job run ID.

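Solution: to get the output of a specific task within the job:
1. Get the run_id from the jobs/run-now response.
2. Call /api/2.1/jobs/runs/get with that run_id.
3. In that response, take the run_id of the task from the tasks array and call /api/2.1/jobs/runs/get-output with that task-level run_id.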
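A minimal sketch of those steps, assuming the workspace URL and token from the question and API version 2.2. The helper names `make_fetch` and `wait_for_task_run_ids` are hypothetical. The sketch uses the standard library's urllib instead of requests so it has no dependencies, and the HTTP call is passed in as a function so the polling logic can be exercised without a workspace. It polls jobs/runs/get until state.life_cycle_state reaches a terminal value, then maps each task's task_key to its task-level run_id.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict

# Life-cycle states after which a run no longer changes
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}

# fetch(path, query_params) -> parsed JSON response
Fetch = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def make_fetch(instance: str, token: str) -> Fetch:
    """Hypothetical GET helper for the Databricks REST API (stdlib only)."""
    def fetch(path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        url = f"{instance}{path}?{urllib.parse.urlencode(params)}"
        request = urllib.request.Request(
            url, headers={"Authorization": f"Bearer {token}"})
        # urlopen raises urllib.error.HTTPError on 4xx/5xx responses
        with urllib.request.urlopen(request) as response:
            return json.load(response)
    return fetch


def wait_for_task_run_ids(fetch: Fetch, run_id: int,
                          poll_seconds: float = 10.0) -> Dict[str, int]:
    """Poll the job run, then return {task_key: task run_id}."""
    while True:
        run = fetch("/api/2.2/jobs/runs/get", {"run_id": run_id})
        state = run["state"]["life_cycle_state"]
        if state in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    if state != "TERMINATED":
        message = run["state"].get("state_message", "")
        raise RuntimeError(f"run {run_id} ended in {state}: {message}")
    # Each task has its own run_id; that is the ID get-output accepts
    return {task["task_key"]: task["run_id"] for task in run.get("tasks", [])}
```

Usage: fetch = make_fetch(DATABRICKS_INSTANCE, TOKEN), then wait_for_task_run_ids(fetch, run_id) with the run_id from run-now. Injecting fetch keeps the loop independent of the HTTP library, so swapping in requests (with verify=False, as in the question) only changes make_fetch.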

 

Thanks Vidhi. Attached is the response I get when using api/2.1/jobs/get:

I still don't get the result of the notebook processing, even though it clearly worked (see the second capture, 'output').

How and where can I get the result back?

By the way, is it expected that I cannot copy and paste into these messages? Without that, it is very hard to provide the relevant information.
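For a notebook task, the value passed to dbutils.notebook.exit is returned by jobs/runs/get-output when it is called with the task's run_id (not the job run's), under notebook_output.result as a string. notebook_output.truncated indicates whether that string was cut short, and a top-level error field is present when the task failed. A small sketch, assuming the response has already been fetched and parsed into a dict (for example from GET /api/2.2/jobs/runs/get-output?run_id=<task run id>); `notebook_result` is a hypothetical name.

```python
from typing import Any, Dict


def notebook_result(get_output_response: Dict[str, Any]) -> str:
    """Return the string a task's notebook passed to dbutils.notebook.exit()."""
    if "error" in get_output_response:
        # The task failed; the API reports why in the top-level error field
        raise RuntimeError(f"task failed: {get_output_response['error']}")
    notebook_output = get_output_response.get("notebook_output", {})
    if "result" not in notebook_output:
        # e.g. the notebook never called dbutils.notebook.exit(),
        # or this run is not a notebook task
        raise ValueError("no notebook_output.result in response")
    if notebook_output.get("truncated", False):
        raise ValueError("notebook exit value was truncated by the API")
    return notebook_output["result"]
```

With the notebook in the question, the returned string is the json.dumps({"result": ...}) payload, so json.loads on it yields the rows.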

Capture.JPG

Output.JPG