Skip to content

Commit

Permalink
Add sort-of-ugly rate limit for exception replay
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Sep 9, 2024
1 parent 0ded09c commit f6d72d6
Show file tree
Hide file tree
Showing 14 changed files with 759 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ members = [
"tools",
"ipc",
"ipc/macros",
"live-debugger",
"live-debugger-ffi",
"remote-config",
"sidecar",
"sidecar/macros",
"sidecar-ffi",
Expand Down
468 changes: 463 additions & 5 deletions LICENSE-3rdparty.yml

Large diffs are not rendered by default.

20 changes: 16 additions & 4 deletions ddcommon/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub trait Limiter {
/// Returns the effective rate per interval.
/// Note: The rate is only guaranteed to be accurate immediately after a call to inc().
fn rate(&self) -> f64;
/// Updates the rate and returns it
fn update_rate(&self) -> f64;
}

/// A thread-safe limiter built on Atomics.
Expand Down Expand Up @@ -83,23 +85,28 @@ impl LocalLimiter {
self.last_limit.store(0, Ordering::Relaxed);
self.granularity = TIME_PER_SECOND * seconds as i64;
}
}

impl Limiter for LocalLimiter {
fn inc(&self, limit: u32) -> bool {
fn update(&self, limit: u32, inc: i64) -> i64 {
let now = now();
let last = self.last_update.swap(now, Ordering::SeqCst);
// Make sure reducing the limit doesn't stall for a long time
let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
let subtract = clear_counter - self.granularity;
let subtract = clear_counter - inc;
let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
// Handle where the limiter goes below zero
if previous_hits < subtract {
let add = clear_counter - previous_hits.max(0);
self.hit_count.fetch_add(add, Ordering::Acquire);
previous_hits += add - clear_counter;
}
previous_hits
}
}

impl Limiter for LocalLimiter {
fn inc(&self, limit: u32) -> bool {
let previous_hits = self.update(limit, self.granularity);
if previous_hits / self.granularity >= limit as i64 {
self.hit_count
.fetch_sub(self.granularity, Ordering::Acquire);
Expand All @@ -119,6 +126,11 @@ impl Limiter for LocalLimiter {
let hit_count = self.hit_count.load(Ordering::Relaxed);
(hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
}

/// Advances the limiter's sliding window and returns the resulting rate.
///
/// NOTE(review): passes `self.granularity` as the increment, exactly like
/// `inc()` does, so this appears to add one hit's worth to the counter in
/// addition to aging out old hits — confirm that is intended (e.g. one call
/// per observed event) rather than a pure "refresh then read".
fn update_rate(&self) -> f64 {
    // limit == 0 means the clear rate is driven purely by last_limit.
    self.update(0, self.granularity);
    self.rate()
}
}

#[cfg(test)]
Expand Down
163 changes: 122 additions & 41 deletions ipc/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle};
use ddcommon::rate_limiter::{Limiter, LocalLimiter};
use std::cell::UnsafeCell;
use std::ffi::CString;
use std::fmt::{Debug, Formatter};
use std::io;
Expand All @@ -12,18 +13,30 @@ use std::sync::{Arc, RwLock};

#[repr(C)]
#[derive(Default)]
struct ShmLimiterData<'a> {
struct ShmLimiterData<'a, Inner> {
next_free: AtomicU32, // free list
rc: AtomicI32,
limiter: LocalLimiter,
_phantom: PhantomData<&'a ShmLimiterMemory>,
inner: UnsafeCell<Inner>,
_phantom: PhantomData<&'a ShmLimiterMemory<Inner>>,
}

#[derive(Clone)]
pub struct ShmLimiterMemory(Arc<RwLock<MappedMem<NamedShmHandle>>>);
/// Handle to a named shared-memory region holding a free list of
/// `ShmLimiterData<Inner>` entries. Cloning is cheap: clones share the same
/// underlying mapping via the `Arc`.
pub struct ShmLimiterMemory<Inner = ()> {
    // Shared mapping; the RwLock guards remapping (ensure_space) against readers.
    mem: Arc<RwLock<MappedMem<NamedShmHandle>>>,
    // Marks which payload type the entries carry; no data stored here.
    _phantom: PhantomData<Inner>,
}

impl<Inner> Clone for ShmLimiterMemory<Inner> {
fn clone(&self) -> Self {
ShmLimiterMemory {
mem: self.mem.clone(),
_phantom: Default::default(),
}
}
}

impl ShmLimiterMemory {
const START_OFFSET: u32 = std::mem::align_of::<ShmLimiterData>() as u32;
impl<Inner> ShmLimiterMemory<Inner> {
const START_OFFSET: u32 = std::mem::align_of::<ShmLimiterData<Inner>>() as u32;

pub fn create(path: CString) -> io::Result<Self> {
// Clean leftover shm
Expand All @@ -41,13 +54,16 @@ impl ShmLimiterMemory {
}

fn new(handle: MappedMem<NamedShmHandle>) -> Self {
Self(Arc::new(RwLock::new(handle)))
Self {
mem: Arc::new(RwLock::new(handle)),
_phantom: Default::default(),
}
}

/// The start of the ShmLimiter memory has 4 bytes indicating an offset to the first free
/// element in the free list. It is zero if there is no element on the free list.
fn first_free_ref(&self) -> &AtomicU32 {
unsafe { &*self.0.read().unwrap().as_slice().as_ptr().cast() }
unsafe { &*self.mem.read().unwrap().as_slice().as_ptr().cast() }
}

fn next_free(&mut self) -> u32 {
Expand All @@ -62,10 +78,10 @@ impl ShmLimiterMemory {
.load(Ordering::Relaxed);
// Not yet used memory will always be 0. The next free entry will then be just above.
if target_next_free == 0 {
target_next_free = first_free + std::mem::size_of::<ShmLimiterData>() as u32;
target_next_free = first_free + std::mem::size_of::<ShmLimiterData<Inner>>() as u32;
// target_next_free is the end of the current entry - but we need one more
self.0.write().unwrap().ensure_space(
target_next_free as usize + std::mem::size_of::<ShmLimiterData>(),
self.mem.write().unwrap().ensure_space(
target_next_free as usize + std::mem::size_of::<ShmLimiterData<Inner>>(),
);
}
match self.first_free_ref().compare_exchange(
Expand All @@ -80,7 +96,11 @@ impl ShmLimiterMemory {
}
}

pub fn alloc(&mut self) -> ShmLimiter {
/// Allocates a limiter entry with the default granularity of one second.
pub fn alloc(&mut self) -> ShmLimiter<Inner> {
    self.alloc_with_granularity(1)
}

pub fn alloc_with_granularity(&mut self, seconds: u32) -> ShmLimiter<Inner> {
let reference = ShmLimiter {
idx: self.next_free(),
memory: self.clone(),
Expand All @@ -89,16 +109,16 @@ impl ShmLimiterMemory {
limiter.rc.store(1, Ordering::Relaxed);
// SAFETY: we initialize the struct here
unsafe {
(*(limiter as *const _ as *mut ShmLimiterData))
(*(limiter as *const _ as *mut ShmLimiterData<Inner>))
.limiter
.reset(1)
.reset(seconds)
};
reference
}

pub fn get(&self, idx: u32) -> Option<ShmLimiter> {
pub fn get(&self, idx: u32) -> Option<ShmLimiter<Inner>> {
assert_eq!(
idx % std::mem::size_of::<ShmLimiterData>() as u32,
idx % std::mem::size_of::<ShmLimiterData<Inner>>() as u32,
Self::START_OFFSET
);
let reference = ShmLimiter {
Expand All @@ -120,25 +140,48 @@ impl ShmLimiterMemory {
}
}
}

/// Scans the entry area for a live limiter whose payload satisfies `cond`
/// and returns a refcounted handle to it, or `None` if none matches.
///
/// NOTE(review): the scan stops at the first entry whose `next_free` is 0,
/// relying on never-used memory being zeroed (see `next_free()`). A freshly
/// carved entry also seems to keep `next_free == 0` after allocation —
/// confirm the scan cannot terminate early on such an entry.
pub fn find<F>(&self, cond: F) -> Option<ShmLimiter<Inner>>
where
    F: Fn(&Inner) -> bool,
{
    let mut cur = Self::START_OFFSET;
    let mem = self.mem.read().unwrap();
    loop {
        // Entries are laid out back-to-back starting at START_OFFSET.
        let data: &ShmLimiterData<Inner> =
            unsafe { &*mem.as_slice().as_ptr().add(cur as usize).cast() };
        if data.next_free.load(Ordering::Relaxed) == 0 {
            return None;
        }
        // Racy pre-check: only inspect the payload of entries that look live.
        if data.rc.load(Ordering::Relaxed) > 0 && cond(unsafe { &*data.inner.get() }) {
            // get() takes a refcount; re-check cond afterwards because the
            // entry may have been freed and reused between the two checks.
            if let Some(limiter) = self.get(cur) {
                if cond(limiter.data()) {
                    return Some(limiter);
                }
            }
        }
        cur += std::mem::size_of::<ShmLimiterData<Inner>>() as u32;
    }
}
}

pub struct ShmLimiter {
pub struct ShmLimiter<Inner = ()> {
idx: u32,
memory: ShmLimiterMemory,
memory: ShmLimiterMemory<Inner>,
}

impl Debug for ShmLimiter {
impl<Inner> Debug for ShmLimiter<Inner> {
    /// A limiter is identified solely by its offset into the shared region.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self.idx)
    }
}

impl ShmLimiter {
fn limiter(&self) -> &ShmLimiterData {
impl<Inner> ShmLimiter<Inner> {
fn limiter(&self) -> &ShmLimiterData<Inner> {
unsafe {
&*self
.memory
.0
.mem
.read()
.unwrap()
.as_slice()
Expand All @@ -148,39 +191,73 @@ impl ShmLimiter {
}
}

/// Returns a shared reference to the payload stored next to the limiter
/// in shared memory.
///
/// SAFETY(review): hands out `&Inner` straight from the `UnsafeCell`;
/// callers must not create aliasing mutable references to the same slot —
/// confirm the intended access contract for `inner`.
pub fn data(&self) -> &Inner {
    unsafe { &*self.limiter().inner.get() }
}

/// Byte offset of this entry within the shared mapping; a valid argument
/// for `ShmLimiterMemory::get()` to re-obtain a handle.
pub fn index(&self) -> u32 {
    self.idx
}

/// Frees this limiter's entry immediately if this handle holds the only
/// reference; returns `true` when the entry was freed.
///
/// # Safety
/// Callers MUST NOT do any other operations on this instance if dropping was successful.
pub unsafe fn drop_if_rc_1(&mut self) -> bool {
    let limiter = self.limiter();

    // Succeed only as sole owner (rc: 1 -> 0). A concurrent reference makes
    // the CAS fail and the entry stays alive.
    if limiter
        .rc
        .compare_exchange(1, 0, Ordering::SeqCst, Ordering::Relaxed)
        .is_ok()
    {
        self.actual_free(limiter);
        // idx == 0 marks this handle dead so Drop won't free it again.
        self.idx = 0;
        true
    } else {
        false
    }
}

/// Pushes the entry back onto the shared free list (lock-free stack push).
fn actual_free(&self, limiter: &ShmLimiterData<Inner>) {
    let next_free_ref = self.memory.first_free_ref();
    let mut next_free = next_free_ref.load(Ordering::Relaxed);
    loop {
        // Point our entry at the current head, then try to swing the head
        // to us; on contention, retry with the head another thread installed.
        limiter.next_free.store(next_free, Ordering::Relaxed);
        match next_free_ref.compare_exchange(
            next_free,
            self.idx,
            Ordering::SeqCst,
            Ordering::Relaxed,
        ) {
            Ok(_) => return,
            Err(found) => next_free = found,
        }
    }
}
}

impl Limiter for ShmLimiter {
impl<Inner> Limiter for ShmLimiter<Inner> {
    /// All operations delegate to the `LocalLimiter` embedded in the
    /// shared-memory entry this handle points at.
    fn inc(&self, limit: u32) -> bool {
        let local = &self.limiter().limiter;
        local.inc(limit)
    }

    fn rate(&self) -> f64 {
        let local = &self.limiter().limiter;
        local.rate()
    }

    fn update_rate(&self) -> f64 {
        let local = &self.limiter().limiter;
        local.update_rate()
    }
}

impl Drop for ShmLimiter {
impl<Inner> Drop for ShmLimiter<Inner> {
fn drop(&mut self) {
if self.idx == 0 {
return;
}

let limiter = self.limiter();
if limiter.rc.fetch_sub(1, Ordering::SeqCst) == 1 {
let next_free_ref = self.memory.first_free_ref();
let mut next_free = next_free_ref.load(Ordering::Relaxed);
loop {
limiter.next_free.store(next_free, Ordering::Relaxed);
match next_free_ref.compare_exchange(
next_free,
self.idx,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(found) => next_free = found,
}
}
self.actual_free(limiter);
}
}
}
Expand All @@ -207,6 +284,10 @@ impl Limiter for AnyLimiter {
fn rate(&self) -> f64 {
self.limiter().rate()
}

fn update_rate(&self) -> f64 {
self.limiter().update_rate()
}
}

#[cfg(test)]
Expand All @@ -224,7 +305,7 @@ mod tests {
#[test]
#[cfg_attr(miri, ignore)]
fn test_limiters() {
let mut limiters = ShmLimiterMemory::create(path()).unwrap();
let mut limiters = ShmLimiterMemory::<()>::create(path()).unwrap();
let limiter = limiters.alloc();
let limiter_idx = limiter.idx;
// Two are allowed, then one more because a small amount of time passed since the first one
Expand All @@ -243,7 +324,7 @@ mod tests {
let limiter2 = limiters.alloc();
assert_eq!(
limiter2.idx,
limiter_idx + std::mem::size_of::<ShmLimiterData>() as u32
limiter_idx + std::mem::size_of::<ShmLimiterData<()>>() as u32
);
drop(limiter);

Expand All @@ -253,7 +334,7 @@ mod tests {
let limiter3 = limiters.alloc();
assert_eq!(
limiter3.idx,
limiter2.idx + std::mem::size_of::<ShmLimiterData>() as u32
limiter2.idx + std::mem::size_of::<ShmLimiterData<()>>() as u32
);
}
}
7 changes: 2 additions & 5 deletions live-debugger/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,8 @@ impl PayloadSender {
parts.path_and_query = Some(PathAndQuery::from_str(&query)?);
url = Uri::from_parts(parts)?;

let mut req = hyper::Request::builder()
.header(
hyper::header::USER_AGENT,
concat!("Tracer/", env!("CARGO_PKG_VERSION")),
)
let mut req = endpoint
.into_request_builder(concat!("Tracer/", env!("CARGO_PKG_VERSION")))?
.method(Method::POST)
.uri(url);

Expand Down
2 changes: 2 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
tracer_version: ffi::CharSlice,
flush_interval_milliseconds: u32,
telemetry_heartbeat_interval_millis: u32,
exception_hash_rate_limiter_seconds: u32,
force_flush_size: usize,
force_drop_size: usize,
log_level: ffi::CharSlice,
Expand Down Expand Up @@ -517,6 +518,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
telemetry_heartbeat_interval: Duration::from_millis(
telemetry_heartbeat_interval_millis as u64
),
exception_hash_rate_limiter_seconds,
force_flush_size,
force_drop_size,
log_level: log_level.to_utf8_lossy().into(),
Expand Down
Loading

0 comments on commit f6d72d6

Please sign in to comment.