06-23-2021 08:54 PM
Courtesy of my colleague Sri, here's some sample library code to execute commands on a Databricks cluster with a short SLA:
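Note: the snippet relies on the databricks-cli Python package (which provides databricks_cli.sdk) and calls the legacy 1.2 command execution API endpoints (/contexts/* and /commands/*); if the package isn't already available, installing it with pip install databricks-cli should be enough to run the example.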
import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Configure logging format and level
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
class ClusterExecutionContext(object):
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service

    def __enter__(self):
        logging.info("Creating cluster execution context")
        self._create_context()
        self.wait_for_context_ready()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting cluster execution context; deleting context")
        self._delete_context()

    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("Cluster is not in RUNNING state; cannot create an execution context!")

    def _verify_context_exists(self):
        if self.context_id is None or len(self.context_id) <= 0:
            raise ValueError("Context ID does not exist.")

    def _create_context(self):
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp

    def wait_for_context_ready(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # Poll up to 100 times with a 0.5 second delay (roughly 50 seconds total)
        for i in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in Running state, ready to run commands!")
                return
            time.sleep(0.5)
        raise Exception("Context failed to reach Running state after 50 seconds")

    def run_command(self, command_string, timeout_seconds=10):
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec

    def _delete_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text = None):
        # Assumes the parent execution context is already running
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context

    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command timed out: {}".format(te))
            self._delete_command()
            self._wait_for_command_finish()
            logging.info("Command cancelled successfully!")
            return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting command execution context")

    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("Cluster is not in RUNNING state; cannot run commands!")

    def _verify_command_exists(self):
        if self.command_id is None or len(self.command_id) <= 0:
            raise ValueError("Command ID does not exist.")

    def _create_command(self):
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp

    def _read_command(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
            "clusterId": self.cluster_execution_context.cluster_id,
            "contextId": self.cluster_execution_context.context_id,
            "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp

    def _wait_for_command_finish(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        # Poll every 0.5 seconds until the command reaches a terminal state or timeout_seconds elapses
        end_states = ["Finished", "Cancelled", "Error"]
        for i in range(self.timeout_seconds * 2):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))

    def _delete_command(self):
        self._verify_cluster_running_state()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp
To use the code above, do something like the following:
host = ""
token = ""
cluster = ""
language = "python"

command = """
import json
import time
import requests
# raise Exception("hello world")
display(spark.sql("SHOW TABLES IN default"))
"""

command2 = """
import json
import time
display(spark.sql("SELECT * FROM airlines_gold"))
"""

db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))

with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        if data is None:
            raise Exception("Command timed out and was cancelled")
        if data.get("results", {}).get("resultType", None) == "error":
            raise Exception(data.get("results", {}).get("summary", None))
        logging.info(data)
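For reference, command2 above is defined but never run. Here's a minimal sketch of how it could be executed through the same context and its output inspected; the only response key assumed beyond what the loop above already reads is results["data"] for successful commands, so treat that part as an assumption and verify against your workspace.
# Minimal sketch (assumption: successful commands expose their output under results["data"]
# in the 1.2 /commands/status response; verify before relying on it)
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service,
                             cluster_id=cluster, language=language) as cec:
    result = cec.run_command(textwrap.dedent(command2), timeout_seconds=30)
    if result is None:
        logging.info("command2 timed out and was cancelled")
    elif result.get("results", {}).get("resultType", None) == "error":
        logging.error(result.get("results", {}).get("summary", None))
    else:
        logging.info(result.get("results", {}).get("data", None))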