06-23-2021 08:49 PM
06-23-2021 08:54 PM
Courtesy of my colleague Sri, here is some sample library code for executing commands on a Databricks cluster with a short SLA.
import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService

# Configure root logging once at import time: timestamped messages at INFO level.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
    level=logging.INFO,
)
class ClusterExecutionContext(object):
    """Context manager for a Databricks 1.2-API execution context.

    On ``__enter__`` it creates an execution context on the given (running)
    cluster and blocks until the context reports a "Running" status; on
    ``__exit__`` the remote context is destroyed.
    """

    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        # api_client_1_2 must be built with apiVersion="1.2" (the contexts/
        # commands endpoints); cluster_service (2.0 API) is only used to
        # check the cluster's run state.
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service

    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()

    def _verify_cluster_running_state(self):
        # The 1.2 context endpoints only work against a RUNNING cluster.
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")

    def _verify_context_exists(self):
        # Truthiness covers both None and the empty string.
        if not self.context_id:
            raise ValueError("Context ID does not exist.")

    def _create_context(self):
        """Create the remote execution context and remember its id."""
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_context(self):
        """Return the remote context's status payload."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp

    def wait_for_context_ready(self):
        """Poll until the context reports "Running".

        Polls up to 100 times with a 0.5 s delay (~50 s total).  The old
        comment claimed "5 tries with a 10 second delay", which did not
        match the code.
        """
        self._verify_cluster_running_state()
        self._verify_context_exists()
        for _ in range(100):
            status = self._read_context()["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            time.sleep(0.5)
        # BUG FIX: was a bare Exception; TimeoutError (an Exception subclass,
        # so existing handlers still catch it) identifies the failure mode.
        raise TimeoutError("Context failed to be created after 50 seconds")

    def run_command(self, command_string, timeout_seconds=10):
        """Run command_string in this context.

        Returns the finished command's status payload, or None if the
        command exceeded timeout_seconds and was cancelled.
        """
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec

    def _delete_context(self):
        """Destroy the remote execution context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    """Context manager that submits one command to an existing execution
    context (assumed to already be running) and waits for it to finish.

    ``__enter__`` returns the finished command's status payload, or None
    when the command exceeded timeout_seconds and was cancelled.
    """

    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text = None):
        # Assumes cluster_execution_context holds a running remote context.
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context

    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            # BUG FIX: a second TimeoutError raised while waiting for the
            # cancel to take effect used to escape __enter__; swallow it so
            # the documented "None on timeout" contract holds.
            try:
                self._wait_for_command_finish()
                logging.info("Command Canceled Successfully!")
            except TimeoutError as te2:
                logging.warning("Command cancellation not confirmed: {}".format(te2))
            return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting Command Execution Context")

    def _verify_cluster_running_state(self):
        # The 1.2 command endpoints only work against a RUNNING cluster.
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")

    def _verify_command_exists(self):
        # Truthiness covers both None and the empty string.
        if not self.command_id:
            raise ValueError("Command ID does not exist.")

    def _create_command(self):
        """Submit the command for execution and remember its id."""
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_command(self):
        """Return the command's status payload."""
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
            "clusterId": self.cluster_execution_context.cluster_id,
            "contextId": self.cluster_execution_context.context_id,
            "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp

    def _wait_for_command_finish(self):
        """Poll every 0.5 s until the command reaches a terminal state.

        Raises TimeoutError after approximately timeout_seconds.
        """
        self._verify_cluster_running_state()
        self._verify_command_exists()
        end_states = ["Finished", "Cancelled", "Error"]
        # BUG FIX: the loop ran timeout_seconds iterations with a 0.5 s
        # sleep, so it only waited HALF the requested timeout while the
        # error message claimed the full duration. Poll twice per second.
        for i in range(max(1, self.timeout_seconds * 2)):
            status = self._read_command()["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))

    def _delete_command(self):
        """Cancel the remote command."""
        self._verify_cluster_running_state()
        # Consistency fix: siblings validate the command id before use.
        self._verify_command_exists()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp
To use the code above, run the following:
# Example driver: fill in host/token/cluster before running.
host = ""
token = ""
cluster = ""
language = "python"
command = """
import json
import time
import requests
# raise Exception("hello world")
display(spark.sql("SHOW TABLES IN default"))
"""
command2 = """
import json
import time
display(spark.sql("SELECT * FROM airlines_gold"))
"""
# Context/command endpoints need the 1.2 API; cluster state needs 2.0.
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        # BUG FIX: run_command returns None when the command timed out and
        # was cancelled; the old code would then crash on data.get(...).
        if data is None:
            raise TimeoutError("Command timed out and was cancelled")
        results = data.get("results", {})
        if results.get("resultType") == "error":
            raise Exception(results.get("summary"))
        logging.info(data)
06-23-2021 08:54 PM
Courtesy of my colleague Sri, here is some sample library code for executing commands on a Databricks cluster with a short SLA.
import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService

# Configure root logging once at import time: timestamped messages at INFO level.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
    level=logging.INFO,
)
class ClusterExecutionContext(object):
    """Context manager for a Databricks 1.2-API execution context.

    On ``__enter__`` it creates an execution context on the given (running)
    cluster and blocks until the context reports a "Running" status; on
    ``__exit__`` the remote context is destroyed.
    """

    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        # api_client_1_2 must be built with apiVersion="1.2" (the contexts/
        # commands endpoints); cluster_service (2.0 API) is only used to
        # check the cluster's run state.
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service

    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()

    def _verify_cluster_running_state(self):
        # The 1.2 context endpoints only work against a RUNNING cluster.
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")

    def _verify_context_exists(self):
        # Truthiness covers both None and the empty string.
        if not self.context_id:
            raise ValueError("Context ID does not exist.")

    def _create_context(self):
        """Create the remote execution context and remember its id."""
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_context(self):
        """Return the remote context's status payload."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp

    def wait_for_context_ready(self):
        """Poll until the context reports "Running".

        Polls up to 100 times with a 0.5 s delay (~50 s total).  The old
        comment claimed "5 tries with a 10 second delay", which did not
        match the code.
        """
        self._verify_cluster_running_state()
        self._verify_context_exists()
        for _ in range(100):
            status = self._read_context()["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            time.sleep(0.5)
        # BUG FIX: was a bare Exception; TimeoutError (an Exception subclass,
        # so existing handlers still catch it) identifies the failure mode.
        raise TimeoutError("Context failed to be created after 50 seconds")

    def run_command(self, command_string, timeout_seconds=10):
        """Run command_string in this context.

        Returns the finished command's status payload, or None if the
        command exceeded timeout_seconds and was cancelled.
        """
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec

    def _delete_context(self):
        """Destroy the remote execution context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    """Context manager that submits one command to an existing execution
    context (assumed to already be running) and waits for it to finish.

    ``__enter__`` returns the finished command's status payload, or None
    when the command exceeded timeout_seconds and was cancelled.
    """

    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text = None):
        # Assumes cluster_execution_context holds a running remote context.
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context

    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            # BUG FIX: a second TimeoutError raised while waiting for the
            # cancel to take effect used to escape __enter__; swallow it so
            # the documented "None on timeout" contract holds.
            try:
                self._wait_for_command_finish()
                logging.info("Command Canceled Successfully!")
            except TimeoutError as te2:
                logging.warning("Command cancellation not confirmed: {}".format(te2))
            return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting Command Execution Context")

    def _verify_cluster_running_state(self):
        # The 1.2 command endpoints only work against a RUNNING cluster.
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")

    def _verify_command_exists(self):
        # Truthiness covers both None and the empty string.
        if not self.command_id:
            raise ValueError("Command ID does not exist.")

    def _create_command(self):
        """Submit the command for execution and remember its id."""
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_command(self):
        """Return the command's status payload."""
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
            "clusterId": self.cluster_execution_context.cluster_id,
            "contextId": self.cluster_execution_context.context_id,
            "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp

    def _wait_for_command_finish(self):
        """Poll every 0.5 s until the command reaches a terminal state.

        Raises TimeoutError after approximately timeout_seconds.
        """
        self._verify_cluster_running_state()
        self._verify_command_exists()
        end_states = ["Finished", "Cancelled", "Error"]
        # BUG FIX: the loop ran timeout_seconds iterations with a 0.5 s
        # sleep, so it only waited HALF the requested timeout while the
        # error message claimed the full duration. Poll twice per second.
        for i in range(max(1, self.timeout_seconds * 2)):
            status = self._read_command()["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))

    def _delete_command(self):
        """Cancel the remote command."""
        self._verify_cluster_running_state()
        # Consistency fix: siblings validate the command id before use.
        self._verify_command_exists()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp
To use the code above, run the following:
# Example driver: fill in host/token/cluster before running.
host = ""
token = ""
cluster = ""
language = "python"
command = """
import json
import time
import requests
# raise Exception("hello world")
display(spark.sql("SHOW TABLES IN default"))
"""
command2 = """
import json
import time
display(spark.sql("SELECT * FROM airlines_gold"))
"""
# Context/command endpoints need the 1.2 API; cluster state needs 2.0.
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        # BUG FIX: run_command returns None when the command timed out and
        # was cancelled; the old code would then crash on data.get(...).
        if data is None:
            raise TimeoutError("Command timed out and was cancelled")
        results = data.get("results", {})
        if results.get("resultType") == "error":
            raise Exception(results.get("summary"))
        logging.info(data)
Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections.
Click here to register and join today!
Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.