diff --git a/Cargo.toml b/Cargo.toml index fd8428c66..ba426cf2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ default-members = [ "data-pipeline-ffi", "ddsketch", "tinybytes", + "dogstatsd-client", ] # https://doc.rust-lang.org/cargo/reference/resolver.html#feature-resolver-version-2 diff --git a/data-pipeline/src/health_metrics.rs b/data-pipeline/src/health_metrics.rs new file mode 100644 index 000000000..c93bdd613 --- /dev/null +++ b/data-pipeline/src/health_metrics.rs @@ -0,0 +1,11 @@ +/// health_metrics holds data to emit info about the health of the data-pipeline + +pub(crate) const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces"; +pub(crate) const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors"; +pub(crate) const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces"; +pub(crate) const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors"; +pub(crate) const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors"; + +pub(crate) enum HealthMetric { + Count(&'static str, i64), +} diff --git a/data-pipeline/src/lib.rs b/data-pipeline/src/lib.rs index 9886087e4..05db3236d 100644 --- a/data-pipeline/src/lib.rs +++ b/data-pipeline/src/lib.rs @@ -6,6 +6,7 @@ //! project at this state is to provide a basic API in order to test its viability and integration //! in different languages. +mod health_metrics; #[allow(missing_docs)] pub mod span_concentrator; #[allow(missing_docs)] diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 08f4ae917..dd6adb919 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -1,6 +1,9 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::{span_concentrator::SpanConcentrator, stats_exporter}; +use crate::{ + health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator, + stats_exporter, +}; use bytes::Bytes; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{ @@ -46,13 +49,6 @@ pub enum TraceExporterOutputFormat { V07, } -// internal health metrics -const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces"; -const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors"; -const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces"; -const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors"; -const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors"; - impl TraceExporterOutputFormat { /// Add the agent trace endpoint path to the URL. fn add_path(&self, url: &Uri) -> Uri { @@ -158,11 +154,6 @@ impl<'a> From<&'a TracerTags> for HashMap<&'static str, String> { } } -enum HealthMetric { - Count(&'static str, i64), - //TODO: Add more DogStatsDAction as we need them -} - enum StatsComputationStatus { StatsDisabled, StatsEnabled { @@ -199,7 +190,7 @@ pub struct TraceExporter { // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019 _response_callback: Option>, runtime: Runtime, - // None if dogstatsd is disabled + /// None if dogstatsd is disabled dogstatsd: Option, common_stats_tags: Vec, client_computed_top_level: bool, @@ -295,29 +286,44 @@ impl TraceExporter { { Ok(response) => { let response_status = response.status(); - if response_status != http::StatusCode::OK { + if !response_status.is_success() { let body_bytes = hyper::body::to_bytes(response.into_body()).await?; let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default(); - self.emit_metric( - HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), - Some(vec![ - &Tag::new("response_code", response_status.as_str()).unwrap() - ]), - ); + let resp_tag_res = &Tag::new("response_code", response_status.as_str()); + match resp_tag_res { + Ok(resp_tag) => { + self.emit_metric( + HealthMetric::Count( + health_metrics::STAT_SEND_TRACES_ERRORS, + 1, + ), + Some(vec![&resp_tag]), + ); + } + Err(tag_err) => { + // This should really never happen as response_status is a + // `NonZeroU16`, but if the response status or tag requirements + // ever change in the future we still don't want to panic. + error!("Failed to serialize response_code to tag {}", tag_err) + } + } anyhow::bail!("Agent did not accept traces: {response_body}"); } match hyper::body::to_bytes(response.into_body()).await { Ok(body) => { self.emit_metric( - HealthMetric::Count(STAT_SEND_TRACES, trace_count as i64), + HealthMetric::Count( + health_metrics::STAT_SEND_TRACES, + trace_count as i64, + ), None, ); Ok(String::from_utf8_lossy(&body).to_string()) } Err(err) => { self.emit_metric( - HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), None, ); anyhow::bail!("Error reading agent response body: {err}"); @@ -325,7 +331,10 @@ impl TraceExporter { } } Err(err) => { - self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None, + ); anyhow::bail!("Failed to send traces: {err}") } } @@ -375,7 +384,10 @@ impl TraceExporter { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); - self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1), + None, + ); return Ok(String::from("{}")); } }; @@ -386,7 +398,7 @@ impl TraceExporter { } self.emit_metric( - HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64), + HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64), None, ); @@ -409,7 +421,10 @@ impl TraceExporter { TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces) .map_err(|err| { error!("Error serializing traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SER_TRACES_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SER_TRACES_ERRORS, 1), + None, + ); String::from("{}") }) .and_then(|res| { @@ -440,7 +455,7 @@ impl TraceExporter { Err(err) => { error!("Error reading agent response body: {err}"); self.emit_metric( - HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), None, ); Ok(String::from("{}")) @@ -448,7 +463,10 @@ impl TraceExporter { }, Err(err) => { error!("Error sending traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None, + ); Ok(String::from("{}")) } } diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 27fc284ac..979489b97 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -67,20 +67,17 @@ pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { Set(T, i64, V), } -/// A dogstatsd-client that flushes stats to a given endpoint. -/// The default value has no address and is thus disabled, use `new_flusher` or `set_endpoint` to -/// configure an endpoint. -#[derive(Default)] +/// A dogstatsd-client that flushes stats to a given endpoint. Use `new_flusher` to build one. pub struct Flusher { - client: Option, + client: StatsdClient, } /// Build a new flusher instance pointed at the provided endpoint. /// Returns error if the provided endpoint is not valid. pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result { - let mut f = Flusher::default(); - f.set_endpoint(endpoint)?; - Ok(f) + Ok(Flusher { + client: create_client(&endpoint)?, + }) } impl Flusher { @@ -91,11 +88,11 @@ impl Flusher { self.client = match endpoint.api_key { Some(_) => { info!("DogStatsD is not available in agentless mode"); - None + anyhow::bail!("DogStatsD is not available in agentless mode"); } None => { debug!("Updating DogStatsD endpoint to {}", endpoint.url); - Some(create_client(&endpoint)?) + create_client(&endpoint)? } }; Ok(()) @@ -104,10 +101,7 @@ impl Flusher { /// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned" /// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details. pub fn send_owned(&self, actions: Vec) { - if self.client.is_none() { - return; - } - let client = self.client.as_ref().unwrap(); + let client = &self.client; for action in actions { if let Err(err) = match action { @@ -138,10 +132,7 @@ impl Flusher { &self, actions: Vec>, ) { - if self.client.is_none() { - return; - } - let client = self.client.as_ref().unwrap(); + let client = &self.client; for action in actions { if let Err(err) = match action { @@ -228,7 +219,7 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result { #[cfg(test)] mod test { use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::{create_client, DogStatsDActionOwned, Flusher}; + use crate::{create_client, new_flusher, DogStatsDActionOwned}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_to_uri; use ddcommon::{tag, Endpoint}; @@ -243,10 +234,10 @@ mod test { let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); let _ = socket.set_read_timeout(Some(Duration::from_millis(500))); - let mut flusher = Flusher::default(); - _ = flusher.set_endpoint(Endpoint::from_slice( + let flusher = new_flusher(Endpoint::from_slice( socket.local_addr().unwrap().to_string().as_str(), - )); + )) + .unwrap(); flusher.send(vec![ Count("test_count", 3, &vec![tag!("foo", "bar")]), Count("test_neg_count", -2, &vec![]), diff --git a/sidecar/src/service/session_info.rs b/sidecar/src/service/session_info.rs index 4e1a20fd7..c57d2c437 100644 --- a/sidecar/src/service/session_info.rs +++ b/sidecar/src/service/session_info.rs @@ -24,7 +24,7 @@ pub(crate) struct SessionInfo { runtimes: Arc>>, pub(crate) session_config: Arc>>, tracer_config: Arc>, - dogstatsd: Arc>, + dogstatsd: Arc>>, remote_config_invariants: Arc>>, #[cfg(windows)] pub(crate) remote_config_notify_function: @@ -168,13 +168,13 @@ impl SessionInfo { f(&mut self.get_trace_config()); } - pub(crate) fn get_dogstatsd(&self) -> MutexGuard { + pub(crate) fn get_dogstatsd(&self) -> MutexGuard> { self.dogstatsd.lock().unwrap() } pub(crate) fn configure_dogstatsd(&self, f: F) where - F: FnOnce(&mut dogstatsd_client::Flusher), + F: FnOnce(&mut Option), { f(&mut self.get_dogstatsd()); } diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 6e514a16f..88cd38275 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -46,7 +46,7 @@ use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; -use dogstatsd_client::DogStatsDActionOwned; +use dogstatsd_client::{new_flusher, DogStatsDActionOwned}; use tinybytes; type NoResponse = Ready<()>; @@ -670,9 +670,8 @@ impl SidecarInterface for SidecarServer { cfg.set_endpoint(endpoint).ok(); }); session.configure_dogstatsd(|dogstatsd| { - dogstatsd - .set_endpoint(config.dogstatsd_endpoint.clone()) - .ok(); + let d = new_flusher(config.dogstatsd_endpoint.clone()).ok(); + *dogstatsd = d; }); session.set_remote_config_invariants(ConfigInvariants { language: config.language, @@ -853,7 +852,8 @@ impl SidecarInterface for SidecarServer { tokio::spawn(async move { self.get_session(&instance_id.session_id) .get_dogstatsd() - .send_owned(actions); + .as_ref() + .inspect(|f| f.send_owned(actions)); }); no_response()