Skip to content

Commit

Permalink
Add mostrecent aggregation to Gauge
Browse files Browse the repository at this point in the history
In multiprocess mode, the process that exposes the metrics needs to
aggregate the samples from the other processes. The Gauge metric lets users
choose the aggregation mode. This implements the 'mostrecent' (and
'livemostrecent') modes, in which the most recently observed value is exposed.

In order to support this, the file format is expanded to store
timestamps in addition to the values. The stored timestamps are read by
the reader process and used to find the latest value.

The timestamp itself is exposed as part of the Prometheus exposition format
(https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md).
This allows further aggregation across exporters.

Closes prometheus#847

Consideration on the atomicity:

Previously, mmap_dict.py had a comment saying "We assume that reading
from an 8 byte aligned value is atomic". With this change, the value
write becomes a 16-byte write that is 8-byte aligned. The code author
tried to find a basis for the original assumption, but couldn't find any.
According to write(2), **if a file descriptor is shared**, the write
becomes atomic. However, we do not share the file descriptors in the
current architecture.

Considering that the Ruby implementation does the same and has not seen
an issue with it, this write atomicity problem might not be an issue in
practice.

See also:

* prometheus/client_ruby#172

  The approach and naming are taken from client_ruby.

* https://github.com/prometheus/client_golang/blob/v1.17.0/prometheus/metric.go#L149-L161

  client_golang already has an API for setting the timestamp. It explains
  the use case for the timestamp beyond client-local aggregation. In
  order to support the same use case in Python, further changes are
  needed.

Signed-off-by: Masaya Suzuki <draftcode@gmail.com>
  • Loading branch information
draftcode committed Oct 17, 2023
1 parent 249490e commit cc84ce2
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 45 deletions.
15 changes: 10 additions & 5 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def f():
d.set_function(lambda: len(my_dict))
"""
_type = 'gauge'
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))

def __init__(self,
name: str,
Expand All @@ -357,7 +357,7 @@ def __init__(self,
unit: str = '',
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
):
self._multiprocess_mode = multiprocess_mode
if multiprocess_mode not in self._MULTIPROC_MODES:
Expand Down Expand Up @@ -390,10 +390,15 @@ def dec(self, amount: float = 1) -> None:
self._raise_if_not_observable()
self._value.inc(-amount)

def set(self, value: float) -> None:
"""Set gauge to the given value."""
def set(self, value: float, timestamp_sec: Optional[float] = None) -> None:
"""Set gauge to the given value.
This can take an optional timestamp to indicate when the sample was
taken. This is used for the most_recent aggregation and Prometheus
exposition.
"""
self._raise_if_not_observable()
self._value.set(float(value))
self._value.set(float(value), timestamp_sec=timestamp_sec)

def set_to_current_time(self) -> None:
"""Set gauge to the current unixtime."""
Expand Down
40 changes: 20 additions & 20 deletions prometheus_client/mmap_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@

_INITIAL_MMAP_SIZE = 1 << 16
_pack_integer_func = struct.Struct(b'i').pack
_pack_double_func = struct.Struct(b'd').pack
_pack_two_doubles_func = struct.Struct(b'dd').pack
_unpack_integer = struct.Struct(b'i').unpack_from
_unpack_double = struct.Struct(b'd').unpack_from
_unpack_two_doubles = struct.Struct(b'dd').unpack_from


# struct.pack_into has atomicity issues because it will temporarily write 0 into
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
# Using direct assignment solves this issue.

def _pack_double(data, pos, value):
    """Write ``value`` as one packed double into ``data[pos:pos + 8]``.

    The bytes are produced first and then stored with a single slice
    assignment; ``struct.pack_into`` is avoided because it can temporarily
    write 0 into the mmap, which concurrent readers may observe.
    """
    packed = _pack_double_func(value)
    data[pos:pos + 8] = packed

def _pack_two_doubles(data, pos, value, timestamp_sec):
    """Write ``value`` and ``timestamp_sec`` as two packed doubles.

    Both numbers are packed together and stored into ``data[pos:pos + 16]``
    with a single slice assignment; ``struct.pack_into`` is avoided because
    it can temporarily write 0 into the mmap, which concurrent readers may
    observe.
    """
    packed = _pack_two_doubles_func(value, timestamp_sec)
    data[pos:pos + 16] = packed


def _pack_integer(data, pos, value):
    """Write ``value`` as one packed integer into ``data[pos:pos + 4]``.

    The bytes are produced first and then stored with a single slice
    assignment; ``struct.pack_into`` is avoided because it can temporarily
    write 0 into the mmap, which concurrent readers may observe.
    """
    packed = _pack_integer_func(value)
    data[pos:pos + 4] = packed


def _read_all_values(data, used=0):
"""Yield (key, value, pos). No locking is performed."""
"""Yield (key, value, timestamp_sec, pos). No locking is performed."""

if used <= 0:
# If not valid `used` value is passed in, read it from the file.
Expand All @@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
encoded_key = data[pos:pos + encoded_len]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded_key.decode('utf-8'), value, pos
pos += 8
value, timestamp_sec = _unpack_two_doubles(data, pos)
yield encoded_key.decode('utf-8'), value, timestamp_sec, pos
pos += 16


class MmapedDict:
Expand All @@ -53,7 +54,8 @@ class MmapedDict:
Then 4 bytes of padding.
There's then a number of entries, consisting of a 4 byte int which is the
size of the next field, a utf-8 encoded string key, padding to a 8 byte
alignment, and then a 8 byte float which is the value.
alignment, and then a 8 byte float which is the value and a 8 byte float
which is a UNIX timestamp in seconds.
Not thread safe.
"""
Expand All @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
_pack_integer(self._m, 0, self._used)
else:
if not read_mode:
for key, _, pos in self._read_all_values():
for key, _, _, pos in self._read_all_values():
self._positions[key] = pos

@staticmethod
Expand All @@ -95,7 +97,7 @@ def _init_value(self, key):
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
self._f.truncate(self._capacity)
Expand All @@ -105,30 +107,28 @@ def _init_value(self, key):
# Update how much space we've used.
self._used += len(value)
_pack_integer(self._m, 0, self._used)
self._positions[key] = self._used - 8
self._positions[key] = self._used - 16

def _read_all_values(self):
"""Yield (key, value, timestamp_sec, pos). No locking is performed."""
# Delegates to the module-level reader, passing this instance's mmap and
# its recorded used size.
return _read_all_values(data=self._m, used=self._used)

def read_all_values(self):
"""Yield (key, value). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v
"""Yield (key, value, timestamp_sec). No locking is performed."""
for k, v, ts, _ in self._read_all_values():
yield k, v, ts

def read_value(self, key):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that reading from an 8 byte aligned value is atomic
return _unpack_double(self._m, pos)[0]
return _unpack_two_doubles(self._m, pos)

def write_value(self, key, value):
def write_value(self, key, value, timestamp_sec):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that writing to an 8 byte aligned value is atomic
_pack_double(self._m, pos, value)
_pack_two_doubles(self._m, pos, value, timestamp_sec)

def close(self):
if self._f:
Expand Down
13 changes: 10 additions & 3 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _parse_key(key):
# the file is missing
continue
raise
for key, value, _ in file_values:
for key, value, timestamp_sec, _ in file_values:
metric_name, name, labels, labels_key, help_text = _parse_key(key)

metric = metrics.get(metric_name)
Expand All @@ -79,7 +79,7 @@ def _parse_key(key):
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, labels_key + (('pid', pid),), value)
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp_sec)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
Expand All @@ -89,6 +89,7 @@ def _parse_key(key):
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamp_secs = defaultdict(float)
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
Expand All @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_ts_sec = sample_timestamp_secs[without_pid_key]
ts_sec = float(timestamp or 0)
if current_ts_sec < ts_sec:
samples[without_pid_key] = value
sample_timestamp_secs[without_pid_key] = ts_sec
else: # all/liveall
samples[(name, labels)] = value

Expand Down Expand Up @@ -143,7 +150,7 @@ def _accumulate_metrics(metrics, accumulate):
samples[(metric.name + '_count', labels)] = acc

# Convert to correct sample format.
metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()]
metric.samples = [Sample(name_, dict(labels), value, sample_timestamp_secs.get((name_, labels), None)) for (name_, labels), value in samples.items()]
return metrics.values()

def collect(self):
Expand Down
30 changes: 23 additions & 7 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from threading import Lock
import time
import warnings

from .mmap_dict import mmap_key, MmapedDict
Expand All @@ -12,16 +13,23 @@ class MutexValue:

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, **kwargs):
self._value = 0.0
self._timestamp_sec = 0.0
self._exemplar = None
self._lock = Lock()

def inc(self, amount):
def inc(self, amount, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()
with self._lock:
self._value += amount
self._timestamp_sec = timestamp_sec

def set(self, value):
def set(self, value, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()
with self._lock:
self._value = value
self._timestamp_sec = timestamp_sec

def set_exemplar(self, exemplar):
with self._lock:
Expand Down Expand Up @@ -82,7 +90,7 @@ def __reset(self):
files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value = self._file.read_value(self._key)
self._value, self._timestamp_sec = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
Expand All @@ -95,17 +103,25 @@ def __check_for_pid_change(self):
for value in values:
value.__reset()

def inc(self, amount):
def inc(self, amount, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()

with lock:
self.__check_for_pid_change()
self._value += amount
self._file.write_value(self._key, self._value)
self._timestamp_sec = timestamp_sec
self._file.write_value(self._key, self._value, self._timestamp_sec)

def set(self, value, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()

def set(self, value):
with lock:
self.__check_for_pid_change()
self._value = value
self._file.write_value(self._key, self._value)
self._timestamp_sec = timestamp_sec
self._file.write_value(self._key, self._value, self._timestamp_sec)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
Expand Down
40 changes: 30 additions & 10 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,26 @@ def test_gauge_livesum(self):
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_gauge_mostrecent(self):
# 'mostrecent' should expose the value whose timestamp is the newest.
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
# Swap the value class before creating g2 so it is built with the 456
# identifier callable (presumably g1 belongs to pid 123 from the fixture
# setup, which is outside this view -- confirm).
values.ValueClass = MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
# The second argument to set() is the sample timestamp in seconds: g1's
# sample (2000) is newer than g2's (1000), so g1's value is expected.
g1.set(1, 2000)
g2.set(2, 1000)
self.assertEqual(1, self.registry.get_sample_value('g'))
# Non-live mode: the same value is still expected after pid 123 is marked
# dead.
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(1, self.registry.get_sample_value('g'))

def test_gauge_livemostrecent(self):
# 'livemostrecent' should expose the newest value among the remaining
# processes once one has been marked dead.
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
# Swap the value class before creating g2 so it is built with the 456
# identifier callable (presumably g1 belongs to pid 123 from the fixture
# setup, which is outside this view -- confirm).
values.ValueClass = MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
# The second argument to set() is the sample timestamp in seconds: g1's
# sample (2000) is newer than g2's (1000), so g1's value is expected first.
g1.set(1, 2000)
g2.set(2, 1000)
self.assertEqual(1, self.registry.get_sample_value('g'))
# Live mode: after pid 123 is marked dead the expected value switches to
# g2's, even though its timestamp is older.
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_namespace_subsystem(self):
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
c1.inc(1)
Expand Down Expand Up @@ -369,28 +389,28 @@ def setUp(self):
self.d = mmap_dict.MmapedDict(self.tempfile)

def test_process_restart(self):
self.d.write_value('abc', 123.0)
self.d.write_value('abc', 123.0, 987.0)
self.d.close()
self.d = mmap_dict.MmapedDict(self.tempfile)
self.assertEqual(123, self.d.read_value('abc'))
self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
self.assertEqual((123, 987.0), self.d.read_value('abc'))
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))

def test_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
self.d.write_value(key, 123.0)
self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
self.d.write_value(key, 123.0, 987.0)
self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))

def test_multi_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
self.d.write_value('abc', 42.0)
self.d.write_value(key, 123.0)
self.d.write_value('def', 17.0)
self.d.write_value('abc', 42.0, 987.0)
self.d.write_value(key, 123.0, 876.0)
self.d.write_value('def', 17.0, 765.0)
self.assertEqual(
[('abc', 42.0), (key, 123.0), ('def', 17.0)],
[('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
list(self.d.read_all_values()))

def test_corruption_detected(self):
self.d.write_value('abc', 42.0)
self.d.write_value('abc', 42.0, 987.0)
# corrupt the written data
self.d._m[8:16] = b'somejunk'
with self.assertRaises(RuntimeError):
Expand Down

0 comments on commit cc84ce2

Please sign in to comment.