Skip to content

Commit

Permalink
Merge #108
Browse files Browse the repository at this point in the history
108: Attempt to fix "remaining burst capacity" when time moves r=antifuchs a=antifuchs

This is a stab at addressing #102. This adds a "time of measurement" field to the StateSnapshot struct, which allows correct computation of the remaining burst capacity.

This PR was previously blocked by a mysterious test failure on my M1 mac, which I'm pretty sure was related #114. After rebasing to latest master, this passes its own tests and looks plausible enough to release.

Co-authored-by: Andreas Fuchs <asf@boinkor.net>
  • Loading branch information
bors[bot] and antifuchs authored May 28, 2022
2 parents cbf718d + c24bf93 commit 9ef36f4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 22 deletions.
26 changes: 15 additions & 11 deletions governor/src/gcra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,15 @@ impl Gcra {
let tat = tat.unwrap_or_else(|| self.starting_state(t0));
let earliest_time = tat.saturating_sub(tau);
if t0 < earliest_time {
Err(MW::disallow(key, (self, earliest_time), start))
Err(MW::disallow(
key,
StateSnapshot::new(self.t, self.tau, earliest_time, earliest_time),
start,
))
} else {
let next = cmp::max(tat, t0) + t;
Ok((
MW::allow(key, StateSnapshot::new(self.t, self.tau, next)),
MW::allow(key, StateSnapshot::new(self.t, self.tau, t0, next)),
next,
))
}
Expand Down Expand Up @@ -157,23 +161,23 @@ impl Gcra {
if t0 < earliest_time {
Err(NegativeMultiDecision::BatchNonConforming(
n.get(),
MW::disallow(key, (self, earliest_time), start),
MW::disallow(
key,
StateSnapshot::new(self.t, self.tau, earliest_time, earliest_time),
start,
),
))
} else {
let next = cmp::max(tat, t0) + t + additional_weight;
Ok((MW::allow(key, (self, next)), next))
Ok((
MW::allow(key, StateSnapshot::new(self.t, self.tau, t0, next)),
next,
))
}
})
}
}

impl From<(&Gcra, Nanos)> for StateSnapshot {
#[inline]
fn from(pair: (&Gcra, Nanos)) -> Self {
StateSnapshot::new(pair.0.t, pair.0.tau, pair.1)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
31 changes: 20 additions & 11 deletions governor/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
//!
//! You can define your own middleware by `impl`ing [`RateLimitingMiddleware`].
use core::fmt;
use std::marker::PhantomData;
use std::{cmp, marker::PhantomData};

use crate::{clock, nanos::Nanos, NotUntil, Quota};

Expand All @@ -77,14 +77,22 @@ pub struct StateSnapshot {
/// The "burst capacity" of the bucket.
tau: Nanos,

/// The time at which the measurement was taken.
pub(crate) time_of_measurement: Nanos,

/// The next time a cell is expected to arrive
pub(crate) tat: Nanos,
}

impl StateSnapshot {
#[inline]
pub(crate) fn new(t: Nanos, tau: Nanos, tat: Nanos) -> Self {
Self { t, tau, tat }
pub(crate) fn new(t: Nanos, tau: Nanos, time_of_measurement: Nanos, tat: Nanos) -> Self {
Self {
t,
tau,
time_of_measurement,
tat,
}
}

/// Returns the quota used to make the rate limiting decision.
Expand All @@ -98,14 +106,15 @@ impl StateSnapshot {
/// If this state snapshot is based on a negative rate limiting
/// outcome, this method returns 0.
pub fn remaining_burst_capacity(&self) -> u32 {
// at this point we know that we're `tat` nanos after the
// earliest arrival time, and so are using up some "burst
// capacity".
//
// As one cell has already been used by the positive
// decision, we're relying on the "round down" behavior of
// unsigned integer division.
(self.quota().burst_size().get() + 1).saturating_sub((self.tat / self.t) as u32)
let t0 = if self.time_of_measurement.as_u64() == 0 {
self.time_of_measurement + self.t
} else {
self.time_of_measurement
};
(cmp::min(
(t0 + self.tau).saturating_sub(self.tat).as_u64(),
self.tau.as_u64(),
) / self.t.as_u64()) as u32
}
}

Expand Down
33 changes: 33 additions & 0 deletions governor/tests/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use governor::{
clock::{self, FakeRelativeClock},
middleware::{RateLimitingMiddleware, StateInformationMiddleware, StateSnapshot},
Expand Down Expand Up @@ -68,3 +70,34 @@ fn state_information() {
fn mymw_derives() {
assert_eq!(format!("{:?}", MyMW), "MyMW");
}

#[test]
fn state_snapshot_tracks_quota_accurately() {
use crate::clock::FakeRelativeClock;
use crate::RateLimiter;
use nonzero_ext::*;

let clock = FakeRelativeClock::default();

let lim = RateLimiter::direct_with_clock(Quota::per_minute(nonzero!(5_u32)), &clock)
.with_middleware::<StateInformationMiddleware>();

assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(4));
assert_eq!(
lim.check_n(nonzero!(3_u32))
.map(|s| s.remaining_burst_capacity()),
Ok(1)
);
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(0));
assert_eq!(lim.check().map_err(|_| ()), Err(()), "should rate limit");

clock.advance(Duration::from_secs(120));
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(4));
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(3));
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(2));
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(1));
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(0));
// TODO: this is incorrect:
assert_eq!(lim.check().map(|s| s.remaining_burst_capacity()), Ok(0));
assert_eq!(lim.check().map_err(|_| ()), Err(()));
}

0 comments on commit 9ef36f4

Please sign in to comment.