
How to monitor a python wheel job with Prometheus?

alexlod
New Contributor III

Hi Community,

We have a Databricks job with a single Python wheel task that runs our PySpark streaming job. The job runs on a single-node compute cluster and consumes from Kafka.

Our monitoring stack is Prometheus + Grafana.

I want the job's metrics to be available in our monitoring stack.

I believe this is possible based on some questions I've found in this community. However, nothing I've found explains how to actually do it.

We're interested in basic metrics, e.g. some equivalent of events per second.

Thanks and best wishes!

4 REPLIES

Kaniz
Community Manager

Hi @alexlod, monitoring your Python wheel job with Prometheus is certainly possible.

Let’s break down the steps to achieve this:

Instrument Your Python Code:

  • To expose metrics from your Python code, you’ll need a Prometheus client library. The official Python client is prometheus_client.
  • You can create custom metrics (counters, gauges, histograms, etc.) within your Python script. These metrics will capture relevant information about your job’s performance.

Push Metrics to Prometheus:

  • Prometheus typically scrapes metrics from endpoints exposed by long-lived services. However, since your job is short-lived, you’ll need a different approach.
  • Consider using Pushgateway, which acts as an intermediary between your job and Prometheus. When your Python script completes, it pushes the collected metrics to the Pushgateway.
  • Install the prometheus_client library if you haven’t already. You can do this with pip install prometheus_client.

Example Code:

  • Here’s a simple example of how you can use prometheus_client to create a metric and push it to the Pushgateway:
  • Replace 'your_job_name' with an appropriate identifier for your job.
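A minimal sketch of the push approach described above, assuming a Pushgateway reachable at localhost:9091. The metric names and the helper functions `build_registry` and `push_job_metrics` are hypothetical and chosen only for illustration; `CollectorRegistry`, `Gauge`, and `push_to_gateway` come from prometheus_client.

```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

# Assumed values: replace with your Pushgateway address and job identifier.
PUSHGATEWAY_ADDRESS = "localhost:9091"
JOB_NAME = "your_job_name"


def build_registry(duration_seconds, events_processed):
    """Collect the job's final metrics in a dedicated registry."""
    registry = CollectorRegistry()
    Gauge(
        "job_duration_seconds",
        "Wall-clock duration of the last job run",
        registry=registry,
    ).set(duration_seconds)
    Gauge(
        "job_events_processed",
        "Number of events processed by the last job run",
        registry=registry,
    ).set(events_processed)
    return registry


def push_job_metrics(duration_seconds, events_processed):
    """Push the metrics once; requires a running Pushgateway."""
    registry = build_registry(duration_seconds, events_processed)
    push_to_gateway(PUSHGATEWAY_ADDRESS, job=JOB_NAME, registry=registry)
```

Calling `push_job_metrics(...)` at the end of the wheel's entry point would send the values to the gateway. A dedicated registry is used so that only these job metrics are pushed, not the default process metrics.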

Configure Prometheus:

  • In your Prometheus configuration, add a job to scrape metrics from the Pushgateway. For example:
  • Adjust the target address (localhost:9091) to match your Pushgateway’s location.
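A scrape job for the Pushgateway might look like the fragment below. This is a sketch, assuming the gateway runs at localhost:9091; the job name "pushgateway" is an arbitrary choice. `honor_labels: true` keeps the job and instance labels set by the pushing job instead of overwriting them with the gateway's own.

```yaml
scrape_configs:
  - job_name: "pushgateway"
    honor_labels: true  # keep labels supplied by the pushing job
    static_configs:
      - targets: ["localhost:9091"]
```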

Visualize Metrics in Grafana:

  • Set up Grafana to connect to Prometheus as a data source.
  • Create dashboards and panels to visualize the metrics you’re interested in (e.g., events per second).

Metrics to Consider:

  • Since you’re interested in basic metrics, consider capturing:
    • Job execution time: How long does your job take to complete?
    • Resource utilization: CPU and memory consumption.
    • Event throughput: Events processed per second.

Remember that Prometheus is designed for long-lived services, so adapting it for short-lived jobs requires some workarounds. But with the right instrumentation and Pushgateway, you can integrate your Python wheel job’s metrics into your monitoring stack effectively! 🚀📊

Kaniz
Community Manager

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
 

alexlod
New Contributor III

Thanks @Kaniz! Our job is long-running because it's a streaming job. Is there no way to use the Prometheus HTTP server and have Prometheus scrape the metrics? That would save us from having to introduce a new component, the Pushgateway. (I experimented with the HTTP server but couldn't get it working, although I did find the Prometheus metrics endpoint for the Databricks cluster node.) Thanks again!
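For reference, the scrape-based setup asked about here could be sketched as follows. It assumes the metrics are updated on the driver inside a foreachBatch handler and that Prometheus can reach the driver node on the chosen port, which depends on the workspace's networking and is not confirmed anywhere in this thread. The metric names, the port, and the `process_batch` and `run` helpers are hypothetical.

```python
from prometheus_client import Counter, Gauge, start_http_server

# Hypothetical metric names; pick ones that fit your naming scheme.
EVENTS_PROCESSED = Counter(
    "events_processed", "Total events consumed from Kafka"
)
LAST_BATCH_SIZE = Gauge(
    "last_batch_size", "Number of events in the most recent micro-batch"
)

METRICS_PORT = 8000  # assumed port


def process_batch(batch_df, batch_id):
    """foreachBatch handler: runs on the driver once per micro-batch."""
    n = batch_df.count()
    EVENTS_PROCESSED.inc(n)
    LAST_BATCH_SIZE.set(n)
    # ... write batch_df to its sink here ...


def run(stream_df):
    """Start the metrics endpoint, then run the streaming query."""
    # Serves the default registry over HTTP from a background thread.
    start_http_server(METRICS_PORT)
    query = stream_df.writeStream.foreachBatch(process_batch).start()
    query.awaitTermination()
```

The counter is exposed as `events_processed_total`, so a Grafana panel querying `rate(events_processed_total[1m])` would approximate events per second.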

amelia1
New Contributor II

Hi, I'm trying to use the metrics registry object inside a UDF, but I can't because it's not serializable: it contains a Lock. Our goal is to count the number of messages parsed and the number of messages we can't parse (due to exceptions), and push those metrics to Prometheus.

    def get_parser_udf(self, parser_cls, compressed=False):
        parser = parser_cls()
        metrics = self._metrics #this contains the CollectorRegistry object

        def parse_message(msg):
            try:
                logger.debug(f'compressed is {compressed}')
                if compressed:
                    msg = gzip.decompress(msg).decode('utf-8')
                logger.debug(f"message is {msg}")
                message = json.loads(msg)
                result = json.dumps(parser.parse_data(message), ensure_ascii=False, default=str)
                metrics['items_count'].inc()
                return result
            except Exception as e:
                logger.exception(e)
                logger.error(f"Exception occurred parsing message: {msg[:100]}")
                metrics['exceptions_count'].inc()
                return

        parse_message_udf = udf(lambda m: parse_message(m))
        return parse_message_udf

Error stack:

    .withColumn("dat_gold", self._parser(col("jstring")))
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 425, in wrapper
    return self(*args)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 402, in __call__
    judf = self._judf
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 322, in _judf
    self._judf_placeholder = self._create_judf(self.func)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 331, in _create_judf
    wrapped_func = _wrap_function(sc, func, self.returnType)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 60, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/rdd.py", line 5251, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/serializers.py", line 469, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object
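One possible way around this error, sketched under assumptions rather than tested on Databricks: keep the metrics objects out of the UDF closure entirely and count outcomes on the driver in a foreachBatch handler. The UDF signals failure by returning None, and the driver counts null and non-null values in the output column. The helpers `make_parse_message` and `record_batch_metrics` are hypothetical, `metrics` is assumed to be the dict of counters from the post, and the parser instance is assumed to be picklable.

```python
import gzip
import json


def make_parse_message(parser, compressed=False):
    """Build a parse function whose closure holds no metrics objects,
    so it can be pickled. Failures are signalled by returning None."""

    def parse_message(msg):
        try:
            if compressed:
                msg = gzip.decompress(msg).decode("utf-8")
            message = json.loads(msg)
            return json.dumps(
                parser.parse_data(message), ensure_ascii=False, default=str
            )
        except Exception:
            return None

    return parse_message


def record_batch_metrics(batch_df, batch_id, metrics, column="dat_gold"):
    """Count outcomes on the driver, where the registry lives."""
    batch_df.persist()  # avoid running the UDF once per count
    try:
        parsed = batch_df.filter(f"{column} IS NOT NULL").count()
        failed = batch_df.filter(f"{column} IS NULL").count()
    finally:
        batch_df.unpersist()
    metrics["items_count"].inc(parsed)
    metrics["exceptions_count"].inc(failed)
```

The UDF would then be created with `udf(make_parse_message(parser_cls(), compressed))`, and the handler wired in with `writeStream.foreachBatch(lambda df, batch_id: record_batch_metrics(df, batch_id, metrics))`. The trade-off is two extra counts per micro-batch in exchange for never shipping the registry to the workers.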

 
