Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Sep 5, 2024
1 parent 159054c commit a03d559
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export GO_APP_IMAGE ?= debian:bullseye-slim
export CARGO_BIN_DIR ?= $(shell echo "${HOME}/.cargo/bin")
export GIT_COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo not-in-git)

# Runtime-specific environment variables that get passed to ADP.
#
# We export them here so that we can easily override them while providing defaults.
export DD_LOG_LEVEL ?= info

# Specific versions of various Rust tools we use.
export CARGO_TOOL_VERSION_dd-rust-license-tool ?= 1.0.3
export CARGO_TOOL_VERSION_cargo-deny ?= 0.15.0
Expand Down Expand Up @@ -159,7 +164,7 @@ endif
@echo "[*] Running ADP..."
@DD_ADP_USE_NOOP_WORKLOAD_PROVIDER=true \
DD_DOGSTATSD_PORT=0 DD_DOGSTATSD_SOCKET=/tmp/adp-dsd.sock DD_DOGSTATSD_EXPIRY_SECONDS=30 \
DD_TELEMETRY_ENABLED=true DD_PROMETHEUS_LISTEN_ADDR=tcp://127.0.0.1:6000 DD_LOG_LEVEL=info \
DD_TELEMETRY_ENABLED=true DD_PROMETHEUS_LISTEN_ADDR=tcp://127.0.0.1:6000 DD_LOG_LEVEL=$(DD_LOG_LEVEL) \
target/release/agent-data-plane

.PHONY: run-dsd-basic-udp
Expand Down
3 changes: 2 additions & 1 deletion lib/saluki-components/src/sources/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ impl SourceBuilder for DogStatsDConfiguration {
.ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?;
let context_interner = FixedSizeInterner::new(context_string_interner_size);
let context_resolver = ContextResolver::from_interner("dogstatsd", context_interner)
.with_heap_allocations(self.allow_context_heap_allocations);
.with_heap_allocations(self.allow_context_heap_allocations)
.with_background_expiration();

let codec_config = DogstatsdCodecConfiguration::default().with_timestamps(self.no_aggregation_pipeline_support);
let codec = DogstatsdCodec::from_context_resolver(context_resolver)
Expand Down
1 change: 1 addition & 0 deletions lib/saluki-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ indexmap = { workspace = true, features = ["std"] }
metrics = { workspace = true }
saluki-metrics = { workspace = true }
stringtheory = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
metrics-util = { workspace = true, features = ["debugging"] }
104 changes: 104 additions & 0 deletions lib/saluki-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use std::{
num::NonZeroUsize,
ops::Deref as _,
sync::{Arc, OnceLock, RwLock},
time::Duration,
};

use indexmap::{Equivalent, IndexSet};
use metrics::Gauge;
use saluki_metrics::static_metrics;
use stringtheory::{interning::FixedSizeInterner, MetaString};
use tracing::trace;

static DIRTY_CONTEXT_HASH: OnceLock<u64> = OnceLock::new();

Expand Down Expand Up @@ -70,6 +72,7 @@ pub struct ContextResolver<const SHARD_FACTOR: usize = 8> {
state: Arc<RwLock<State>>,
hash_seen_buffer: PrehashedHashSet,
allow_heap_allocations: bool,
background_expiration_running: bool,
}

impl<const SHARD_FACTOR: usize> ContextResolver<SHARD_FACTOR> {
Expand All @@ -92,6 +95,7 @@ impl<const SHARD_FACTOR: usize> ContextResolver<SHARD_FACTOR> {
})),
hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()),
allow_heap_allocations: true,
background_expiration_running: false,
}
}

Expand All @@ -117,6 +121,28 @@ impl<const SHARD_FACTOR: usize> ContextResolver<SHARD_FACTOR> {
self
}

/// Enables background expiration of resolved contexts.
///
/// A dedicated background thread is spawned which sweeps the set of resolved contexts in small
/// increments, dropping any context that is no longer referenced by anyone other than the resolver
/// itself. Working in small chunks keeps the sweep from blocking normal resolve operations for
/// extended periods while still making steady progress reclaiming memory.
///
/// ## Panics
///
/// Panics if background expiration has already been enabled on this resolver.
pub fn with_background_expiration(mut self) -> Self {
    // Guard against spawning a second expiration thread over the same shared state.
    assert!(
        !self.background_expiration_running,
        "Background expiration already configured!"
    );

    // Hand the worker thread its own handle to the shared state; the resolver keeps the original.
    let shared_state = Arc::clone(&self.state);
    std::thread::spawn(move || run_background_expiration(shared_state));

    self.background_expiration_running = true;
    self
}

fn intern(&self, s: &str) -> Option<MetaString> {
// First we'll see if we can inline the string, and if we can't, then we try to actually intern it. If interning
// fails, then we just fall back to allocating a new `MetaString` instance.
Expand Down Expand Up @@ -219,7 +245,85 @@ impl<const SHARD_FACTOR: usize> Clone for ContextResolver<SHARD_FACTOR> {
state: self.state.clone(),
hash_seen_buffer: PrehashedHashSet::with_hasher(NoopU64Hasher::new()),
allow_heap_allocations: self.allow_heap_allocations,
background_expiration_running: self.background_expiration_running,
}
}
}

/// Background worker that incrementally expires resolved contexts which are no longer in use.
///
/// Runs forever on its own thread: once a second it examines a bounded chunk of the resolved
/// contexts and removes any whose only remaining strong reference is the resolver state itself
/// (i.e. no caller can still observe them). Bounding the chunk size keeps the write-lock hold
/// time short so normal resolving operations are not blocked for long, while still making
/// consistent, incremental progress.
///
/// NOTE(review): this thread never terminates, and the `Arc` it holds keeps `State` (and its
/// interner) alive even after every `ContextResolver` has been dropped. That is fine for a
/// process-lifetime resolver, but worth confirming if resolvers are ever created transiently.
fn run_background_expiration(state: Arc<RwLock<State>>) {
    // TODO: When there's a ton of contexts to expire, this is actually super slow, in terms of the
    // rate at which we recover memory.

    // Maximum number of context slots examined per pass, which bounds how long we hold the write
    // lock in any single iteration.
    const PER_LOOP_ITERATIONS: usize = 100;

    // Slot index to resume scanning from on the next pass, so that successive passes cover the
    // whole set over time instead of restarting from the front.
    let mut last_check_idx = 0;

    loop {
        // Sleep between passes so expiration is incremental rather than a busy loop.
        std::thread::sleep(Duration::from_secs(1));

        // Cheaply check for emptiness under a read lock first: an idle resolver shouldn't have to
        // take (and make writers wait on) the write lock once a second for nothing.
        if state.read().unwrap().resolved_contexts.is_empty() {
            trace!("No contexts to expire. Sleeping.");
            continue;
        }

        let mut state_rw = state.write().unwrap();

        let contexts_len = state_rw.resolved_contexts.len();
        trace!(contexts_len, "Starting context expiration iteration...");

        // Bound the number of slots we visit this pass: no more than the chunk size, and no more
        // than the contexts actually present, so a small set isn't rescanned dozens of times
        // within a single lock hold.
        let max_visits = PER_LOOP_ITERATIONS.min(contexts_len);

        let mut visited_contexts = 0;
        let mut expired_contexts = 0;

        let mut idx = last_check_idx;
        while !state_rw.resolved_contexts.is_empty() && visited_contexts < max_visits {
            // Wrap back around to the front once we run off the end of the set.
            if idx >= state_rw.resolved_contexts.len() {
                idx = 0;
            }

            // A context is expired when the resolver state holds the only strong reference, which
            // implies nobody else can observe it anymore. `swap_remove_index` moves the last
            // element into `idx`, so we only advance `idx` when we *don't* delete: the swapped-in
            // element must be examined next.
            let expired = state_rw
                .resolved_contexts
                .get_index(idx)
                .map(|context| Arc::strong_count(&context.inner) == 1)
                .unwrap_or(false);

            if expired {
                state_rw.resolved_contexts.swap_remove_index(idx);
                expired_contexts += 1;
            } else {
                idx += 1;
            }

            visited_contexts += 1;
        }

        if expired_contexts > 0 {
            trace!("Expired {} contexts", expired_contexts);
        }

        trace!("Finished context expiration iteration.");

        // Remember where we stopped so the next pass resumes from the same spot.
        last_check_idx = idx;
    }
}

Expand Down

0 comments on commit a03d559

Please sign in to comment.