12-01-2023 02:40 PM
Hi Community,
We have a Databricks job with a single Python wheel task that runs our streaming pyspark job. The job runs on a single-node compute cluster and consumes from Kafka.
Our monitoring stack is Prometheus + Grafana.
I want the job's metrics to be available in our monitoring stack.
I believe this is possible based on some questions I've found in this community. However, nothing I've found explains how to actually do it.
We're interested in basic metrics, e.g. something equivalent to events per second.
Thanks and best wishes!
12-03-2023 07:38 AM
Hi @alexlod, Certainly! Monitoring your Python wheel job with Prometheus is a great idea.
Let’s break down the steps to achieve this:
1. Instrument Your Python Code: use the prometheus_client library to define counters and gauges for the values you care about.
2. Push Metrics to Prometheus: since the job isn't exposed as a long-lived HTTP service by default, push the metrics to a Prometheus Pushgateway.
3. Example Code: see the sketch at the end of this reply.
4. Configure Prometheus: add a scrape job for the Pushgateway in prometheus.yml.
5. Visualize Metrics in Grafana: with Prometheus added as a Grafana data source, build dashboards on the new metrics.
6. Metrics to Consider: events processed per second, parse/processing errors, and batch processing time.
Remember that Prometheus is designed for long-lived services, so adapting it for short-lived jobs requires some workarounds. But with the right instrumentation and Pushgateway, you can integrate your Python wheel job’s metrics into your monitoring stack effectively! 🚀📊
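Here is a minimal sketch of steps 1-4, assuming the prometheus_client package is installed on the cluster and a Pushgateway is reachable from it; the Pushgateway address, metric name, and job label below are placeholders you would replace with your own:

from prometheus_client import CollectorRegistry, Counter, push_to_gateway

# Driver-side registry and counter for the streaming job (names are illustrative)
registry = CollectorRegistry()
events_total = Counter(
    "events_processed_total",
    "Kafka events processed by the streaming job",
    registry=registry,
)

def report_batch(batch_size):
    # Increment the counter, then push the registry's current values to the Pushgateway.
    # Prometheus scrapes the Pushgateway instead of the job itself.
    events_total.inc(batch_size)
    push_to_gateway(
        "pushgateway.example.com:9091",  # placeholder Pushgateway address
        job="streaming_wheel_job",       # placeholder job label
        registry=registry,
    )

On the Prometheus side, add a scrape job targeting the Pushgateway in prometheus.yml (with honor_labels: true so the pushed job label is kept), and the metrics will appear in Grafana once Prometheus is configured as a data source.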
12-03-2023 09:05 PM
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!
12-04-2023 10:03 AM
Thanks @Kaniz_Fatma! Our job is long-running because it's a streaming job. Is there no way to use the Prometheus HTTP server and have Prometheus scrape the metrics? That would save us from having to introduce a new component, the Pushgateway. (I experimented with the HTTP server but couldn't get it working, although I did find the Prometheus metrics endpoint for the Databricks cluster node.) Thanks again!
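For concreteness, the approach I have in mind is roughly the following sketch; the port and metric name are placeholders, and Prometheus would need network access to the driver node on that port for scraping to work:

from prometheus_client import Counter, start_http_server

# Expose /metrics over HTTP from the driver process; the port is a placeholder and must be
# reachable from the Prometheus server.
start_http_server(8000)

events_total = Counter("events_processed_total", "Kafka events processed")

# ...then, inside the per-batch processing (e.g. a foreachBatch function):
# events_total.inc(batch_df.count())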
12-13-2023 02:19 PM
Hi, I'm trying to use the metrics registry object inside a UDF, but I can't because it isn't serializable (it contains a Lock). Our goal is to count the number of messages parsed and the number of messages we can't parse (due to exceptions), and push those metrics to Prometheus.
def get_parser_udf(self, parser_cls, compressed=False):
    parser = parser_cls()
    metrics = self._metrics  # this contains the CollectorRegistry object

    def parse_message(msg):
        try:
            logger.debug(f'compressed is {compressed}')
            if compressed:
                msg = gzip.decompress(msg).decode('utf-8')
            logger.debug(f"message is {msg}")
            message = json.loads(msg)
            result = json.dumps(parser.parse_data(message), ensure_ascii=False, default=str)
            metrics['items_count'].inc()
            return result
        except Exception as e:
            logger.exception(e)
            logger.error(f"Exception occurred parsing message: {msg[:100]}")
            metrics['exceptions_count'].inc()
            return

    parse_message_udf = udf(lambda m: parse_message(m))
    return parse_message_udf
Error stack:
.withColumn("dat_gold", self._parser(col("jstring")))
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 425, in wrapper
return self(*args)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 402, in __call__
judf = self._judf
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 322, in _judf
self._judf_placeholder = self._create_judf(self.func)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 331, in _create_judf
wrapped_func = _wrap_function(sc, func, self.returnType)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 60, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/rdd.py", line 5251, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/serializers.py", line 469, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object
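One possible way around the pickling error, sketched below under the assumption of Spark 3.4+ (for PySpark's StreamingQueryListener and observe with a string name): keep the prometheus_client objects on the driver and let Spark compute the per-batch counts, so the UDF closure no longer captures the registry (the metrics[...].inc() calls would be removed from parse_message). The metric names, the parse_metrics observation name, and the df/spark/parse_udf variables are placeholders standing in for your own code:

from prometheus_client import Counter
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryListener

# Driver-side counters (illustrative names); never captured by the UDF closure.
PARSED = Counter("parsed_messages_total", "Messages parsed successfully")
FAILED = Counter("failed_messages_total", "Messages that failed to parse")

class ParseMetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # observedMetrics maps the name passed to observe() to a Row of the aggregations.
        row = event.progress.observedMetrics.get("parse_metrics")
        if row is not None:
            PARSED.inc(row["parsed"])
            FAILED.inc(row["failed"])

    def onQueryIdle(self, event):  # Spark 3.5+; harmless no-op on older versions
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(ParseMetricsListener())

# Attach observed aggregations to the streaming DataFrame after the parsing UDF
# (parse_udf stands in for your existing self._parser):
df = (
    df.withColumn("dat_gold", parse_udf(F.col("jstring")))
      .observe(
          "parse_metrics",
          F.count(F.when(F.col("dat_gold").isNotNull(), 1)).alias("parsed"),
          F.count(F.when(F.col("dat_gold").isNull(), 1)).alias("failed"),
      )
)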