Hi I'm trying to use the metrics registry object inside an UDF function, but I can't because it's not serializable due to Lock. Our goal is to be able to count the number of messages parsed, and the number of messages we can't parsed (due to exceptions), and push the metrics to Prometheus.
def get_parser_udf(self, parser_cls, compressed=False):
parser = parser_cls()
metrics = self._metrics #this contains the CollectorRegistry object
def parse_message(msg):
try:
logger.debug(f'compressed is {compressed}')
if compressed:
msg = gzip.decompress(msg).decode('utf-8')
logger.debug(f"message is {msg}")
message = json.loads(msg)
result = json.dumps(parser.parse_data(message), ensure_ascii=False, default=str)
metrics['items_count'].inc()
return result
except Exception as e:
logger.exception(e)
logger.error(f"Exception occurred parsing message: {msg[:100]}")
metrics['exceptions_count'].inc()
return
parse_message_udf = udf(lambda m: parse_message(m))
return parse_message_udf
ErrorStack
.withColumn("dat_gold", self._parser(col("jstring")))
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 425, in wrapper
return self(*args)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 402, in __call__
judf = self._judf
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 322, in _judf
self._judf_placeholder = self._create_judf(self.func)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 331, in _create_judf
wrapped_func = _wrap_function(sc, func, self.returnType)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/sql/udf.py", line 60, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/rdd.py", line 5251, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/home/liafan/.cache/pypoetry/virtualenvs/lake-loader-0knkvT5S-py3.10/lib/python3.10/site-packages/pyspark/serializers.py", line 469, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object