diff --git a/Cargo.lock b/Cargo.lock
index 61814595..53076857 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2543,6 +2543,7 @@ dependencies = [
  "metrics-util",
  "saluki-metrics",
  "stringtheory",
+ "tracing",
 ]
 
 [[package]]
diff --git a/Makefile b/Makefile
index ada47fc9..ba6b5b47 100644
--- a/Makefile
+++ b/Makefile
@@ -26,6 +26,11 @@ export GO_APP_IMAGE ?= debian:bullseye-slim
 export CARGO_BIN_DIR ?= $(shell echo "${HOME}/.cargo/bin")
 export GIT_COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo not-in-git)
 
+# Runtime-specific environment variables that get passed to ADP.
+#
+# We export them here so that we can easily override them while providing defaults.
+export DD_LOG_LEVEL ?= info
+
 # Specific versions of various Rust tools we use.
 export CARGO_TOOL_VERSION_dd-rust-license-tool ?= 1.0.3
 export CARGO_TOOL_VERSION_cargo-deny ?= 0.15.0
@@ -159,7 +164,7 @@ endif
 	@echo "[*] Running ADP..."
 	@DD_ADP_USE_NOOP_WORKLOAD_PROVIDER=true \
 	DD_DOGSTATSD_PORT=0 DD_DOGSTATSD_SOCKET=/tmp/adp-dsd.sock DD_DOGSTATSD_EXPIRY_SECONDS=30 \
-	DD_TELEMETRY_ENABLED=true DD_PROMETHEUS_LISTEN_ADDR=tcp://127.0.0.1:6000 DD_LOG_LEVEL=info \
+	DD_TELEMETRY_ENABLED=true DD_PROMETHEUS_LISTEN_ADDR=tcp://127.0.0.1:6000 DD_LOG_LEVEL=$(DD_LOG_LEVEL) \
 	target/release/agent-data-plane
 
 .PHONY: run-dsd-basic-udp
diff --git a/lib/saluki-components/src/sources/dogstatsd/mod.rs b/lib/saluki-components/src/sources/dogstatsd/mod.rs
index 1d506970..e6c312be 100644
--- a/lib/saluki-components/src/sources/dogstatsd/mod.rs
+++ b/lib/saluki-components/src/sources/dogstatsd/mod.rs
@@ -245,7 +245,8 @@
             .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?;
         let context_interner = FixedSizeInterner::new(context_string_interner_size);
         let context_resolver = ContextResolver::from_interner("dogstatsd", context_interner)
-            .with_heap_allocations(self.allow_context_heap_allocations);
+            .with_heap_allocations(self.allow_context_heap_allocations)
+            .with_background_expiration();
 
         let codec_config = DogstatsdCodecConfiguration::default().with_timestamps(self.no_aggregation_pipeline_support);
         let codec = DogstatsdCodec::from_context_resolver(context_resolver)
diff --git a/lib/saluki-context/Cargo.toml b/lib/saluki-context/Cargo.toml
index 002e3a2c..ed42d326 100644
--- a/lib/saluki-context/Cargo.toml
+++ b/lib/saluki-context/Cargo.toml
@@ -11,6 +11,7 @@ indexmap = { workspace = true, features = ["std"] }
 metrics = { workspace = true }
 saluki-metrics = { workspace = true }
 stringtheory = { workspace = true }
+tracing = { workspace = true }
 
 [dev-dependencies]
 metrics-util = { workspace = true, features = ["debugging"] }
diff --git a/lib/saluki-context/src/lib.rs b/lib/saluki-context/src/lib.rs
index 6b6b963c..1de50fab 100644
--- a/lib/saluki-context/src/lib.rs
+++ b/lib/saluki-context/src/lib.rs
@@ -9,12 +9,14 @@ use std::{
     num::NonZeroUsize,
     ops::Deref as _,
     sync::{Arc, OnceLock, RwLock},
+    time::Duration,
 };
 
 use indexmap::{Equivalent, IndexSet};
 use metrics::Gauge;
 use saluki_metrics::static_metrics;
 use stringtheory::{interning::FixedSizeInterner, MetaString};
+use tracing::trace;
 
 static DIRTY_CONTEXT_HASH: OnceLock = OnceLock::new();
 
@@ -70,6 +72,7 @@ pub struct ContextResolver {
     state: Arc>,
     hash_seen_buffer: PrehashedHashSet,
     allow_heap_allocations: bool,
+    background_expiration_running: bool,
 }
 
 impl ContextResolver {
@@ -92,6 +95,7 @@
             })),
             hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()),
             allow_heap_allocations: true,
+            background_expiration_running: false,
         }
     }
 
@@ -117,6 +121,28 @@
         self
     }
 
+    /// Enables background expiration of resolved contexts.
+    ///
+    /// This spawns a background thread which incrementally scans the list of resolved contexts and removes "expired"
+    /// ones, where an expired context is one that is no longer actively being used. This continuously happens in small
+    /// chunks, in order to make consistent, incremental progress on removing expired values without unnecessarily
+    /// blocking normal resolving operations for too long.
+    ///
+    /// ## Panics
+    ///
+    /// If background expiration is already configured, this method will panic.
+    pub fn with_background_expiration(mut self) -> Self {
+        if self.background_expiration_running {
+            panic!("Background expiration already configured!");
+        }
+
+        let state = Arc::clone(&self.state);
+        std::thread::spawn(move || run_background_expiration(state));
+
+        self.background_expiration_running = true;
+        self
+    }
+
     fn intern(&self, s: &str) -> Option {
         // First we'll see if we can inline the string, and if we can't, then we try to actually intern it. If interning
         // fails, then we just fall back to allocating a new `MetaString` instance.
@@ -219,7 +245,85 @@
             state: self.state.clone(),
             hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()),
             allow_heap_allocations: self.allow_heap_allocations,
+            background_expiration_running: self.background_expiration_running,
         }
     }
 }
+
+fn run_background_expiration(state: Arc>) {
+    // TODO: When there's a ton of contexts to expire, this is actually super slow, in terms of the rate at which we
+    // recover memory.
+
+    // Our expiration logic is pretty straightforward: we iterate over the resolved contexts in small chunks, trying to
+    // do a little bit of work, releasing the lock, sleeping, and then attempting to pick up where we left off before.
+    //
+    // This means that expiration is incremental and makes progress over time, while attempting to not block for long
+    // periods of time.
+
+    const PER_LOOP_ITERATIONS: usize = 100;
+
+    let mut last_check_idx = 0;
+
+    loop {
+        // Sleep for a bit before we start our next iteration.
+        std::thread::sleep(Duration::from_secs(1));
+
+        let mut state_rw = state.write().unwrap();
+
+        let contexts_len = state_rw.resolved_contexts.len();
+        trace!(contexts_len, "Starting context expiration iteration...");
+
+        // If we have no contexts, go back to sleep.
+        if contexts_len == 0 {
+            trace!("No contexts to expire. Sleeping.");
+            continue;
+        }
+
+        // Iterate over the contexts, expiring them if they are no longer active.
+        //
+        // Crucially, this means that we remove contexts if there's only a single strong reference, since that implies
+        // nobody else is holding a reference apart from ourselves.
+        let mut visited_contexts = 0;
+        let mut expired_contexts = 0;
+
+        let mut idx = last_check_idx;
+        while !state_rw.resolved_contexts.is_empty() && visited_contexts < PER_LOOP_ITERATIONS {
+            // If we've hit the end of the set of contexts, wrap back around.
+            if idx >= state_rw.resolved_contexts.len() {
+                idx = 0;
+            }
+
+            // Figure out if we should delete the context at `idx`.
+            //
+            // If the context is no longer active -- i.e. we are the only ones holding a reference to it -- then we can
+            // delete it. We'll only increment `idx` if we don't delete, since the swap remove will place a new element
+            // at the same index, which we'll want to check next.
+            let should_delete = state_rw
+                .resolved_contexts
+                .get_index(idx)
+                .map(|context| Arc::strong_count(&context.inner) == 1)
+                .unwrap_or(false);
+
+            if should_delete {
+                let old_context = state_rw.resolved_contexts.swap_remove_index(idx);
+                drop(old_context);
+
+                expired_contexts += 1;
+            } else {
+                idx += 1;
+            }
+
+            visited_contexts += 1;
+        }
+
+        if expired_contexts > 0 {
+            trace!("Expired {} contexts", expired_contexts);
+        }
+
+        trace!("Finished context expiration iteration.");
+
+        // Track where we left off for next time.
+        last_check_idx = idx;
+    }
+}
 