Skip to content

Commit

Permalink
Replace len magic with count_valid method in ring buffer and moving w…
Browse files Browse the repository at this point in the history
…indow (#664)

The current len magic returns the count of samples in the ring buffer /
moving window that are determined valid. This definition is not
unambiguously obvious to the caller. The current logic is therefore
moved to its own more descriptive method and len magic are removed.
  • Loading branch information
cwasicki authored Sep 14, 2023
2 parents 910bd9c + 7dc661f commit 26590c0
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 36 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- Provide access to `capacity` (maximum number of elements) in `MovingWindow`.
- Methods to retrieve oldest and newest timestamp of valid samples are added to both.
- `MovingWindow` exposes underlying buffers `window` method.
- `len(window)` and `len(buffer)` should be replaced with `window.count_valid()` and `buffer.count_valid()`, respectively.
- `OrderedRingBuffer.window`:
- By default returns a copy.
- Can also return a view if the window contains `None` values and if `force_copy` is set to `True`.
Expand Down
12 changes: 10 additions & 2 deletions benchmarks/timeseries/periodic_feature_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,16 @@ def _num_windows(
Returns:
The number of windows that are fully contained in the MovingWindow.
"""
num_windows = len(window) // period
if len(window) - num_windows * period >= window_size:

def length(window: NDArray[np.float_] | MovingWindow) -> int:
return (
window.count_valid()
if isinstance(window, MovingWindow)
else len(window)
)

num_windows = length(window) // period
if length(window) - num_windows * period >= window_size:
num_windows += 1

return num_windows
Expand Down
12 changes: 6 additions & 6 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,14 @@ async def sink_buffer(sample: Sample[Quantity]) -> None:
asyncio.create_task(self._resampler.resample(), name="resample")
)

def __len__(self) -> int:
def count_valid(self) -> int:
"""
Return the size of the `MovingWindow`s underlying buffer.
Count the number of valid samples in this `MovingWindow`.
Returns:
The size of the `MovingWindow`.
The number of valid samples in this `MovingWindow`.
"""
return len(self._buffer)
return self._buffer.count_valid()

@overload
def __getitem__(self, key: SupportsIndex) -> float:
Expand Down Expand Up @@ -362,12 +362,12 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik
A float if the key is a number or a timestamp.
an numpy array if the key is a slice.
"""
if len(self._buffer) == 0:
if self._buffer.count_valid() == 0:
raise IndexError("The buffer is empty.")
if isinstance(key, slice):
if isinstance(key.start, int) or isinstance(key.stop, int):
if key.start is None or key.stop is None:
key = slice(slice(key.start, key.stop).indices(self.__len__()))
key = slice(slice(key.start, key.stop).indices(self.count_valid()))
elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
if key.start is None:
key = slice(self._buffer.time_bound_oldest, key.stop)
Expand Down
8 changes: 4 additions & 4 deletions src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ def __init__(
"""Distance between two succeeding intervals in samples."""

_logger.debug("Initializing PeriodicFeatureExtractor!")
_logger.debug("MovingWindow size: %i", len(self._moving_window))
_logger.debug("MovingWindow size: %i", self._moving_window.count_valid())
_logger.debug(
"Period between two succeeding intervals (in samples): %i",
self._period,
)

if not len(self._moving_window) % self._period == 0:
if not self._moving_window.count_valid() % self._period == 0:
raise ValueError(
"The MovingWindow size is not a integer multiple of the period."
)
Expand Down Expand Up @@ -323,7 +323,7 @@ def _get_buffer_bounds(

rel_pos = self._get_relative_positions(start, window_size)

if window_size > len(self._moving_window):
if window_size > self._moving_window.count_valid():
raise ValueError(
"The window size must be smaller than the size of the `MovingWindow`"
)
Expand Down Expand Up @@ -379,7 +379,7 @@ def _get_reshaped_np_array(
(start_pos, end_pos, window_size) = self._get_buffer_bounds(start, end)

if start_pos >= end_pos:
window_start = self._buffer[start_pos : len(self._moving_window)]
window_start = self._buffer[start_pos : self._moving_window.count_valid()]
window_end = self._buffer[0:end_pos]
# make the linter happy
assert isinstance(window_start, np.ndarray)
Expand Down
10 changes: 5 additions & 5 deletions src/frequenz/sdk/timeseries/_ringbuffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def oldest_timestamp(self) -> datetime | None:
The oldest timestamp in the buffer
or None if the buffer is empty.
"""
if len(self) == 0:
if self.count_valid() == 0:
return None

if self.is_missing(self.time_bound_oldest):
Expand All @@ -217,7 +217,7 @@ def newest_timestamp(self) -> datetime | None:
Returns:
The newest timestamp in the buffer.
"""
if len(self) == 0:
if self.count_valid() == 0:
return None

return self.time_bound_newest
Expand Down Expand Up @@ -594,11 +594,11 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr
"""
return self._buffer.__getitem__(index_or_slice)

def __len__(self) -> int:
"""Return the amount of items that this container currently holds.
def count_valid(self) -> int:
"""Count the number of valid items that this buffer currently holds.
Returns:
The length.
The number of valid items in this buffer.
"""
if self._datetime_newest == self._DATETIME_MIN:
return 0
Expand Down
10 changes: 5 additions & 5 deletions tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ async def test_window_size() -> None:
window, sender = init_moving_window(timedelta(seconds=5))
async with window:
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 0, "Window should be empty"
assert window.count_valid() == 0, "Window should be empty"
await push_logical_meter_data(sender, range(0, 2))
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 2, "Window should be partially full"
assert window.count_valid() == 2, "Window should be partially full"
await push_logical_meter_data(sender, range(2, 20))
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 5, "Window should be full"
assert window.count_valid() == 5, "Window should be full"


# pylint: disable=redefined-outer-name
Expand All @@ -170,7 +170,7 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
resampler_config=resampler_config,
) as window:
assert window.capacity == window_size / output_sampling, "Wrong window capacity"
assert len(window) == 0, "Window should be empty at the beginning"
assert window.count_valid() == 0, "Window should be empty at the beginning"
stream_values = [4.0, 8.0, 2.0, 6.0, 5.0] * 100
for value in stream_values:
timestamp = datetime.now(tz=timezone.utc)
Expand All @@ -179,7 +179,7 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
await asyncio.sleep(0.1)
fake_time.shift(0.1)

assert len(window) == window_size / output_sampling
assert window.count_valid() == window_size / output_sampling
for value in window: # type: ignore
assert 4.9 < value < 5.1

Expand Down
28 changes: 14 additions & 14 deletions tests/timeseries/test_ringbuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,69 +209,69 @@ def test_gaps() -> None: # pylint: disable=too-many-statements
buffer = OrderedRingBuffer([0.0] * 5, ONE_SECOND)
assert buffer.oldest_timestamp is None
assert buffer.newest_timestamp is None
assert len(buffer) == 0
assert buffer.count_valid() == 0
assert len(buffer.gaps) == 0

buffer.update(Sample(dt(0), Quantity(0)))
assert buffer.oldest_timestamp == dt(0)
assert buffer.newest_timestamp == dt(0)
assert len(buffer) == 1
assert buffer.count_valid() == 1
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(6), Quantity(0)))
assert buffer.oldest_timestamp == dt(6)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 1
assert buffer.count_valid() == 1
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(2), Quantity(2)))
buffer.update(Sample(dt(3), Quantity(3)))
buffer.update(Sample(dt(4), Quantity(4)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 4
assert buffer.count_valid() == 4
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(3), None))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert buffer.count_valid() == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(3), Quantity(np.nan)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert buffer.count_valid() == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(2), Quantity(np.nan)))
assert buffer.oldest_timestamp == dt(4)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 2
assert buffer.count_valid() == 2
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(3), Quantity(3)))
assert buffer.oldest_timestamp == dt(3)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert buffer.count_valid() == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(2), Quantity(2)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 4
assert buffer.count_valid() == 4
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(5), Quantity(5)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 5
assert buffer.count_valid() == 5
assert len(buffer.gaps) == 0

buffer.update(Sample(dt(99), None))
assert buffer.oldest_timestamp == dt(95) # bug: should be None
assert buffer.newest_timestamp == dt(99) # bug: should be None
assert len(buffer) == 4 # bug: should be 0 (whole range gap)
assert buffer.count_valid() == 4 # bug: should be 0 (whole range gap)
assert len(buffer.gaps) == 1


Expand Down Expand Up @@ -369,7 +369,7 @@ def test_len_ringbuffer_samples_fit_buffer_size() -> None:
timestamp = start_ts + timedelta(seconds=index)
buffer.update(Sample(timestamp, Quantity(float(sample_value))))

assert len(buffer) == len(test_samples)
assert buffer.count_valid() == len(test_samples)


def test_len_with_gaps() -> None:
Expand All @@ -384,7 +384,7 @@ def test_len_with_gaps() -> None:
buffer.update(
Sample(datetime(2, 2, 2, 0, 0, i, tzinfo=timezone.utc), Quantity(float(i)))
)
assert len(buffer) == i + 1
assert buffer.count_valid() == i + 1


def test_len_ringbuffer_samples_overwrite_buffer() -> None:
Expand Down Expand Up @@ -412,7 +412,7 @@ def test_len_ringbuffer_samples_overwrite_buffer() -> None:
timestamp = start_ts + timedelta(seconds=index)
buffer.update(Sample(timestamp, Quantity(float(sample_value))))

assert len(buffer) == half_buffer_size
assert buffer.count_valid() == half_buffer_size


def test_ringbuffer_empty_buffer() -> None:
Expand Down

0 comments on commit 26590c0

Please sign in to comment.