<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Is there a way to execute code on a databricks cluster with a shorter duration than a Notebook which has some fixed startup time in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/is-there-a-way-to-execute-code-on-a-databricks-cluster-with-a/m-p/20974#M14221</link>
    <description>&lt;P&gt;Courtesy of my colleague Sri, here's some sample library code to execute on a databricks cluster with a short SLA&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Create a custom logging
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
class ClusterExecutionContext(object):
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service
    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_context_exists(self):
        if self.context_id is None or len(self.context_id) &amp;lt;= 0:
            raise ValueError("Context ID does not exist.")
    def _create_context(self):
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp
    def wait_for_context_ready(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # Polls up to 100 times with a 0.5 second delay (50 seconds total)
        for i in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            time.sleep(0.5)
        raise Exception("Context failed to be created after 50 seconds")
    def run_command(self, command_string, timeout_seconds=10):
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec
    def _delete_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text=None):
        # assumes to be a running context
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context
    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            self._wait_for_command_finish()
            logging.info("Command Canceled Successfully!")
        return None
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting Command Execution Context")
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_command_exists(self):
        if self.command_id is None or len(self.command_id) &amp;lt;= 0:
            raise ValueError("Command ID does not exist.")
    def _create_command(self):
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_command(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp
    def _wait_for_command_finish(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        # Polls up to timeout_seconds times with a 0.5 second delay between checks
        end_states = ["Finished", "Cancelled", "Error"]
        for i in range(self.timeout_seconds):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))
    def _delete_command(self):
        self._verify_cluster_running_state()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;To use code above use the following&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;host = ""
token = ""
cluster = ""
language = "python"
command = """
    import json
    import time
    import requests
    # raise Exception("hello world")
    display(spark.sql("SHOW TABLES IN default"))
    """
command2 = """
    import json
    import time
    display(spark.sql("SELECT * FROM airlines_gold"))
    """
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        error_is_true = True if data.get("results", {}).get("resultType", None) == "error" else False
        if error_is_true:
            raise Exception(data.get("results", {}).get("summary", None))
        logging.info(data)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 24 Jun 2021 03:54:13 GMT</pubDate>
    <dc:creator>aladda</dc:creator>
    <dc:date>2021-06-24T03:54:13Z</dc:date>
    <item>
      <title>Is there a way to execute code on a databricks cluster with a shorter duration than a Notebook which has some fixed startup time</title>
      <link>https://community.databricks.com/t5/data-engineering/is-there-a-way-to-execute-code-on-a-databricks-cluster-with-a/m-p/20973#M14220</link>
      <description />
      <pubDate>Thu, 24 Jun 2021 03:49:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/is-there-a-way-to-execute-code-on-a-databricks-cluster-with-a/m-p/20973#M14220</guid>
      <dc:creator>aladda</dc:creator>
      <dc:date>2021-06-24T03:49:33Z</dc:date>
    </item>
    <item>
      <title>Re: Is there a way to execute code on a databricks cluster with a shorter duration than a Notebook which has some fixed startup time</title>
      <link>https://community.databricks.com/t5/data-engineering/is-there-a-way-to-execute-code-on-a-databricks-cluster-with-a/m-p/20974#M14221</link>
      <description>&lt;P&gt;Courtesy of my colleague Sri, here's some sample library code to execute on a databricks cluster with a short SLA&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import logging
import textwrap
import time
from typing import Text
from databricks_cli.sdk import ApiClient, ClusterService
# Create a custom logging
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
class ClusterExecutionContext(object):
    def __init__(self, api_client_1_2: ApiClient, cluster_service: ClusterService, cluster_id: Text, language: Text, context_id: Text = None):
        self.context_id = context_id
        self.language = language
        self.cluster_id = cluster_id
        self.api_client_1_2 = api_client_1_2
        self.cluster_service = cluster_service
    def __enter__(self):
        logging.info("Creating Cluster Execution Context")
        self._create_context()
        self.wait_for_context_ready()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("Exiting Cluster Execution Context Deleting Context")
        self._delete_context()
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_service.get_cluster(self.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_context_exists(self):
        if self.context_id is None or len(self.context_id) &amp;lt;= 0:
            raise ValueError("Context ID does not exist.")
    def _create_context(self):
        self._verify_cluster_running_state()
        data = {"language": self.language, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/create", data)
        self.context_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("GET", "/contexts/status", data)
        logging.debug(resp)
        return resp
    def wait_for_context_ready(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        # Polls up to 100 times with a 0.5 second delay (50 seconds total)
        for i in range(100):
            context = self._read_context()
            status = context["status"]
            logging.debug("Context is in a {} state...".format(status))
            if status == "Running":
                logging.info("Context is in running state ready to run commands!")
                return
            time.sleep(0.5)
        raise Exception("Context failed to be created after 50 seconds")
    def run_command(self, command_string, timeout_seconds=10):
        with ClusterCommandExecutionContext(cluster_execution_context=self, command_string=command_string,
                                            timeout_seconds=timeout_seconds) as ccec:
            return ccec
    def _delete_context(self):
        self._verify_cluster_running_state()
        self._verify_context_exists()
        data = {"contextId": self.context_id, "clusterId": self.cluster_id}
        resp = self.api_client_1_2.perform_query("POST", "/contexts/destroy", data)
        return resp
class ClusterCommandExecutionContext(object):
    def __init__(self, cluster_execution_context: ClusterExecutionContext, command_string: Text, timeout_seconds: int, command_id: Text=None):
        # assumes to be a running context
        self.command_id = command_id
        self.timeout_seconds = timeout_seconds
        self.command_string = command_string
        self.cluster_execution_context = cluster_execution_context
    def __enter__(self):
        self._create_command()
        try:
            self._wait_for_command_finish()
            return self._read_command()
        except TimeoutError as te:
            logging.info("Command Timed out: {}".format(te))
            self._delete_command()
            self._wait_for_command_finish()
            logging.info("Command Canceled Successfully!")
        return None
    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug("Exiting Command Execution Context")
    def _verify_cluster_running_state(self):
        cluster_resp = self.cluster_execution_context.cluster_service.get_cluster(self.cluster_execution_context.cluster_id)
        if cluster_resp["state"] != "RUNNING":
            raise ValueError("State is not in Running cannot generate context!")
    def _verify_command_exists(self):
        if self.command_id is None or len(self.command_id) &amp;lt;= 0:
            raise ValueError("Command ID does not exist.")
    def _create_command(self):
        self._verify_cluster_running_state()
        data = {"language": self.cluster_execution_context.language,
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "command": self.command_string}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/execute", data)
        self.command_id = resp["id"]
        logging.debug(resp)
        return resp
    def _read_command(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        data = {
                "clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("GET", "/commands/status", data)
        logging.debug(resp)
        return resp
    def _wait_for_command_finish(self):
        self._verify_cluster_running_state()
        self._verify_command_exists()
        # Polls up to timeout_seconds times with a 0.5 second delay between checks
        end_states = ["Finished", "Cancelled", "Error"]
        for i in range(self.timeout_seconds):
            command = self._read_command()
            status = command["status"]
            if i % 10 == 0:
                logging.debug("Command is in a {} state...".format(status))
            if status in end_states:
                logging.debug("Command is in a terminal state: {}".format(status))
                return
            time.sleep(0.5)
        raise TimeoutError("Command timed out after: {} seconds".format(self.timeout_seconds))
    def _delete_command(self):
        self._verify_cluster_running_state()
        data = {"clusterId": self.cluster_execution_context.cluster_id,
                "contextId": self.cluster_execution_context.context_id,
                "commandId": self.command_id}
        resp = self.cluster_execution_context.api_client_1_2.perform_query("POST", "/commands/cancel", data)
        return resp&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;To use code above use the following&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;host = ""
token = ""
cluster = ""
language = "python"
command = """
    import json
    import time
    import requests
    # raise Exception("hello world")
    display(spark.sql("SHOW TABLES IN default"))
    """
command2 = """
    import json
    import time
    display(spark.sql("SELECT * FROM airlines_gold"))
    """
db_api_client = ApiClient(host=host, token=token, apiVersion="1.2")
cluster_service = ClusterService(ApiClient(host=host, token=token, apiVersion="2.0"))
with ClusterExecutionContext(api_client_1_2=db_api_client, cluster_service=cluster_service, cluster_id=cluster, language=language) as cec:
    for i in range(10):
        data = cec.run_command(textwrap.dedent(command), timeout_seconds=12)
        error_is_true = True if data.get("results", {}).get("resultType", None) == "error" else False
        if error_is_true:
            raise Exception(data.get("results", {}).get("summary", None))
        logging.info(data)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 24 Jun 2021 03:54:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/is-there-a-way-to-execute-code-on-a-databricks-cluster-with-a/m-p/20974#M14221</guid>
      <dc:creator>aladda</dc:creator>
      <dc:date>2021-06-24T03:54:13Z</dc:date>
    </item>
  </channel>
</rss>

