From 9a479dcba48543713c0bb39d1944955985320096 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 9 Sep 2024 19:12:47 +0000 Subject: [PATCH] switch to papaya, but we expire too well... gotta go back to delayed background reclamation --- Cargo.lock | 98 ++++++- Cargo.toml | 1 + LICENSE-3rdparty.csv | 3 + .../src/sources/dogstatsd/mod.rs | 3 +- lib/saluki-context/Cargo.toml | 1 + lib/saluki-context/src/lib.rs | 265 ++++++------------ 6 files changed, 186 insertions(+), 185 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53076857..852b70b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,16 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-wait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55b94919229f2c42292fd71ffa4b75e83193bffdd77b1e858cd55fd2d0b0ea8" +dependencies = [ + "libc", + "windows-sys 0.42.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1804,6 +1814,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papaya" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d17fbf29d99ed1d2a1fecdb37d08898790965c85fd2634ba4023ab9710089059" +dependencies = [ + "atomic-wait", + "seize", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -2541,6 +2561,7 @@ dependencies = [ "indexmap 2.4.0", "metrics", "metrics-util", + "papaya", "saluki-metrics", "stringtheory", "tracing", @@ -2796,6 +2817,12 @@ dependencies = [ "libc", ] +[[package]] +name = "seize" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d659fa6f19e82a52ab8d3fff3c380bd8cc16462eaea411395618a38760eb85bc" + [[package]] name = "serde" version = "1.0.209" @@ -3747,6 +3774,21 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3771,28 +3813,46 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3805,24 +3865,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index b62acab3..94ec8cfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ windows-sys = { version = "0.59", default-features = false } cgroupfs = { version = "0.8", default-features = false } rustls-native-certs = { version = "0.7", default-features = false } hashbrown = { version = "0.14.5", default-features = false } +papaya = { version = "0.1", default-features = false } [patch.crates-io] # Git dependency for `containerd-client` to: diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 2047882a..fef5b476 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -14,6 +14,7 @@ async-compression,https://github.com/Nullus157/async-compression,MIT OR Apache-2 async-stream,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche async-trait,https://github.com/dtolnay/async-trait,MIT OR Apache-2.0,David Tolnay atomic,https://github.com/Amanieu/atomic-rs,Apache-2.0 OR MIT,Amanieu d'Antras +atomic-wait,https://github.com/m-ou-se/atomic-wait,BSD-2-Clause,The atomic-wait Authors atomic-waker,https://github.com/smol-rs/atomic-waker,Apache-2.0 OR MIT,"Stjepan Glavina , Contributors to futures-rs" average,https://github.com/vks/average,MIT OR Apache-2.0,Vinzent Steinberg aws-lc-fips-sys,https://github.com/aws/aws-lc-rs,ISC AND (Apache-2.0 OR ISC) AND OpenSSL,AWS-LC @@ -148,6 +149,7 @@ openat,https://github.com/tailhook/openat,MIT OR Apache-2.0,paul@colomiets.name openssl-probe,https://github.com/alexcrichton/openssl-probe,MIT OR Apache-2.0,Alex Crichton ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem , Matt Brubeck " overload,https://github.com/danaugrs/overload,MIT,Daniel Salvadori +papaya,https://github.com/ibraheemdev/papaya,MIT,Ibraheem Ahmed parking_lot,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras paste,https://github.com/dtolnay/paste,MIT OR Apache-2.0,David Tolnay pear,https://github.com/SergioBenitez/Pear,MIT OR Apache-2.0,Sergio Benitez @@ -198,6 +200,7 @@ schannel,https://github.com/steffengy/schannel-rs,MIT,"Steven Fackler security-framework,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler , Kornel " +seize,https://github.com/ibraheemdev/seize,MIT,Ibraheem Ahmed serde,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde-value,https://github.com/arcnmx/serde-value,MIT,arcnmx serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " diff --git a/lib/saluki-components/src/sources/dogstatsd/mod.rs b/lib/saluki-components/src/sources/dogstatsd/mod.rs index e6c312be..1d506970 100644 --- a/lib/saluki-components/src/sources/dogstatsd/mod.rs +++ b/lib/saluki-components/src/sources/dogstatsd/mod.rs @@ -245,8 +245,7 @@ impl SourceBuilder for DogStatsDConfiguration { .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?; let context_interner = FixedSizeInterner::new(context_string_interner_size); let context_resolver = ContextResolver::from_interner("dogstatsd", context_interner) - .with_heap_allocations(self.allow_context_heap_allocations) - .with_background_expiration(); + .with_heap_allocations(self.allow_context_heap_allocations); let codec_config = DogstatsdCodecConfiguration::default().with_timestamps(self.no_aggregation_pipeline_support); let codec = DogstatsdCodec::from_context_resolver(context_resolver) diff --git a/lib/saluki-context/Cargo.toml b/lib/saluki-context/Cargo.toml index ed42d326..dce51bb4 100644 --- a/lib/saluki-context/Cargo.toml +++ b/lib/saluki-context/Cargo.toml @@ -9,6 +9,7 @@ repository = { workspace = true } ahash = { workspace = true } indexmap = { workspace = true, features = ["std"] } metrics = { workspace = true } +papaya = { workspace = true } saluki-metrics = { workspace = true } stringtheory = { workspace = true } tracing = { workspace = true } diff --git a/lib/saluki-context/src/lib.rs b/lib/saluki-context/src/lib.rs index 1de50fab..32eca47b 100644 --- a/lib/saluki-context/src/lib.rs +++ b/lib/saluki-context/src/lib.rs @@ -8,15 +8,15 @@ use std::{ hash::{self, BuildHasher, Hash as _, Hasher as _}, num::NonZeroUsize, ops::Deref as _, - sync::{Arc, OnceLock, RwLock}, - time::Duration, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, OnceLock, + }, }; -use indexmap::{Equivalent, IndexSet}; -use metrics::Gauge; +use papaya::HashMap; use saluki_metrics::static_metrics; use stringtheory::{interning::FixedSizeInterner, MetaString}; -use tracing::trace; static DIRTY_CONTEXT_HASH: OnceLock = OnceLock::new(); @@ -39,7 +39,8 @@ type PrehashedHashSet = HashSet; #[derive(Debug)] struct State { - resolved_contexts: IndexSet, + resolved_contexts: HashMap, + metrics: ContextMetrics, } /// A centralized store for resolved contexts. @@ -67,12 +68,10 @@ struct State { /// components. #[derive(Debug)] pub struct ContextResolver { - context_metrics: ContextMetrics, interner: FixedSizeInterner, - state: Arc>, + state: Arc, hash_seen_buffer: PrehashedHashSet, allow_heap_allocations: bool, - background_expiration_running: bool, } impl ContextResolver { @@ -81,21 +80,17 @@ impl ContextResolver { where S: Into, { - let context_metrics = ContextMetrics::new(name.into()); - - context_metrics - .interner_capacity_bytes() - .set(interner.capacity_bytes() as f64); + let metrics = ContextMetrics::new(name.into()); + metrics.interner_capacity_bytes().set(interner.capacity_bytes() as f64); Self { - context_metrics, interner, - state: Arc::new(RwLock::new(State { - resolved_contexts: IndexSet::with_hasher(ahash::RandomState::new()), - })), + state: Arc::new(State { + resolved_contexts: HashMap::with_hasher(ahash::RandomState::new()), + metrics, + }), hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()), allow_heap_allocations: true, - background_expiration_running: false, } } @@ -121,28 +116,6 @@ impl ContextResolver { self } - /// Enables background expiration of resolved contexts. - /// - /// This spawns a background thread which incrementally scans the list of resolved contexts and removes "expired" - /// ones, where an expired context is one that is no longer actively being used. This continuously happens in small - /// chunks, in order to make consistent, incremental progress on removing expired values without unnecessarily - /// blocking normal resolving operations for too long. - /// - /// ## Panics - /// - /// If background expiration is already configured, this method will panic. - pub fn with_background_expiration(mut self) -> Self { - if self.background_expiration_running { - panic!("Background expiration already configured!"); - } - - let state = Arc::clone(&self.state); - std::thread::spawn(move || run_background_expiration(state)); - - self.background_expiration_running = true; - self - } - fn intern(&self, s: &str) -> Option { // First we'll see if we can inline the string, and if we can't, then we try to actually intern it. If interning // fails, then we just fall back to allocating a new `MetaString` instance. @@ -151,7 +124,7 @@ impl ContextResolver { .or_else(|| self.allow_heap_allocations.then(|| MetaString::from(s))) } - fn create_context_from_ref(&self, context_ref: ContextRef<'_, I>, active_count: Gauge) -> Option + fn create_context_from_ref(&self, context_ref: ContextRef<'_, I>) -> Option where I: IntoIterator, T: AsRef + hash::Hash + std::fmt::Debug, @@ -168,7 +141,8 @@ impl ContextResolver { name, tags, hash: context_ref.hash, - active_count, + active_refs: AtomicUsize::new(1), + resolver: Some(Arc::clone(&self.state)), }), }) } @@ -201,132 +175,66 @@ impl ContextResolver { I: IntoIterator, T: AsRef + hash::Hash + std::fmt::Debug, { - let state = self.state.read().unwrap(); - match state.resolved_contexts.get(&context_ref) { - Some(context) => { - self.context_metrics.resolved_existing_context_total().increment(1); - Some(context.clone()) - } - None => { - // Switch from read to write lock. - drop(state); - let mut state = self.state.write().unwrap(); - - // Create our new context and store it. - let active_count = self.context_metrics.active_contexts().clone(); - let context = self.create_context_from_ref(context_ref, active_count)?; - state.resolved_contexts.insert(context.clone()); - - // TODO: This is lazily updated during resolve, which means this metric might lag behind the actual - // count as interned strings are dropped/reclaimed... but we don't have a way to figure out if a given - // `MetaString` is an interned string and if dropping it would actually reclaim the interned string... - // so this is our next best option short of instrumenting `FixedSizeInterner` directly. - // - // We probably want to do that in the future, but this is just a little cleaner without adding extra - // fluff to `FixedSizeInterner` which is already complex as-is. - self.context_metrics.interner_entries().set(self.interner.len() as f64); - self.context_metrics - .interner_len_bytes() - .set(self.interner.len_bytes() as f64); - self.context_metrics.resolved_new_context_total().increment(1); - self.context_metrics.active_contexts().increment(1); - - Some(context) + let context_hash = context_ref.hash; + let mut was_existing = true; + + // Try to either find an existing resolved context for this context reference, or create one if it doesn't yet + // exist. We do a little scoping here to limit the time we have the context map pinned. + let context = { + let contexts = self.state.resolved_contexts.pin(); + match contexts.get(&context_hash) { + Some(context) => { + self.state.metrics.resolved_existing_context_total().increment(1); + context.clone() + } + None => { + // Create the context before we try and insert it. + let new_context = self.create_context_from_ref(context_ref)?; + + // Now try to insert it, which is where we'll figure out if someone else beat us to it. + let context = contexts.get_or_insert_with(context_hash, || { + was_existing = false; + new_context + }); + context.clone() + } } + }; + + // If this was a new context, update our telemetry accordingly. + // + // TODO: This is lazily updated during resolve, which means this metric might lag behind the actual + // count as interned strings are dropped/reclaimed... but we don't have a way to figure out if a given + // `MetaString` is an interned string and if dropping it would actually reclaim the interned string... + // so this is our next best option short of instrumenting `FixedSizeInterner` directly. + // + // We probably want to do that in the future, but this is just a little cleaner without adding extra + // fluff to `FixedSizeInterner` which is already complex as-is. + if !was_existing { + self.state.metrics.interner_entries().set(self.interner.len() as f64); + self.state + .metrics + .interner_len_bytes() + .set(self.interner.len_bytes() as f64); + self.state.metrics.resolved_new_context_total().increment(1); + self.state.metrics.active_contexts().increment(1); } + + Some(context) } } impl Clone for ContextResolver { fn clone(&self) -> Self { Self { - context_metrics: self.context_metrics.clone(), interner: self.interner.clone(), - state: self.state.clone(), + state: Arc::clone(&self.state), hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()), allow_heap_allocations: self.allow_heap_allocations, - background_expiration_running: self.background_expiration_running, } } } -fn run_background_expiration(state: Arc>) { - // TODO: When there's a ton of contexts to expire, this is actually super slow, in terms of the rate at which we - // recover memory. - - // Our expiration logic is pretty straightforward: we iterate over the resolved contexts in small chunks, trying to - // do a little bit of work, releasing the lock, sleeping, and then attempting to pick up where we left off before. - // - // This means that expiration is incremental and makes progress over time, while attempting to not block for long - // periods of time. - - const PER_LOOP_ITERATIONS: usize = 100; - - let mut last_check_idx = 0; - - loop { - // Sleep for a bit before we start our next iteration. - std::thread::sleep(Duration::from_secs(1)); - - let mut state_rw = state.write().unwrap(); - - let contexts_len = state_rw.resolved_contexts.len(); - trace!(contexts_len, "Starting context expiration iteration..."); - - // If we have no contexts, go back to sleep. - if contexts_len == 0 { - trace!("No contexts to expire. Sleeping."); - continue; - } - - // Iterate over the contexts, expiring them if they are no longer active. - // - // Crucially, this means that we remove contexts if there's only a single strong reference, since that implies - // nobody else is holding a reference apart from ourselves. - let mut visited_contexts = 0; - let mut expired_contexts = 0; - - let mut idx = last_check_idx; - while !state_rw.resolved_contexts.is_empty() && visited_contexts < PER_LOOP_ITERATIONS { - // If we've hit the end of the set of contexts, wrap back around. - if idx >= state_rw.resolved_contexts.len() { - idx = 0; - } - - // Figure out if we should delete the context at `idx`. - // - // If the context is no longer active -- i.e. we are the only ones holding a reference to it -- then we can - // delete it. We'll only increment `idx` if we don't delete, since the swap remove will place a new element - // at the same index, which we'll want to check next. - let should_delete = state_rw - .resolved_contexts - .get_index(idx) - .map(|context| Arc::strong_count(&context.inner) == 1) - .unwrap_or(false); - - if should_delete { - let old_context = state_rw.resolved_contexts.swap_remove_index(idx); - drop(old_context); - - expired_contexts += 1; - } else { - idx += 1; - } - - visited_contexts += 1; - } - - if expired_contexts > 0 { - trace!("Expired {} contexts", expired_contexts); - } - - trace!("Finished context expiration iteration."); - - // Track where we left off for next time. - last_check_idx = idx; - } -} - /// A metric context. #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Context { @@ -344,7 +252,8 @@ impl Context { name: MetaString::from_static(name), tags: TagSet::default(), hash, - active_count: Gauge::noop(), + active_refs: AtomicUsize::new(1), + resolver: None, }), } } @@ -362,7 +271,8 @@ impl Context { name: MetaString::from_static(name), tags: tag_set, hash, - active_count: Gauge::noop(), + active_refs: AtomicUsize::new(1), + resolver: None, }), } } @@ -401,6 +311,20 @@ impl Context { } } +impl Drop for Context { + fn drop(&mut self) { + if self.inner.active_refs.fetch_sub(1, Ordering::AcqRel) == 1 { + // The last reference to this context is in the contexts map, so we can stand to remove it. + if let Some(resolver) = &self.inner.resolver { + resolver.metrics.active_contexts().decrement(1); + + // We're the last reference to this context, so we can drop it from the contexts map. + resolver.resolved_contexts.pin().remove(&self.inner.hash); + } + } + } +} + impl From<&'static str> for Context { fn from(name: &'static str) -> Self { Self::from_static_name(name) @@ -441,30 +365,28 @@ struct ContextInner { name: MetaString, tags: TagSet, hash: u64, - active_count: Gauge, + active_refs: AtomicUsize, + resolver: Option>, } impl Clone for ContextInner { fn clone(&self) -> Self { // Increment the context count when cloning the context, since we only get here when we're about to create a // brand new context for the purpose of mutating the data... so we have a new context. - self.active_count.increment(1); + if let Some(resolver) = &self.resolver { + resolver.metrics.active_contexts().increment(1); + } Self { name: self.name.clone(), tags: self.tags.clone(), hash: self.hash, - active_count: self.active_count.clone(), + active_refs: AtomicUsize::new(1), + resolver: self.resolver.clone(), } } } -impl Drop for ContextInner { - fn drop(&mut self) { - self.active_count.decrement(1); - } -} - impl PartialEq for ContextInner { fn eq(&self, other: &ContextInner) -> bool { // NOTE: See the documentation for `ContextRef` on why/how we only check equality using the hash of the context. @@ -539,16 +461,6 @@ where } } -impl Equivalent for ContextRef<'_, I> -where - I: IntoIterator, - T: hash::Hash + std::fmt::Debug, -{ - fn equivalent(&self, other: &Context) -> bool { - self.hash == other.inner.hash - } -} - /// A metric tag. #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] pub struct Tag(MetaString); @@ -1029,7 +941,8 @@ mod tests { let active_contexts = get_gauge_value(&metrics_before, ContextMetrics::active_contexts_name()); assert_eq!(active_contexts, 1.0); - // Now drop the context, and observe the active context count drop to zero: + // Now drop the context, and the resolver, so that our context is entirely dropped, and observe the active + // context count drop to zero: drop(context); let metrics_after = snapshotter.snapshot().into_vec(); let active_contexts = get_gauge_value(&metrics_after, ContextMetrics::active_contexts_name());