diff --git a/remote-config/src/fetch/fetcher.rs b/remote-config/src/fetch/fetcher.rs index eccf8eb39..849f3ee5b 100644 --- a/remote-config/src/fetch/fetcher.rs +++ b/remote-config/src/fetch/fetcher.rs @@ -15,9 +15,11 @@ use ddcommon::{connector, Endpoint}; use http::uri::Scheme; use hyper::http::uri::PathAndQuery; use hyper::{Client, StatusCode}; +use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256, Sha512}; use std::collections::{HashMap, HashSet}; use std::mem::transmute; +use std::ops::Add; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use tracing::{debug, trace, warn}; @@ -79,6 +81,21 @@ pub struct ConfigFetcherState { pub expire_unused_files: bool, } +#[derive(Default, Serialize, Deserialize)] +pub struct ConfigFetcherStateStats { + pub active_files: u32, +} + +impl Add for ConfigFetcherStateStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + ConfigFetcherStateStats { + active_files: self.active_files + rhs.active_files, + } + } +} + pub struct ConfigFetcherFilesLock<'a, S> { inner: MutexGuard<'a, HashMap, StoredTargetFile>>, } @@ -146,6 +163,12 @@ impl ConfigFetcherState { } } } + + pub fn stats(&self) -> ConfigFetcherStateStats { + ConfigFetcherStateStats { + active_files: self.target_files_by_path.lock().unwrap().len() as u32, + } + } } pub struct ConfigFetcher { diff --git a/remote-config/src/fetch/multitarget.rs b/remote-config/src/fetch/multitarget.rs index d5600e2d7..4fa61e918 100644 --- a/remote-config/src/fetch/multitarget.rs +++ b/remote-config/src/fetch/multitarget.rs @@ -3,17 +3,19 @@ use crate::fetch::{ ConfigApplyState, ConfigFetcherState, ConfigInvariants, FileStorage, RefcountedFile, - RefcountingStorage, SharedFetcher, + RefcountingStorage, RefcountingStorageStats, SharedFetcher, }; use crate::Target; use futures_util::future::Shared; use futures_util::FutureExt; use manual_future::ManualFuture; +use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::default::Default; use std::fmt::Debug; use std::hash::Hash; +use std::ops::Add; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -52,6 +54,31 @@ where fetcher_semaphore: Semaphore, } +#[derive(Default, Serialize, Deserialize)] +pub struct MultiTargetStats { + known_runtimes: u32, + starting_fetchers: u32, + active_fetchers: u32, + inactive_fetchers: u32, + removing_fetchers: u32, + storage: RefcountingStorageStats, +} + +impl Add for MultiTargetStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + MultiTargetStats { + known_runtimes: self.known_runtimes + rhs.known_runtimes, + starting_fetchers: self.starting_fetchers + rhs.starting_fetchers, + active_fetchers: self.active_fetchers + rhs.active_fetchers, + inactive_fetchers: self.inactive_fetchers + rhs.inactive_fetchers, + removing_fetchers: self.removing_fetchers + rhs.removing_fetchers, + storage: self.storage + rhs.storage, + } + } +} + enum KnownTargetStatus { Pending, Alive, @@ -477,6 +504,33 @@ where pub fn invariants(&self) -> &ConfigInvariants { self.storage.invariants() } + + pub fn stats(&self) -> MultiTargetStats { + let (starting_fetchers, active_fetchers, inactive_fetchers, removing_fetchers) = { + let services = self.services.lock().unwrap(); + let mut starting = 0; + let mut active = 0; + let mut inactive = 0; + let mut removing = 0; + for (_, known_target) in services.iter() { + match *known_target.status.lock().unwrap() { + KnownTargetStatus::Pending => starting += 1, + KnownTargetStatus::Alive => active += 1, + KnownTargetStatus::RemoveAt(_) => inactive += 1, + KnownTargetStatus::Removing(_) => removing += 1, + } + } + (starting, active, inactive, removing) + }; + MultiTargetStats { + known_runtimes: self.runtimes.lock().unwrap().len() as u32, + starting_fetchers, + active_fetchers, + inactive_fetchers, + removing_fetchers, + storage: self.storage.stats(), + } + } } #[cfg(test)] diff --git a/remote-config/src/fetch/shared.rs b/remote-config/src/fetch/shared.rs index 8c5d266ac..dbffa466f 100644 --- a/remote-config/src/fetch/shared.rs +++ b/remote-config/src/fetch/shared.rs @@ -2,11 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use crate::fetch::{ - ConfigApplyState, ConfigClientState, ConfigFetcher, ConfigFetcherState, ConfigInvariants, - FileStorage, + ConfigApplyState, ConfigClientState, ConfigFetcher, ConfigFetcherState, + ConfigFetcherStateStats, ConfigInvariants, FileStorage, }; use crate::{RemoteConfigPath, Target}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::ops::Add; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -138,6 +140,23 @@ where run_id: Arc, } +#[derive(Default, Serialize, Deserialize)] +pub struct RefcountingStorageStats { + pub inactive_files: u32, + pub fetcher: ConfigFetcherStateStats, +} + +impl Add for RefcountingStorageStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + RefcountingStorageStats { + inactive_files: self.inactive_files + rhs.inactive_files, + fetcher: self.fetcher + rhs.fetcher, + } + } +} + impl Clone for RefcountingStorage where S::StoredFile: RefcountedFile, @@ -191,6 +210,13 @@ where pub fn invariants(&self) -> &ConfigInvariants { &self.state.invariants } + + pub fn stats(&self) -> RefcountingStorageStats { + RefcountingStorageStats { + inactive_files: self.inactive.lock().unwrap().len() as u32, + fetcher: self.state.stats(), + } + } } impl FileStorage for RefcountingStorage diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index 22e2e1562..87e3edb27 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -488,6 +488,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( language: ffi::CharSlice, tracer_version: ffi::CharSlice, flush_interval_milliseconds: u32, + remote_config_poll_interval_millis: u32, telemetry_heartbeat_interval_millis: u32, exception_hash_rate_limiter_seconds: u32, force_flush_size: usize, @@ -515,6 +516,9 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( language: language.to_utf8_lossy().into(), tracer_version: tracer_version.to_utf8_lossy().into(), flush_interval: Duration::from_millis(flush_interval_milliseconds as u64), + remote_config_poll_interval: Duration::from_millis( + remote_config_poll_interval_millis as u64 + ), telemetry_heartbeat_interval: Duration::from_millis( telemetry_heartbeat_interval_millis as u64 ), diff --git a/sidecar/src/service/mod.rs b/sidecar/src/service/mod.rs index 49030e3fa..9e16c6e04 100644 --- a/sidecar/src/service/mod.rs +++ b/sidecar/src/service/mod.rs @@ -49,6 +49,7 @@ pub struct SessionConfig { pub language: String, pub tracer_version: String, pub flush_interval: Duration, + pub remote_config_poll_interval: Duration, pub telemetry_heartbeat_interval: Duration, pub exception_hash_rate_limiter_seconds: u32, pub force_flush_size: usize, diff --git a/sidecar/src/service/remote_configs.rs b/sidecar/src/service/remote_configs.rs index e2d3499e9..c6748e580 100644 --- a/sidecar/src/service/remote_configs.rs +++ b/sidecar/src/service/remote_configs.rs @@ -2,10 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::shm_remote_config::{ShmRemoteConfigs, ShmRemoteConfigsGuard}; -use datadog_remote_config::fetch::{ConfigInvariants, NotifyTarget}; +use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats, NotifyTarget}; use std::collections::hash_map::Entry; use std::fmt::Debug; use std::sync::{Arc, Mutex}; +use std::time::Duration; use zwohash::HashMap; #[cfg(windows)] @@ -96,6 +97,7 @@ impl RemoteConfigs { pub fn add_runtime( &self, invariants: ConfigInvariants, + poll_interval: Duration, runtime_id: String, notify_target: RemoteConfigNotifyTarget, env: String, @@ -112,6 +114,7 @@ impl RemoteConfigs { Box::new(move || { this.lock().unwrap().remove(&invariants); }), + poll_interval, )) } } @@ -123,4 +126,13 @@ impl RemoteConfigs { rc.shutdown(); } } + + pub fn stats(&self) -> MultiTargetStats { + self.0 + .lock() + .unwrap() + .values() + .map(|rc| rc.stats()) + .fold(MultiTargetStats::default(), |a, b| a + b) + } } diff --git a/sidecar/src/service/session_info.rs b/sidecar/src/service/session_info.rs index 08f897441..fe9c66c1f 100644 --- a/sidecar/src/service/session_info.rs +++ b/sidecar/src/service/session_info.rs @@ -30,6 +30,7 @@ pub(crate) struct SessionInfo { tracer_config: Arc>, dogstatsd: Arc>, remote_config_invariants: Arc>>, + pub(crate) remote_config_interval: Arc>, #[cfg(windows)] pub(crate) remote_config_notify_function: Arc>, @@ -48,6 +49,7 @@ impl Clone for SessionInfo { tracer_config: self.tracer_config.clone(), dogstatsd: self.dogstatsd.clone(), remote_config_invariants: self.remote_config_invariants.clone(), + remote_config_interval: self.remote_config_interval.clone(), #[cfg(windows)] remote_config_notify_function: self.remote_config_notify_function.clone(), log_guard: self.log_guard.clone(), diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index cca35542b..570fe83b2 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -48,7 +48,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats; use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_live_debugger::sender::DebuggerType; -use datadog_remote_config::fetch::ConfigInvariants; +use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats}; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; use ddcommon::tag::Tag; @@ -69,6 +69,7 @@ struct SidecarStats { enqueued_apps: u32, enqueued_telemetry_data: EnqueuedTelemetryStats, remote_config_clients: u32, + remote_configs: MultiTargetStats, telemetry_metrics_contexts: u32, telemetry_worker: TelemetryWorkerStats, telemetry_worker_errors: u32, @@ -445,6 +446,7 @@ impl SidecarServer { .sum::() }) .sum(), + remote_configs: self.remote_configs.stats(), telemetry_metrics_contexts: sessions .values() .map(|s| { @@ -692,6 +694,7 @@ impl SidecarInterface for SidecarServer { products: config.remote_config_products, capabilities: config.remote_config_capabilities, }); + *session.remote_config_interval.lock().unwrap() = config.remote_config_poll_interval; self.trace_flusher .interval_ms .store(config.flush_interval.as_millis() as u64, Ordering::Relaxed); @@ -877,6 +880,7 @@ impl SidecarInterface for SidecarServer { .as_ref() .expect("Expecting remote config invariants to be set early") .clone(), + *session.remote_config_interval.lock().unwrap(), instance_id.runtime_id, notify_target, env_name.clone(), diff --git a/sidecar/src/shm_remote_config.rs b/sidecar/src/shm_remote_config.rs index e38b28ea0..b69f66f2a 100644 --- a/sidecar/src/shm_remote_config.rs +++ b/sidecar/src/shm_remote_config.rs @@ -12,7 +12,7 @@ use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle}; use datadog_ipc::rate_limiter::ShmLimiter; use datadog_remote_config::fetch::{ ConfigInvariants, FileRefcountData, FileStorage, MultiTargetFetcher, MultiTargetHandlers, - NotifyTarget, RefcountedFile, + MultiTargetStats, NotifyTarget, RefcountedFile, }; use datadog_remote_config::{RemoteConfigPath, RemoteConfigProduct, RemoteConfigValue, Target}; use priority_queue::PriorityQueue; @@ -270,19 +270,20 @@ pub struct ShmRemoteConfigs( // pertaining to that env refcounting RemoteConfigIdentifier tuples by their unique runtime_id impl ShmRemoteConfigs { - pub fn new(invariants: ConfigInvariants, on_dead: Box) -> Self { - let is_test = invariants.endpoint.test_token.is_some(); + pub fn new( + invariants: ConfigInvariants, + on_dead: Box, + interval: Duration, + ) -> Self { let storage = ConfigFileStorage { invariants: invariants.clone(), writers: Default::default(), on_dead: Arc::new(Mutex::new(Some(on_dead))), }; let fetcher = MultiTargetFetcher::new(storage, invariants); - if is_test { - fetcher - .remote_config_interval - .store(10_000_000, Ordering::Relaxed); - } + fetcher + .remote_config_interval + .store(interval.as_nanos() as u64, Ordering::Relaxed); ShmRemoteConfigs(fetcher) } @@ -315,6 +316,10 @@ impl ShmRemoteConfigs { pub fn shutdown(&self) { self.0.shutdown(); } + + pub fn stats(&self) -> MultiTargetStats { + self.0.stats() + } } fn read_config(path: &str) -> anyhow::Result<(RemoteConfigValue, u32)> { @@ -586,6 +591,7 @@ mod tests { Box::new(|| { tokio::spawn(on_dead_completer.complete(())); }), + Duration::from_millis(10), ); let mut manager = RemoteConfigManager::new(server.dummy_invariants());