Administration & Architecture
Explore discussions on Databricks administration, deployment strategies, and architectural best practices. Connect with administrators and architects to optimize your Databricks environment for performance, scalability, and security.

How to monitor a python wheel job with Prometheus?

alexlod
New Contributor III

Hi Community,

We have a Databricks job with a single Python wheel task that runs our streaming PySpark job. The job runs on a single-node compute cluster and consumes from Kafka.

Our monitoring stack is Prometheus + Grafana.

I want the job's metrics to be available in our monitoring stack.

I believe this is possible based on some questions I've found in this community. However, nothing I've found explains how to actually do it.

We're interested in basic metrics, e.g. something equivalent to events per second.

Thanks and best wishes!
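
A rough sketch of one push-based pattern for getting this kind of number out (hedged: it assumes prometheus_client is installed on the cluster, a Pushgateway is reachable at the placeholder address below, and a PySpark version that ships the Python StreamingQueryListener, i.e. 3.4+; the job and metric names are made up for illustration):

    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
    from pyspark.sql.streaming import StreamingQueryListener

    PUSHGATEWAY_ADDR = "pushgateway:9091"  # placeholder address

    registry = CollectorRegistry()
    rows_per_sec = Gauge(
        "streaming_processed_rows_per_second",
        "processedRowsPerSecond reported for the latest micro-batch",
        registry=registry,
    )


    class PrometheusListener(StreamingQueryListener):
        """Forwards each micro-batch progress report to the Pushgateway."""

        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            # event.progress is the StreamingQueryProgress for the batch.
            rows_per_sec.set(event.progress.processedRowsPerSecond or 0.0)
            push_to_gateway(PUSHGATEWAY_ADDR, job="kafka_streaming_job", registry=registry)

        def onQueryIdle(self, event):
            pass

        def onQueryTerminated(self, event):
            pass


    # `spark` is the existing SparkSession; register the listener before
    # starting the streaming query.
    spark.streams.addListener(PrometheusListener())

The listener runs on the driver, so the Prometheus registry never has to be shipped to executors.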

2 REPLIES

alexlod
New Contributor III

Thanks @Retired_mod! Our job is long-running because it's a streaming job. Is there no way to use the Prometheus HTTP server and have Prometheus scrape the metrics? That would save us from having to introduce a new component, the Pushgateway. (I experimented with the HTTP server but couldn't get it working, although I did find the Prometheus metrics endpoint on the Databricks cluster node.) Thanks again!
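
For reference, a minimal sketch of that scrape-based variant (again hedged: prometheus_client on the driver, `query` is the handle returned by writeStream.start(), and Prometheus must be able to reach the driver node on the chosen port):

    import time

    from prometheus_client import Gauge, start_http_server

    # Expose a /metrics endpoint on the driver; port 8000 is a placeholder.
    start_http_server(8000)

    rows_per_sec = Gauge(
        "streaming_processed_rows_per_second",
        "processedRowsPerSecond from the most recent progress report",
    )

    # Poll the StreamingQuery handle on the driver and update the gauge.
    while query.isActive:
        progress = query.lastProgress  # dict form of the latest progress report
        if progress:
            rows_per_sec.set(progress.get("processedRowsPerSecond") or 0.0)
        time.sleep(30)

Whether this works usually comes down to networking rather than code: Prometheus has to be able to scrape the driver's port, which is why the outbound-only Pushgateway route is often suggested instead.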

amelia1
New Contributor II

Hi, I'm trying to use the metrics registry object inside a UDF, but I can't because it isn't serializable (it contains a Lock). Our goal is to count the number of messages parsed and the number of messages we can't parse (due to exceptions), and push those metrics to Prometheus.

    def get_parser_udf(self, parser_cls, compressed=False):
        parser = parser_cls()
        metrics = self._metrics  # this contains the CollectorRegistry object

        def parse_message(msg):
            try:
                logger.debug(f'compressed is {compressed}')
                if compressed:
                    msg = gzip.decompress(msg).decode('utf-8')
                logger.debug(f"message is {msg}")
                message = json.loads(msg)
                result = json.dumps(parser.parse_data(message), ensure_ascii=False, default=str)
                metrics['items_count'].inc()
                return result
            except Exception as e:
                logger.exception(e)
                logger.error(f"Exception occurred parsing message: {msg[:100]}")
                metrics['exceptions_count'].inc()
                return

        parse_message_udf = udf(lambda m: parse_message(m))
        return parse_message_udf

Error stack:

    .withColumn("dat_gold", self._parser(col("jstring")))
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 425, in wrapper
    return self(*args)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 402, in __call__
    judf = self._judf
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 322, in _judf
    self._judf_placeholder = self._create_judf(self.func)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 331, in _create_judf
    wrapped_func = _wrap_function(sc, func, self.returnType)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 60, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/rdd.py", line 5251, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/serializers.py", line 469, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object
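
One way around this pickling error is to keep the CollectorRegistry on the driver only and do the per-message counting inside the UDF with Spark accumulators, which are designed to be shipped to executors. A minimal sketch (hedged: it drops the gzip and logging details, assumes parser_cls itself is picklable, and note that accumulator updates made inside transformations can be double-counted on task retries):

    import json

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    items_acc = sc.accumulator(0)       # messages parsed successfully
    exceptions_acc = sc.accumulator(0)  # messages that failed to parse


    def get_parser_udf(parser_cls):
        parser = parser_cls()

        def parse_message(msg):
            try:
                result = json.dumps(parser.parse_data(json.loads(msg)),
                                    ensure_ascii=False, default=str)
                items_acc.add(1)
                return result
            except Exception:
                exceptions_acc.add(1)
                return None

        return udf(parse_message)

    # Back on the driver (for example once per micro-batch, or in a small
    # polling loop), copy the accumulator totals into the Prometheus metrics
    # that never leave the driver, e.g.:
    #     metrics["items_count"].inc(items_acc.value - last_reported_items)
    #     metrics["exceptions_count"].inc(exceptions_acc.value - last_reported_exceptions)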

 
