Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Is there a way to execute code on a Databricks cluster with a shorter duration than a notebook, which has a fixed startup time?

aladda
Databricks Employee
1 ACCEPTED SOLUTION

aladda
Databricks Employee

Courtesy of my colleague Sri, here's some sample library code to execute commands on a Databricks cluster with a short SLA. It uses the 1.2 command execution API (through the databricks-cli SDK) to create an execution context on an already-running cluster and submit commands to it directly, avoiding the fixed startup overhead of a notebook run.

import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Configure custom logging format and level
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
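# Context manager that creates a remote execution context on a running
# cluster, waits for it to become ready, and destroys it on exit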
class ClusterExecutionContext(object):
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service
    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_context_exists(self):
        if self.context_id is None or len(self.context_id) <= 0:
            raise ValueError("Context ID does not exist.")
    def _create_context(self):
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp
    def wait_for_context_ready(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # Poll every 0.5 seconds, up to 100 attempts (50 seconds total)
        for i in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            time.sleep(0.5)
        raise Exception("Context failed to be created after 50 seconds")
    def run_command(self, command_string, timeout_seconds=10):
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec
    def _delete_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
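# Context manager that submits one command to an existing execution context,
# polls it to completion, and cancels it if it exceeds the timeout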
class ClusterCommandExecutionContext(object):
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text=None):
        # Assumes the parent cluster execution context is already running
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context
    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            self._wait_for_command_finish()
            logging.info("Command Canceled Successfully!")
        return None
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting Command Execution Context")
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_command_exists(self):
        if self.command_id is None or len(self.command_id) <= 0:
            raise ValueError("Command ID does not exist.")
    def _create_command(self):
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_command(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp
    def _wait_for_command_finish(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        # Poll every 0.5 seconds until the command reaches a terminal state,
        # for up to timeout_seconds seconds in total
        end_states = ["Finished", "Cancelled", "Error"]
        for i in range(self.timeout_seconds * 2):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))
    def _delete_command(self):
        self._verify_cluster_running_state()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp

To use the code above, fill in host, token, and cluster with your workspace URL, personal access token, and cluster ID, then run the following:

host = ""
token = ""
cluster = ""
language = "python"
command = """
    import json
    import time
    import requests
    # raise Exception("hello world")
    display(spark.sql("SHOW TABLES IN default"))
    """
command2 = """
    import json
    import time
    display(spark.sql("SELECT * FROM airlines_gold"))
    """
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        if data is None:
            # run_command returns None when the command timed out and was cancelled
            continue
        if data.get("results", {}).get("resultType") == "error":
            raise Exception(data.get("results", {}).get("summary"))
        logging.info(data)
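
If you'd rather not depend on the databricks-cli SDK, the same flow can be driven with plain HTTP against the 1.2 command execution endpoints. Below is a minimal sketch of one round trip (create a context, run one command, poll to completion, clean up); the host, token, and cluster values are placeholders you must fill in, the print(1 + 1) command is just an example, and error handling is omitted for brevity.

import time
import requests

host = ""     # e.g. your workspace URL (placeholder)
token = ""    # personal access token (placeholder)
cluster = ""  # cluster ID (placeholder)
headers = {"Authorization": "Bearer {}".format(token)}
base = "{}/api/1.2".format(host)

# Create an execution context on the running cluster
ctx = requests.post(base + "/contexts/create", headers=headers,
                    json={"language": "python", "clusterId": cluster}).json()
context_id = ctx["id"]

# Submit a command to the context
cmd = requests.post(base + "/commands/execute", headers=headers,
                    json={"language": "python", "clusterId": cluster,
                          "contextId": context_id, "command": "print(1 + 1)"}).json()
command_id = cmd["id"]

# Poll every half second until the command reaches a terminal state
while True:
    status = requests.get(base + "/commands/status", headers=headers,
                          params={"clusterId": cluster, "contextId": context_id,
                                  "commandId": command_id}).json()
    if status["status"] in ("Finished", "Cancelled", "Error"):
        break
    time.sleep(0.5)
print(status.get("results"))

# Destroy the context when done
requests.post(base + "/contexts/destroy", headers=headers,
              json={"clusterId": cluster, "contextId": context_id})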
