I have a simple notebook that reads a dataframe as input and returns another dataframe, as follows:
from pyspark.sql import SparkSession
import pandas as pd, json
spark = SparkSession.builder \
    .appName("Pandas to Spark DataFrame Conversion") \
    .getOrCreate()
dbutils.widgets.text("dataframe_json", "")
dataframe_json = dbutils.widgets.get("dataframe_json")
res = None
def process_dataframe(df):
    # Example transformation
    spark_df = spark.createDataFrame(df)
    spark_df = spark_df.withColumn("new_column", spark_df[spark_df.columns[0]] * 2)
    return spark_df
try:
    user_dataframe = pd.read_json(dataframe_json, orient='split')
    print("DataFrame reconstructed successfully:")
    print('1')
except ValueError as e:
    print("Failed to reconstruct DataFrame:", e)
    res = {"error": str(e)}

if res is not None:
    dbutils.notebook.exit(res)
try:
    result = process_dataframe(user_dataframe)
    result_json = [row.asDict() for row in result.collect()]
    res = json.dumps({"result": result_json})
except Exception as e:
    res = json.dumps({"error": str(e)})

dbutils.notebook.exit(res)
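For context, the dataframe_json widget value is expected to be the split-orient JSON that pandas produces; a quick illustration of the shape (output shown for the same sample DataFrame I send below):

import pandas as pd
pd.DataFrame({'Column1': [1, 2, 3], 'Column2': ['D', 'E', 'C']}).to_json(orient='split')
# -> '{"columns":["Column1","Column2"],"index":[0,1,2],"data":[[1,"D"],[2,"E"],[3,"C"]]}'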
The notebook is run by a job that takes a dataframe as input and passes it to the notebook (no problem so far). I trigger the job via an API call as follows:
def run_job():
    import requests
    import json
    import pandas as pd, time

    df = pd.DataFrame({
        'Column1': [1, 2, 3],
        'Column2': ['D', 'E', 'C']})
    # Serialize the DataFrame to JSON
    df_json = df.to_json(orient='split')

    # Configuration
    DATABRICKS_INSTANCE = "https://adb-my_session.azuredatabricks.net"
    TOKEN = "my_token"
    CLUSTER_ID = "my_cluster"

    # Headers for authentication
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json"
    }

    job_payload = {
        "run_name": "Run Notebook via API",
        "existing_cluster_id": CLUSTER_ID,
        'job_id': 'my_job_id',
        "job_parameters": {'dataframe_json': df_json}
    }

    response = requests.post(
        f"{DATABRICKS_INSTANCE}/api/2.2/jobs/run-now",
        headers=headers,
        data=json.dumps(job_payload),
        verify=False
    )

    if response.status_code == 200:
        run_id = response.json()["run_id"]
        # Wait for the run to complete
        while True:
            response = requests.get(
                f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get",
                headers=headers,
                params={"run_id": run_id, 'include_output': 'true'},
                verify=False)

            output_format = 'JSON'
            body = {
                "run_id": run_id,
                "output_format": output_format
            }
            response = requests.get(
                f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get-output",
                headers=headers,
                json=body,
                verify=False
            )
            if response.status_code == 200:
                state = response.json()["state"]["life_cycle_state"]
                if state == "TERMINATED":
                    output = response.json()['output']
                    if "result" in output:
                        print("Result:", output["result"])
                    elif "error" in output:
                        print("Error:", output["error"])
                    break
                elif state == "INTERNAL_ERROR":
                    print("Internal error:", response.json()["state"]["state_message"])
                    break
            time.sleep(10)
    else:
        print("Error submitting job:", response.text)
The run-now endpoint returns HTTP 200 and works.
The get endpoint also returns HTTP 200 and works; among other things it returns: 'status': {'state': 'TERMINATED', 'termination_details': {'code': 'SUCCESS', 'type': 'SUCCESS', 'message': ''}}
The get-output endpoint returns HTTP 400 with the following content:
{'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.', 'details': [{'@type': 'type.googleapis.com/google.rpc.RequestInfo', 'request_id': '512da231-ef1b-4d82-bd7e-8dae326a6356', 'serving_data': ''}]}
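If I read the error correctly, because the job run is made up of tasks, get-output has to be called with the run_id of each individual task (taken from the tasks array of the runs/get response) rather than the parent run_id. Here is a rough, untested sketch of what I think is meant, reusing DATABRICKS_INSTANCE, headers and run_id from run_job above (I'm assuming notebook_output is the field that holds the notebook's exit value):

# Sketch: fetch the parent run, then query get-output once per task run_id
run_info = requests.get(
    f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get",
    headers=headers,
    params={"run_id": run_id},
    verify=False
).json()

for task in run_info.get("tasks", []):
    task_output = requests.get(
        f"{DATABRICKS_INSTANCE}/api/2.2/jobs/runs/get-output",
        headers=headers,
        params={"run_id": task["run_id"]},  # task-level run_id, not the parent run_id
        verify=False
    ).json()
    print(task.get("task_key"), task_output.get("notebook_output"))

Is that the intended way to retrieve the notebook's output here, or is something else wrong with my get-output call?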