Skip to content

Commit

Permalink
Add distributed.metrics.monotonic
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 23, 2022
1 parent 370f456 commit 1ff857e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
27 changes: 16 additions & 11 deletions distributed/metrics.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import collections
import sys
import time as timemod
from collections.abc import Callable
from functools import wraps

from distributed.compatibility import WINDOWS

_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())


Expand Down Expand Up @@ -36,15 +40,15 @@ def wrapper(): # pragma: no cover


class _WindowsTime:
"""
Combine time.time() and time.perf_counter() to get an absolute clock
with fine resolution.
"""Combine time.time() or time.monotonic() with time.perf_counter() to get an
absolute clock with fine resolution.
"""

# Resync every N seconds, to avoid drifting
RESYNC_EVERY = 600

def __init__(self):
def __init__(self, base: Callable[[], float]):
self.base_timer = base
self.delta = None
self.last_resync = float("-inf")

Expand All @@ -59,7 +63,7 @@ def time(self):
return delta + cur

def resync(self):
_time = timemod.time
_time = self.base_timer
_perf_counter = self.perf_counter
min_samples = 5
while True:
Expand All @@ -77,19 +81,20 @@ def resync(self):


# A high-resolution wall clock timer measuring the seconds since Unix epoch
if sys.platform.startswith("win"):
time = _WindowsTime().time
if WINDOWS:
time = _WindowsTime(timemod.time).time
monotonic = _WindowsTime(timemod.monotonic).time
else:
# Under modern Unices, time.time() should be good enough
# Under modern Unixes, time.time() and time.monotonic() should be good enough
time = timemod.time
monotonic = timemod.monotonic

process_time = timemod.process_time

# Get a per-thread CPU timer function if possible, otherwise
# use a per-process CPU timer function.
try:
# thread_time is supported on Python 3.7+ but not all platforms
# thread_time is not supported on all platforms
thread_time = timemod.thread_time
except (AttributeError, OSError): # pragma: no cover
# process_time is supported on Python 3.3+ everywhere
thread_time = process_time
17 changes: 10 additions & 7 deletions distributed/tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import time

import pytest

from distributed import metrics


def test_wall_clock():
@pytest.mark.parametrize("name", ["time", "monotonic"])
def test_wall_clock(name):
for i in range(3):
time.sleep(0.01)
t = time.time()
samples = [metrics.time() for j in range(50)]
t = getattr(time, name)()
samples = [getattr(metrics, name)() for _ in range(100)]
# Resolution
deltas = [samples[j + 1] - samples[j] for j in range(len(samples) - 1)]
deltas = [sj - si for si, sj in zip(samples[:-1], samples[1:])]
assert min(deltas) >= 0.0, deltas
assert max(deltas) <= 1.0, deltas
assert any(lambda d: 0.0 < d < 0.0001 for d in deltas), deltas
# Close to time.time()
assert max(deltas) <= 0.001, deltas
assert any(0.0 < d < 0.0001 for d in deltas), deltas
# Close to time.time() / time.monotonic()
assert t - 0.5 < samples[0] < t + 0.5
4 changes: 2 additions & 2 deletions distributed/tests/test_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import logging
import random
from datetime import timedelta
from time import monotonic, sleep
from time import sleep

import pytest

from distributed import Client, Nanny, TimeoutError, Variable, wait, worker_client
from distributed.compatibility import WINDOWS
from distributed.metrics import time
from distributed.metrics import time, monotonic
from distributed.utils_test import captured_logger, div, gen_cluster, inc, popen


Expand Down

0 comments on commit 1ff857e

Please sign in to comment.