Skip to content

Commit

Permalink
Provide a common atomic Rate Limiter implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Jul 31, 2024
1 parent f3fb542 commit 60c49cd
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 58 deletions.
64 changes: 37 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions ddcommon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ tokio = { version = "1.23", features = ["rt", "macros"] }
tokio-rustls = { version = "0.26", default-features = false }
serde = { version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
libc = "0.2"

[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.52"
features = [
"Win32_Foundation",
"Win32_System_SystemInformation",
]

[target.'cfg(unix)'.dependencies]
hyper-rustls = { version = "0.27", default-features = false, features = [
Expand Down
1 change: 1 addition & 0 deletions ddcommon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod entity_id;
#[macro_use]
pub mod cstr;
pub mod config;
pub mod rate_limiter;
pub mod tag;

pub mod header {
Expand Down
138 changes: 138 additions & 0 deletions ddcommon/src/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};

/// Common interface for rate limiters: register a hit against a per-interval
/// limit and report the effective rate.
pub trait Limiter {
    /// Takes the limit per interval.
    /// Returns false if the limit is exceeded, otherwise true.
    fn inc(&self, limit: u32) -> bool;
    /// Returns the effective rate per interval.
    fn rate(&self) -> f64;
}

/// A thread-safe limiter built on Atomics.
/// Its base unit is in seconds, i.e. the minimum allowed rate is 1 per second.
/// Internally the limiter works with the system time granularity, i.e. nanoseconds on unix and
/// milliseconds on windows.
/// The implementation is a sliding window: every time the limiter is increased, as much time as
/// has passed since the previous hit is also refilled.
#[repr(C)]
pub struct LocalLimiter {
    // Fixed-point hit counter: each accepted hit adds `granularity` units and elapsed
    // time drains it; may transiently go negative, `inc()` clamps it back.
    hit_count: AtomicI64,
    // Tick timestamp (see `now()`) of the most recent `inc()` call.
    last_update: AtomicU64,
    // Limit passed to the most recent accepted `inc()`; used so that lowering the
    // limit still drains the counter at the old, higher speed.
    last_limit: AtomicU32,
    // Length of one interval in clock ticks (TIME_PER_SECOND * seconds).
    granularity: i64,
}

/// Clock ticks per second: nanoseconds on Unix, milliseconds on Windows (system time granularity
/// is bad there). This must match the unit returned by `now()`.
#[cfg(windows)]
const TIME_PER_SECOND: i64 = 1_000; // milliseconds
#[cfg(not(windows))]
const TIME_PER_SECOND: i64 = 1_000_000_000; // nanoseconds

/// Reads the platform monotonic clock as an integer tick count:
/// `GetTickCount64` milliseconds on Windows, `CLOCK_MONOTONIC` nanoseconds elsewhere.
fn now() -> u64 {
    #[cfg(windows)]
    let ticks = unsafe { windows_sys::Win32::System::SystemInformation::GetTickCount64() };
    #[cfg(not(windows))]
    let ticks = {
        let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
        // SAFETY: `ts` is a valid, writable timespec for clock_gettime to fill in.
        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
        (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
    };
    ticks
}

impl Default for LocalLimiter {
fn default() -> Self {
LocalLimiter {
hit_count: Default::default(),
last_update: AtomicU64::from(now()),
last_limit: Default::default(),
granularity: TIME_PER_SECOND,
}
}
}

impl LocalLimiter {
    /// Allows setting a custom time granularity. The default() implementation is 1 second.
    pub fn with_granularity(seconds: u32) -> LocalLimiter {
        let mut limiter = LocalLimiter::default();
        limiter.granularity *= i64::from(seconds);
        limiter
    }

    /// Resets, with a given granularity: clears all counters, re-arms the clock
    /// reference, and sizes the window to `seconds` seconds.
    pub fn reset(&mut self, seconds: u32) {
        // `&mut self` guarantees exclusive access, so relaxed stores suffice here.
        self.hit_count.store(0, Ordering::Relaxed);
        self.last_limit.store(0, Ordering::Relaxed);
        self.last_update.store(now(), Ordering::Relaxed);
        self.granularity = i64::from(seconds) * TIME_PER_SECOND;
    }
}

impl Limiter for LocalLimiter {
    /// Registers one hit against a budget of `limit` hits per interval.
    ///
    /// `hit_count` is kept fixed-point: one accepted hit costs `granularity` units,
    /// so the counter saturates near `limit * granularity`. Each call first refills
    /// the window by the time elapsed since the previous call (elapsed * limit
    /// units), yielding a lock-free sliding window.
    ///
    /// Returns `false` when the limit is exceeded, `true` otherwise.
    fn inc(&self, limit: u32) -> bool {
        let now = now();
        // Swap, so concurrent callers each account for a disjoint slice of elapsed time.
        let last = self.last_update.swap(now, Ordering::SeqCst);
        // Make sure reducing the limit doesn't stall for a long time
        let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
        // Refill accumulated since the last call, in counter units.
        let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
        // Net update: drain the refill, then charge this hit (one hit == granularity units).
        let subtract = clear_counter - self.granularity;
        let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
        // Handle where the limiter goes below zero
        if previous_hits < subtract {
            // Clamp back so long idle periods don't bank unlimited credit, and
            // normalize `previous_hits` to match the clamped counter.
            let add = clear_counter - previous_hits.max(0);
            self.hit_count.fetch_add(add, Ordering::Acquire);
            previous_hits += add - clear_counter;
        }
        if previous_hits / self.granularity >= limit as i64 {
            // Over the limit: refund this hit's charge and reject.
            self.hit_count
                .fetch_sub(self.granularity, Ordering::Acquire);
            false
        } else {
            // We don't care about race conditions here:
            // If the last limit was high enough to increase the previous_hits, we are anyway close
            // to a number realistic to decrease the count quickly; i.e. we won't stall the limiter
            // indefinitely when switching from a high to a low limit.
            self.last_limit.store(limit, Ordering::Relaxed);
            true
        }
    }

    /// Returns the effective rate per interval.
    ///
    /// NOTE(review): this evaluates `last_limit * granularity / hit_count`, clamped
    /// to [0, 1]. Near saturation (`hit_count` ≈ `limit * granularity`) that is ~1.0,
    /// and a zero `hit_count` produces a non-finite quotient before clamping —
    /// confirm this matches the intended "effective rate" semantics.
    fn rate(&self) -> f64 {
        let last_limit = self.last_limit.load(Ordering::Relaxed) as f64;
        let hit_count = self.hit_count.load(Ordering::Relaxed) as f64;
        (last_limit / hit_count * self.granularity as f64).clamp(0., 1.)
    }
}

#[cfg(test)]
mod tests {
    use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;

    #[test]
    #[cfg_attr(miri, ignore)] // exercises the real system clock via FFI; skipped under miri
    fn test_rate_limiter() {
        // Default limiter: 1-second window.
        let limiter = LocalLimiter::default();
        // Two are allowed, then one more because a small amount of time passed since the first one
        assert!(limiter.inc(2));
        assert!(limiter.inc(2));
        assert!(limiter.inc(2));
        assert!(!limiter.inc(2));
        assert!(!limiter.inc(2));

        // Pretend 3 seconds passed by moving last_update back: at limit 2 that refills
        // 6 hit-units, more than was accumulated, driving the counter into negative
        // territory. The next inc() clamps it back to zero before counting again.
        limiter
            .last_update
            .fetch_sub(3 * TIME_PER_SECOND as u64, Ordering::Relaxed);
        assert!(limiter.inc(2));
        assert!(limiter.inc(2));
        assert!(limiter.inc(2));
        assert!(!limiter.inc(2));
    }
}
1 change: 1 addition & 0 deletions ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ libc = { version = "0.2" }
# tarpc needed extensions to allow 1 way communication and to export some internal structs
tarpc = { path = "tarpc/tarpc", default-features = false, features = ["serde-transport"], package = "tarpc" }

ddcommon = { path = "../ddcommon" }
datadog-ipc-macros = { path = "macros" }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod handles;
pub mod transport;

pub mod platform;
pub mod rate_limiter;
pub mod sequential;

pub use tarpc;
4 changes: 2 additions & 2 deletions ipc/src/platform/mem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ mod tests {
let shm = ShmHandle::new(5).unwrap();
let mut mapped = shm.map().unwrap();
_ = mapped.as_slice_mut().write(&[1, 2, 3, 4, 5]).unwrap();
let mapped = mapped.ensure_space(100000);
mapped.ensure_space(100000);
assert!(mapped.as_slice().len() >= 100000);
let mut exp = vec![0u8; mapped.as_slice().len()];
_ = (&mut exp[..5]).write(&[1, 2, 3, 4, 5]).unwrap();
Expand All @@ -229,7 +229,7 @@ mod tests {
let shm = NamedShmHandle::create(path.clone(), 5).unwrap();
let mut mapped = shm.map().unwrap();
_ = mapped.as_slice_mut().write(&[1, 2, 3, 4, 5]).unwrap();
let mapped = mapped.ensure_space(100000);
mapped.ensure_space(100000);
assert!(mapped.as_slice().len() >= 100000);

let other = NamedShmHandle::open(&path).unwrap().map().unwrap();
Expand Down
9 changes: 5 additions & 4 deletions ipc/src/platform/unix/mem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ impl NamedShmHandle {
}

impl<T: FileBackedHandle + From<MappedMem<T>>> MappedMem<T> {
pub fn ensure_space(self, expected_size: usize) -> MappedMem<T> {
pub fn ensure_space(&mut self, expected_size: usize) {
if expected_size <= self.mem.get_shm().size {
return self;
return;
}

let mut handle: T = self.into();
// SAFETY: we'll overwrite the original memory later
let mut handle: T = unsafe { std::ptr::read(self) }.into();
_ = handle.resize(expected_size);
handle.map().unwrap()
unsafe { std::ptr::write(self, handle.map().unwrap()) };
}
}

Expand Down
Loading

0 comments on commit 60c49cd

Please sign in to comment.