Skip to content

Commit

Permalink
recovery: windowed min and max filter implementation
Browse files Browse the repository at this point in the history
Port google opensourced min-max filter code based on
Kathleen Nichols' algorithm for tracking the minimum
(or maximum) value of a data stream over some fixed
time interval.
  • Loading branch information
Lohith Bellad committed Mar 8, 2020
1 parent 2088dfe commit f806c43
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5451,6 +5451,7 @@ mod crypto;
mod ffi;
mod frame;
pub mod h3;
mod minmax;
mod octets;
mod packet;
mod rand;
Expand Down
273 changes: 273 additions & 0 deletions src/minmax.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Copyright (C) 2018-2019, Cloudflare, Inc.
// Copyright (C) 2017, Google, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;
use std::time::Instant;

#[derive(Copy, Clone)]
struct MinmaxSample {
time: Instant,
value: Duration,
}

pub struct Minmax {
estimate: [MinmaxSample; 3],
}

impl Minmax {
pub fn new() -> Self {
Minmax {
estimate: [MinmaxSample {
time: Instant::now(),
value: Duration::new(0, 0),
}; 3],
}
}

// Reset estimates to measured value.
pub fn reset(&mut self, time: Instant, meas: Duration) -> Duration {
let val = MinmaxSample { time, value: meas };

for i in 0..3 {
self.estimate[i] = val;
}
self.estimate[0].value
}

// Check if new measurement updates previous min estimates.
pub fn running_min(
&mut self, win: Duration, time: Instant, meas: Duration,
) -> Duration {
let val = MinmaxSample { time, value: meas };

let temp = time.duration_since(self.estimate[2].time);

// if nothing in window or found new min value, reset!
if temp.gt(&win) || val.value.le(&self.estimate[0].value) {
return self.reset(time, meas);
}

if val.value.le(&self.estimate[1].value) {
self.estimate[2] = val;
self.estimate[1] = val;
} else if val.value.le(&self.estimate[2].value) {
self.estimate[2] = val;
}

self.subwin_update(win, time, meas)
}

// Check if new measurement updates previous max estimates.
pub fn _running_max(
&mut self, win: Duration, time: Instant, meas: Duration,
) -> Duration {
let val = MinmaxSample { time, value: meas };

let temp = time.duration_since(self.estimate[2].time);

// if nothing in window or found new max value, reset!
if val.value.ge(&self.estimate[0].value) || temp.gt(&win) {
return self.reset(time, meas);
}

if val.value.ge(&self.estimate[1].value) {
self.estimate[2] = val;
self.estimate[2] = val;
} else if val.value.ge(&self.estimate[2].value) {
self.estimate[2] = val
}

self.subwin_update(win, time, meas)
}

fn subwin_update(
&mut self, win: Duration, time: Instant, meas: Duration,
) -> Duration {
let val = MinmaxSample { time, value: meas };

let dt = time.duration_since(self.estimate[0].time);

if dt.gt(&win) {
self.estimate[0] = self.estimate[1];
self.estimate[1] = self.estimate[2];
self.estimate[2] = val;

let temp = time.duration_since(self.estimate[0].time);
if temp.gt(&win) {
self.estimate[0] = self.estimate[1];
self.estimate[1] = self.estimate[2];
self.estimate[2] = val;
}
} else if self.estimate[1].time.eq(&self.estimate[0].time) &&
dt.gt(&win.div_f32(4.0))
{
self.estimate[2] = val;
self.estimate[1] = val;
} else if self.estimate[2].time.eq(&self.estimate[1].time) &&
dt.gt(&win.div_f32(2.0))
{
self.estimate[2] = val;
}
self.estimate[0].value
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn reset_filter() {
let mut f = Minmax::new();
let now = Instant::now();
let rtt = Duration::new(0, 50000000);

let rtt_min = f.reset(now, rtt);
assert_eq!(rtt_min, rtt);

assert_eq!(f.estimate[0].time, now);
assert_eq!(f.estimate[0].value, rtt);

assert_eq!(f.estimate[1].time, now);
assert_eq!(f.estimate[1].value, rtt);

assert_eq!(f.estimate[2].time, now);
assert_eq!(f.estimate[2].value, rtt);
}

#[test]
fn get_windowed_min() {
let mut f = Minmax::new();
let rtt_25 = Duration::new(0, 25000000);
let rtt_24 = Duration::new(0, 24000000);
let win = Duration::new(1, 0);
let mut time = Instant::now();

let mut rtt_min = f.reset(time, rtt_25);
assert_eq!(rtt_min, rtt_25);

time += Duration::new(0, 500000000);
rtt_min = f.running_min(win, time, rtt_24);
assert_eq!(rtt_min, rtt_24);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_24);

time += Duration::new(1, 25000000);
rtt_min = f.running_min(win, time, rtt_25);
assert_eq!(rtt_min, rtt_25);
assert_eq!(f.estimate[1].value, rtt_25);
assert_eq!(f.estimate[2].value, rtt_25);
}

#[test]
fn get_windowed_max() {
let mut f = Minmax::new();
let rtt_25 = Duration::new(0, 25000000);
let rtt_24 = Duration::new(0, 24000000);
let win = Duration::new(1, 0);
let mut time = Instant::now();

let mut rtt_max = f.reset(time, rtt_24);
assert_eq!(rtt_max, rtt_24);

time += Duration::new(0, 500000000);
rtt_max = f._running_max(win, time, rtt_25);
assert_eq!(rtt_max, rtt_25);
assert_eq!(f.estimate[1].value, rtt_25);
assert_eq!(f.estimate[2].value, rtt_25);

time += Duration::new(1, 25000000);
rtt_max = f._running_max(win, time, rtt_24);
assert_eq!(rtt_max, rtt_24);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_24);
}

#[test]
fn get_windowed_min_estimates() {
let mut f = Minmax::new();
let rtt_25 = Duration::new(0, 25000000);
let rtt_24 = Duration::new(0, 24000000);
let rtt_23 = Duration::new(0, 23000000);
let rtt_22 = Duration::new(0, 22000000);
let win = Duration::new(1, 0);
let mut time = Instant::now();

let mut rtt_min = f.reset(time, rtt_23);
assert_eq!(rtt_min, rtt_23);

time += Duration::new(0, 300000000);
rtt_min = f.running_min(win, time, rtt_24);
assert_eq!(rtt_min, rtt_23);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_24);

time += Duration::new(0, 300000000);
rtt_min = f.running_min(win, time, rtt_25);
assert_eq!(rtt_min, rtt_23);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_25);

time += Duration::new(0, 300000000);
rtt_min = f.running_min(win, time, rtt_22);
assert_eq!(rtt_min, rtt_22);
assert_eq!(f.estimate[1].value, rtt_22);
assert_eq!(f.estimate[2].value, rtt_22);
}

#[test]
fn get_windowed_max_estimates() {
let mut f = Minmax::new();
let rtt_25 = Duration::new(0, 25000000);
let rtt_24 = Duration::new(0, 24000000);
let rtt_23 = Duration::new(0, 23000000);
let rtt_26 = Duration::new(0, 26000000);
let win = Duration::new(1, 0);
let mut time = Instant::now();

let mut rtt_max = f.reset(time, rtt_25);
assert_eq!(rtt_max, rtt_25);

time += Duration::new(0, 300000000);
rtt_max = f._running_max(win, time, rtt_24);
assert_eq!(rtt_max, rtt_25);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_24);

time += Duration::new(0, 300000000);
rtt_max = f._running_max(win, time, rtt_23);
assert_eq!(rtt_max, rtt_25);
assert_eq!(f.estimate[1].value, rtt_24);
assert_eq!(f.estimate[2].value, rtt_23);

time += Duration::new(0, 300000000);
rtt_max = f._running_max(win, time, rtt_26);
assert_eq!(rtt_max, rtt_26);
assert_eq!(f.estimate[1].value, rtt_26);
assert_eq!(f.estimate[2].value, rtt_26);
}
}
16 changes: 14 additions & 2 deletions src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::Result;

use crate::cc;
use crate::frame;
use crate::minmax;
use crate::packet;
use crate::ranges;

Expand All @@ -51,6 +52,8 @@ const INITIAL_RTT: Duration = Duration::from_millis(500);

const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;

const RTT_WINDOW: Duration = Duration::from_secs(300);

pub struct Sent {
pub pkt_num: u64,

Expand Down Expand Up @@ -111,6 +114,8 @@ pub struct Recovery {

rttvar: Duration,

minmax_filter: minmax::Minmax,

min_rtt: Duration,

pub max_ack_delay: Duration,
Expand Down Expand Up @@ -159,6 +164,8 @@ impl Recovery {

smoothed_rtt: None,

minmax_filter: minmax::Minmax::new(),

min_rtt: Duration::new(0, 0),

rttvar: Duration::new(0, 0),
Expand Down Expand Up @@ -381,20 +388,25 @@ impl Recovery {
}

fn update_rtt(&mut self, latest_rtt: Duration, ack_delay: Duration) {
let update_time = Instant::now();
self.latest_rtt = latest_rtt;

match self.smoothed_rtt {
// First RTT sample.
None => {
self.min_rtt = latest_rtt;
self.min_rtt = self.minmax_filter.reset(update_time, latest_rtt);

self.smoothed_rtt = Some(latest_rtt);

self.rttvar = latest_rtt / 2;
},

Some(srtt) => {
self.min_rtt = cmp::min(self.min_rtt, latest_rtt);
self.min_rtt = self.minmax_filter.running_min(
RTT_WINDOW,
update_time,
latest_rtt,
);

let ack_delay = cmp::min(self.max_ack_delay, ack_delay);

Expand Down

0 comments on commit f806c43

Please sign in to comment.