cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Is there a way to execute code on a Databricks cluster with a shorter duration than a Notebook, which has some fixed startup time?

aladda
Honored Contributor II
Honored Contributor II
 
1 ACCEPTED SOLUTION

Accepted Solutions

aladda
Honored Contributor II
Honored Contributor II

Courtesy of my colleague Sri, here's some sample library code to execute on a Databricks cluster with a short SLA:

import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Configure root logging: timestamped messages ('%d-%b-%y %H:%M:%S') at INFO level.
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
class ClusterExecutionContext(object):
    """Context manager around a Databricks 1.2 API execution context.

    On ``__enter__`` an execution context is created on an already-RUNNING
    interactive cluster and this blocks until the context reports the
    "Running" status; on ``__exit__`` the context is destroyed.  Use
    ``run_command`` to execute a command string inside the context.
    """
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        """
        :param api_client_1_2: ApiClient pinned to apiVersion "1.2" (the
            contexts/commands endpoints only exist in the 1.2 API).
        :param cluster_service: ClusterService (2.0 API) used to check the
            cluster is RUNNING before every call.
        :param cluster_id: ID of an interactive cluster in the RUNNING state.
        :param language: Context language, e.g. "python".
        :param context_id: Optional pre-existing context ID to attach to.
        """
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service
    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always destroy the remote context, even if the body raised.
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()
    def _verify_cluster_running_state(self):
        # Fail fast: the 1.2 context endpoints only work on RUNNING clusters.
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_context_exists(self):
        # Guard against calls made before _create_context succeeded.
        if self.context_id is None or len(self.context_id) <= 0:
            raise ValueError("Context ID does not exist.")
    def _create_context(self):
        """Create the remote execution context and store its ID."""
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_context(self):
        """Return the current /contexts/status payload for this context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp
    def wait_for_context_ready(self):
        """Block until the context reports "Running".

        Polls every 0.5 seconds for at most 100 attempts (~50 seconds).
        Fails fast if the context reports an "Error" status instead of
        pointlessly polling through the remaining attempts.
        """
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # 100 polls x 0.5s sleep == at most ~50 seconds of waiting.
        for _ in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            if status == "Error":
                # Terminal state: the context will never become ready.
                raise Exception("Context entered an Error state and will not become ready")
            time.sleep(0.5)
        raise Exception("Context failed to be created after 50 seconds")
    def run_command(self, command_string, timeout_seconds=10):
        """Run ``command_string`` in this context.

        :returns: the final /commands/status payload, or None when the
            command timed out and was cancelled.
        """
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec
    def _delete_context(self):
        """Destroy the remote execution context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    """Context manager that executes a single command in an existing,
    already-"Running" execution context via the 1.2 /commands endpoints.

    ``__enter__`` submits the command and blocks until it reaches a terminal
    state, returning the final /commands/status payload.  If the command does
    not finish within ``timeout_seconds`` it is cancelled and ``__enter__``
    returns None instead.
    """
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text=None):
        """
        :param cluster_execution_context: Parent context; assumed to already
            be in a running state.
        :param command_string: Source code to execute in the context's language.
        :param timeout_seconds: Maximum seconds to wait for the command.
        :param command_id: Optional pre-existing command ID to attach to.
        """
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context
    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            # Wait for the cancellation itself to reach a terminal state; if
            # that also times out, the TimeoutError propagates to the caller.
            self._wait_for_command_finish()
            logging.info("Command Canceled Successfully!")
        return None
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release: the command is already terminal or cancelled.
        logging.debug("Exiting Command Execution Context")
    def _verify_cluster_running_state(self):
        # Fail fast: the 1.2 command endpoints only work on RUNNING clusters.
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_command_exists(self):
        # Guard against calls made before _create_command succeeded.
        if self.command_id is None or len(self.command_id) <= 0:
            raise ValueError("Command ID does not exist.")
    def _create_command(self):
        """Submit the command for execution and store its ID."""
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_command(self):
        """Return the current /commands/status payload for this command."""
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp
    def _wait_for_command_finish(self):
        """Poll every 0.5s until the command reaches a terminal state.

        :raises TimeoutError: if the command is still running after
            ``timeout_seconds``.
        """
        self._verify_cluster_running_state()
        self._verify_command_exists()
        end_states = ["Finished", "Cancelled", "Error"]
        # Poll twice per second so the total wait matches timeout_seconds.
        # (The original looped range(timeout_seconds) with a 0.5s sleep and
        # therefore only waited HALF the advertised timeout.)
        for i in range(self.timeout_seconds * 2):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                # Throttle the debug log to roughly once every 5 seconds.
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))
    def _delete_command(self):
        """Cancel the running command."""
        self._verify_cluster_running_state()
        # Cancellation needs a command ID too (was missing in the original).
        self._verify_command_exists()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp

To use the code above, run the following:

# --- Example usage ---------------------------------------------------------
# Fill in a workspace URL, a personal access token and the ID of a RUNNING
# interactive cluster before running this script.
host = ""
token = ""
cluster = ""
language = "python"
command = """
    import json
    import time
    import requests
    # raise Exception("hello world")
    display(spark.sql("SHOW TABLES IN default"))
    """
command2 = """
    import json
    import time
    display(spark.sql("SELECT * FROM airlines_gold"))
    """
# Two clients: the contexts/commands endpoints live in the 1.2 API, while
# the cluster-state check goes through the 2.0 ClusterService.
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        # run_command returns None when the command timed out and was
        # cancelled; the original code crashed with AttributeError here.
        if data is None:
            raise Exception("Command timed out and was cancelled")
        error_is_true = data.get("results", {}).get("resultType", None) == "error"
        if error_is_true:
            raise Exception(data.get("results", {}).get("summary", None))
        logging.info(data)

View solution in original post

1 REPLY 1

aladda
Honored Contributor II
Honored Contributor II

Courtesy of my colleague Sri, here's some sample library code to execute on a Databricks cluster with a short SLA:

import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Configure root logging: timestamped messages ('%d-%b-%y %H:%M:%S') at INFO level.
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
class ClusterExecutionContext(object):
    """Context manager around a Databricks 1.2 API execution context.

    On ``__enter__`` an execution context is created on an already-RUNNING
    interactive cluster and this blocks until the context reports the
    "Running" status; on ``__exit__`` the context is destroyed.  Use
    ``run_command`` to execute a command string inside the context.
    """
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        """
        :param api_client_1_2: ApiClient pinned to apiVersion "1.2" (the
            contexts/commands endpoints only exist in the 1.2 API).
        :param cluster_service: ClusterService (2.0 API) used to check the
            cluster is RUNNING before every call.
        :param cluster_id: ID of an interactive cluster in the RUNNING state.
        :param language: Context language, e.g. "python".
        :param context_id: Optional pre-existing context ID to attach to.
        """
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service
    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always destroy the remote context, even if the body raised.
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()
    def _verify_cluster_running_state(self):
        # Fail fast: the 1.2 context endpoints only work on RUNNING clusters.
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_context_exists(self):
        # Guard against calls made before _create_context succeeded.
        if self.context_id is None or len(self.context_id) <= 0:
            raise ValueError("Context ID does not exist.")
    def _create_context(self):
        """Create the remote execution context and store its ID."""
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_context(self):
        """Return the current /contexts/status payload for this context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp
    def wait_for_context_ready(self):
        """Block until the context reports "Running".

        Polls every 0.5 seconds for at most 100 attempts (~50 seconds).
        Fails fast if the context reports an "Error" status instead of
        pointlessly polling through the remaining attempts.
        """
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # 100 polls x 0.5s sleep == at most ~50 seconds of waiting.
        for _ in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            if status == "Error":
                # Terminal state: the context will never become ready.
                raise Exception("Context entered an Error state and will not become ready")
            time.sleep(0.5)
        raise Exception("Context failed to be created after 50 seconds")
    def run_command(self, command_string, timeout_seconds=10):
        """Run ``command_string`` in this context.

        :returns: the final /commands/status payload, or None when the
            command timed out and was cancelled.
        """
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec
    def _delete_context(self):
        """Destroy the remote execution context."""
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    """Context manager that executes a single command in an existing,
    already-"Running" execution context via the 1.2 /commands endpoints.

    ``__enter__`` submits the command and blocks until it reaches a terminal
    state, returning the final /commands/status payload.  If the command does
    not finish within ``timeout_seconds`` it is cancelled and ``__enter__``
    returns None instead.
    """
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text=None):
        """
        :param cluster_execution_context: Parent context; assumed to already
            be in a running state.
        :param command_string: Source code to execute in the context's language.
        :param timeout_seconds: Maximum seconds to wait for the command.
        :param command_id: Optional pre-existing command ID to attach to.
        """
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context
    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            # Wait for the cancellation itself to reach a terminal state; if
            # that also times out, the TimeoutError propagates to the caller.
            self._wait_for_command_finish()
            logging.info("Command Canceled Successfully!")
        return None
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release: the command is already terminal or cancelled.
        logging.debug("Exiting Command Execution Context")
    def _verify_cluster_running_state(self):
        # Fail fast: the 1.2 command endpoints only work on RUNNING clusters.
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_command_exists(self):
        # Guard against calls made before _create_command succeeded.
        if self.command_id is None or len(self.command_id) <= 0:
            raise ValueError("Command ID does not exist.")
    def _create_command(self):
        """Submit the command for execution and store its ID."""
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_command(self):
        """Return the current /commands/status payload for this command."""
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp
    def _wait_for_command_finish(self):
        """Poll every 0.5s until the command reaches a terminal state.

        :raises TimeoutError: if the command is still running after
            ``timeout_seconds``.
        """
        self._verify_cluster_running_state()
        self._verify_command_exists()
        end_states = ["Finished", "Cancelled", "Error"]
        # Poll twice per second so the total wait matches timeout_seconds.
        # (The original looped range(timeout_seconds) with a 0.5s sleep and
        # therefore only waited HALF the advertised timeout.)
        for i in range(self.timeout_seconds * 2):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                # Throttle the debug log to roughly once every 5 seconds.
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))
    def _delete_command(self):
        """Cancel the running command."""
        self._verify_cluster_running_state()
        # Cancellation needs a command ID too (was missing in the original).
        self._verify_command_exists()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp

To use the code above, run the following:

# --- Example usage ---------------------------------------------------------
# Fill in a workspace URL, a personal access token and the ID of a RUNNING
# interactive cluster before running this script.
host = ""
token = ""
cluster = ""
language = "python"
command = """
    import json
    import time
    import requests
    # raise Exception("hello world")
    display(spark.sql("SHOW TABLES IN default"))
    """
command2 = """
    import json
    import time
    display(spark.sql("SELECT * FROM airlines_gold"))
    """
# Two clients: the contexts/commands endpoints live in the 1.2 API, while
# the cluster-state check goes through the 2.0 ClusterService.
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        # run_command returns None when the command timed out and was
        # cancelled; the original code crashed with AttributeError here.
        if data is None:
            raise Exception("Command timed out and was cancelled")
        error_is_true = data.get("results", {}).get("resultType", None) == "error"
        if error_is_true:
            raise Exception(data.get("results", {}).get("summary", None))
        logging.info(data)

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!