[APR-205] chore: allow for contexts to be expired from ContextResolver #225

Closed · wants to merge 18 commits
2 changes: 2 additions & 0 deletions .gitlab/benchmark.yml
@@ -44,6 +44,7 @@ build-adp-baseline-image:
--tag ${BASELINE_SALUKI_IMG}
--build-arg BUILD_IMAGE=${SALUKI_BUILD_CI_IMAGE}
--build-arg APP_IMAGE=${GBI_BASE_IMAGE}
--build-arg BUILD_PROFILE=optimized-debug-release
--label git.repository=${CI_PROJECT_NAME}
--label git.branch=${CI_COMMIT_REF_NAME}
--label git.commit=${CI_COMMIT_SHA}
@@ -77,6 +78,7 @@ build-adp-comparison-image:
--tag ${COMPARISON_SALUKI_IMG}
--build-arg BUILD_IMAGE=${SALUKI_BUILD_CI_IMAGE}
--build-arg APP_IMAGE=${GBI_BASE_IMAGE}
--build-arg BUILD_PROFILE=optimized-debug-release
--label git.repository=${CI_PROJECT_NAME}
--label git.branch=${CI_COMMIT_REF_NAME}
--label git.commit=${CI_COMMIT_SHA}
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -17,6 +17,7 @@ members = [
"lib/saluki-health",
"lib/saluki-io",
"lib/saluki-metrics",
"lib/saluki-time",
"lib/saluki-tls",
"lib/stringtheory",
]
@@ -44,6 +45,7 @@ saluki-event = { path = "lib/saluki-event" }
saluki-health = { path = "lib/saluki-health" }
saluki-io = { path = "lib/saluki-io" }
saluki-metrics = { path = "lib/saluki-metrics" }
saluki-time = { path = "lib/saluki-time" }
saluki-tls = { path = "lib/saluki-tls" }
stringtheory = { path = "lib/stringtheory" }
async-trait = { version = "0.1", default-features = false }
@@ -124,6 +126,8 @@ windows-sys = { version = "0.59", default-features = false }
cgroupfs = { version = "0.8", default-features = false }
rustls-native-certs = { version = "0.7", default-features = false }
hashbrown = { version = "0.14.5", default-features = false }
sharded-slab = { version = "0.1", default-features = false }
crossbeam-queue = { version = "0.3", default-features = false }

[patch.crates-io]
# Git dependency for `containerd-client` to:
@@ -137,6 +141,7 @@ containerd-client = { git = "https://github.com/tobz/rust-extensions", branch =
lto = "thin"
codegen-units = 4
debug = true
strip = "none"

[profile.optimized-release]
inherits = "release"
@@ -147,6 +152,7 @@ debug = false
[profile.optimized-debug-release]
inherits = "optimized-release"
debug = true
strip = "none"

[workspace.lints.clippy]
new_without_default = "allow"
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
@@ -52,6 +52,7 @@ crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs <s
criterion-plot,https://github.com/bheisler/criterion.rs,MIT OR Apache-2.0,"Jorge Aparicio <japaricious@gmail.com>, Brook Heisler <brookheisler@gmail.com>"
crossbeam-deque,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-deque Authors
crossbeam-epoch,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-epoch Authors
crossbeam-queue,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-queue Authors
crossbeam-utils,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-utils Authors
crunchy,https://github.com/eira-fransham/crunchy,MIT,Vurich <jackefransham@hotmail.co.uk>
crypto-common,https://github.com/RustCrypto/traits,MIT OR Apache-2.0,RustCrypto Developers
8 changes: 7 additions & 1 deletion Makefile
@@ -72,6 +72,12 @@ build-adp: ## Builds the ADP binary in release mode
@echo "[*] Building ADP locally..."
@cargo build --release --package agent-data-plane

.PHONY: build-adp-profiling
build-adp-profiling: check-rust-build-tools
build-adp-profiling: ## Builds the ADP binary for profiling (optimized-debug-release profile)
@echo "[*] Building ADP locally..."
@cargo build --profile optimized-debug-release --package agent-data-plane

.PHONY: build-adp-image
build-adp-image: ## Builds the ADP container image ('latest' tag)
@echo "[*] Building ADP image..."
@@ -351,7 +357,7 @@ endif
##@ Profiling

.PHONY: profile-run-adp-ddprof
profile-run-adp-ddprof: ensure-ddprof build-adp
profile-run-adp-ddprof: ensure-ddprof build-adp-profiling
profile-run-adp-ddprof: ## Runs ADP under ddprof locally
ifeq ($(shell test -S /var/run/datadog/apm.socket || echo not-found), not-found)
$(error "APM socket at /var/run/datadog/apm.socket not found. Is the Datadog Agent running?")
3 changes: 2 additions & 1 deletion bin/agent-data-plane/Cargo.toml
@@ -18,7 +18,8 @@ saluki-env = { workspace = true }
saluki-error = { workspace = true }
saluki-event = { workspace = true }
saluki-health = { workspace = true }
saluki-io = { workspace = true }
saluki-io = { workspace = true }
saluki-time = { workspace = true }
serde = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "signal"] }
9 changes: 5 additions & 4 deletions bin/agent-data-plane/src/main.rs
@@ -5,9 +5,9 @@

#![deny(warnings)]
#![deny(missing_docs)]
use std::{future::pending, time::Instant};
use std::{alloc::System, future::pending, time::Instant};

use memory_accounting::ComponentRegistry;
use memory_accounting::{allocator::TrackingAllocator, ComponentRegistry};
use saluki_app::{api::APIBuilder, prelude::*};
use saluki_components::{
destinations::{DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration, PrometheusConfiguration},
@@ -30,8 +30,7 @@ mod env_provider;
use self::env_provider::ADPEnvironmentProvider;

#[global_allocator]
static ALLOC: memory_accounting::allocator::TrackingAllocator<std::alloc::System> =
memory_accounting::allocator::TrackingAllocator::new(std::alloc::System);
static ALLOC: TrackingAllocator<System> = TrackingAllocator::new(System);

const ADP_VERSION: &str = env!("ADP_VERSION");
const ADP_BUILD_DESC: &str = env!("ADP_BUILD_DESC");
@@ -56,6 +55,8 @@ async fn main() {
fatal_and_exit(format!("failed to initialize TLS: {}", e));
}

saluki_time::initialize_coarse_time_updater();

match run(started).await {
Ok(()) => info!("Agent Data Plane stopped."),
Err(e) => {
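The `saluki_time::initialize_coarse_time_updater()` call added above starts the background clock that the `get_unix_timestamp_coarse()` reads used elsewhere in this PR depend on. The crate's internals are not part of this diff, so the following is only a sketch of the usual pattern (everything except the two public function names is an assumption): a background thread periodically publishes the current Unix time into an atomic, and readers take a single atomic load instead of a clock syscall.

```rust
use std::{
    sync::atomic::{AtomicU64, Ordering},
    thread,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

// Sketch only: the real saluki-time internals are not shown in this diff.
static COARSE_UNIX_TIMESTAMP: AtomicU64 = AtomicU64::new(0);

fn unix_now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

/// Spawns a detached thread that refreshes the shared timestamp every 100ms.
pub fn initialize_coarse_time_updater() {
    COARSE_UNIX_TIMESTAMP.store(unix_now_secs(), Ordering::Relaxed);
    thread::spawn(|| loop {
        thread::sleep(Duration::from_millis(100));
        COARSE_UNIX_TIMESTAMP.store(unix_now_secs(), Ordering::Relaxed);
    });
}

/// A single relaxed load: cheap enough for per-event hot paths.
pub fn get_unix_timestamp_coarse() -> u64 {
    COARSE_UNIX_TIMESTAMP.load(Ordering::Relaxed)
}
```

The cost is staleness bounded by the refresh interval, which is acceptable for second-granularity consumers like the aggregation and expiration changes below.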
1 change: 1 addition & 0 deletions lib/saluki-components/Cargo.toml
@@ -40,6 +40,7 @@ saluki-env = { workspace = true }
saluki-error = { workspace = true }
saluki-event = { workspace = true }
saluki-io = { workspace = true }
saluki-time = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slab = { workspace = true }
4 changes: 2 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -10,9 +10,9 @@ use saluki_core::{
pooling::{FixedSizeObjectPool, ObjectPool as _},
topology::{interconnect::EventBuffer, OutputDefinition},
};
use saluki_env::time::get_unix_timestamp;
use saluki_error::GenericError;
use saluki_event::{metric::*, DataType, Event};
use saluki_time::{get_unix_timestamp, get_unix_timestamp_coarse};
use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{select, time::interval_at};
@@ -282,7 +282,7 @@ impl Transform for Aggregate {
trace!(events_len = events.len(), "Received events.");

let event_buffer_len = event_buffer.len();
let current_time = get_unix_timestamp();
let current_time = get_unix_timestamp_coarse();

for event in events {
if let Some(metric) = event.try_into_metric() {
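Swapping `get_unix_timestamp()` for `get_unix_timestamp_coarse()` in this hot path works because the aggregate transform only needs second-level resolution to assign events to fixed-width windows; a timestamp that lags by a fraction of a second lands in the same bucket except right at a window boundary. The bucketing code itself is not part of this diff, so the helper below is purely illustrative:

```rust
/// Illustrative only: maps a Unix timestamp to the start of its aggregation window.
fn bucket_start(timestamp_secs: u64, bucket_width_secs: u64) -> u64 {
    timestamp_secs - (timestamp_secs % bucket_width_secs)
}

// With 10-second windows, a slightly stale timestamp still buckets identically:
// bucket_start(1_700_000_003, 10) == bucket_start(1_700_000_009, 10) == 1_700_000_000
```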
3 changes: 3 additions & 0 deletions lib/saluki-context/Cargo.toml
@@ -7,10 +7,13 @@ repository = { workspace = true }

[dependencies]
ahash = { workspace = true }
crossbeam-queue = { workspace = true, features = ["std"] }
indexmap = { workspace = true, features = ["std"] }
metrics = { workspace = true }
saluki-metrics = { workspace = true }
saluki-time = { workspace = true }
stringtheory = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
metrics-util = { workspace = true, features = ["debugging"] }
132 changes: 132 additions & 0 deletions lib/saluki-context/src/cache.rs
@@ -0,0 +1,132 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
thread,
time::Duration,
};

use saluki_time::get_unix_timestamp_coarse;
use tracing::trace;

use crate::{hash::NoopU64Hasher, Context};

#[derive(Debug)]
struct Inner {
name: String,
contexts: RwLock<HashMap<u64, Context, NoopU64Hasher>>,
contexts_last_seen: Mutex<HashMap<u64, Context, NoopU64Hasher>>,
}

#[derive(Clone, Debug)]
pub struct ContextCache {
inner: Arc<Inner>,
}

impl ContextCache {
pub fn new(name: String) -> Self {
let inner = Arc::new(Inner {
name,
contexts: RwLock::new(HashMap::with_hasher(NoopU64Hasher::new())),
contexts_last_seen: Mutex::new(HashMap::with_hasher(NoopU64Hasher::new())),
});

let bg_inner = Arc::clone(&inner);
thread::spawn(move || run_background_expiration(bg_inner));

Self { inner }
}

pub fn get_or_insert<F>(&self, id: u64, f: F) -> Option<Context>
where
F: FnOnce() -> Option<Context>,
{
let mut is_new_context = false;

// First, try to find the cached context; if it doesn't exist, create it via `f`.
let context = {
let contexts_read = self.inner.contexts.read().unwrap();
match contexts_read.get(&id) {
Some(context) => context.clone(),
None => {
drop(contexts_read);

is_new_context = true;

let context = f()?;
let context_to_insert = context.clone();
let mut contexts_write = self.inner.contexts.write().unwrap();
contexts_write.insert(id, context_to_insert);

context
}
}
};

// Now we update the recency state for this context.
self.track_context_touched(id, &context, is_new_context);

Some(context)
}

pub fn len(&self) -> usize {
self.inner.contexts.read().unwrap().len()
}

fn track_context_touched(&self, id: u64, context: &Context, is_new_context: bool) {
// Update the context's "last touched" time.
context.update_last_touched();

// If this is a new context, add it to the last-seen map, which the background updater uses to
// decide when a context becomes eligible for expiration.
if is_new_context {
let mut last_seen_write = self.inner.contexts_last_seen.lock().unwrap();
last_seen_write.entry(id).or_insert_with(|| context.clone());
}
}
}

fn run_background_expiration(bg_inner: Arc<Inner>) {
const CONTEXT_EXPIRATION_IDLE_SECONDS: u64 = 30;

let mut contexts_to_expire = Vec::new();

loop {
thread::sleep(Duration::from_secs(1));

// Grab the current coarse time for this expiration pass.
let current_time = get_unix_timestamp_coarse();
trace!(resolver_id = bg_inner.name, "Running background expiration.");

// Iterate through our last seen map and figure out which contexts are eligible for expiration.
let mut last_seen_write = bg_inner.contexts_last_seen.lock().unwrap();
for (context_id, context) in last_seen_write.iter() {
if current_time - context.last_touched() > CONTEXT_EXPIRATION_IDLE_SECONDS {
contexts_to_expire.push(*context_id);
}
}

if contexts_to_expire.is_empty() {
continue;
}

// Now expire the contexts.
let mut contexts_write = bg_inner.contexts.write().unwrap();

trace!(
resolver_id = bg_inner.name,
contexts_len = contexts_write.len(),
contexts_to_expire = contexts_to_expire.len(),
"Expiring contexts."
);
for context_id in contexts_to_expire.drain(..) {
contexts_write.remove(&context_id);
last_seen_write.remove(&context_id);
}

trace!(
resolver_id = bg_inner.name,
contexts_len = contexts_write.len(),
"Finished expiring contexts."
);
}
}
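Taken together, `get_or_insert` gives the resolver a read-mostly fast path: a hit takes only the read lock, a miss takes the write lock and runs the caller's constructor, and every lookup refreshes the context's last-touched time so that the background thread spawned in `ContextCache::new` can evict entries idle for more than 30 seconds. A hypothetical caller might look like the sketch below; only `ContextCache::new` and `get_or_insert` come from this diff, and `build_context` stands in for the resolver's real miss path.

```rust
// Assumed import paths; this diff does not show how ContextCache is re-exported.
use saluki_context::{cache::ContextCache, Context};

// Hypothetical stand-in for however the resolver materializes a Context on a miss.
// Returning None (e.g., an interning failure) leaves the cache untouched.
fn build_context(_context_hash: u64) -> Option<Context> {
    unimplemented!("resolver-specific: intern the name and tags, then build the Context")
}

fn resolve(cache: &ContextCache, context_hash: u64) -> Option<Context> {
    // The closure runs only on a cache miss.
    cache.get_or_insert(context_hash, || build_context(context_hash))
}
```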
37 changes: 37 additions & 0 deletions lib/saluki-context/src/hash.rs
@@ -0,0 +1,37 @@
use std::hash::{self, BuildHasher};

pub struct NoopU64Hasher(u64);

impl NoopU64Hasher {
pub fn new() -> Self {
Self(0)
}
}

impl Clone for NoopU64Hasher {
fn clone(&self) -> Self {
Self(0)
}
}

impl hash::Hasher for NoopU64Hasher {
fn finish(&self) -> u64 {
self.0
}

fn write_u64(&mut self, value: u64) {
self.0 = value;
}

fn write(&mut self, _: &[u8]) {
panic!("NoopU64Hasher is only valid for hashing `u64` values");
}
}

impl BuildHasher for NoopU64Hasher {
type Hasher = NoopU64Hasher;

fn build_hasher(&self) -> Self::Hasher {
Self(0)
}
}
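`NoopU64Hasher` exists because the cache keys are already 64-bit context hashes: feeding them through a general-purpose hasher (such as the crate's `ahash` dependency) would just re-mix an already well-distributed value. `finish()` echoes back whatever `write_u64()` received, and since `Hash for u64` is implemented in terms of `write_u64`, this works for `HashMap<u64, _, NoopU64Hasher>` while any other key type trips the `panic!` in `write()`. A quick demonstration of the passthrough:

```rust
use std::hash::{BuildHasher, Hasher as _};

fn main() {
    let build = NoopU64Hasher::new();
    let mut hasher = build.build_hasher();
    hasher.write_u64(0xDEADBEEF);
    // The "hash" is the value itself: no mixing, no extra work.
    assert_eq!(hasher.finish(), 0xDEADBEEF);
}
```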