From 75ecb44e0184ba49540c8b112effe67403bf6b76 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Mon, 9 Sep 2024 16:14:21 -0400 Subject: [PATCH 01/35] move dogstatsd-client to separate crate so it can be shared --- Cargo.lock | 19 +++++++++++++++++++ Cargo.toml | 2 +- dogstatsd-client/Cargo.toml | 19 +++++++++++++++++++ .../src/lib.rs | 8 +++++--- sidecar-ffi/Cargo.toml | 1 + sidecar-ffi/src/lib.rs | 2 +- sidecar/Cargo.toml | 1 + sidecar/src/lib.rs | 1 - sidecar/src/service/blocking.rs | 2 +- sidecar/src/service/session_info.rs | 8 ++++---- sidecar/src/service/sidecar_interface.rs | 2 +- sidecar/src/service/sidecar_server.rs | 2 +- 12 files changed, 54 insertions(+), 13 deletions(-) create mode 100644 dogstatsd-client/Cargo.toml rename sidecar/src/dogstatsd.rs => dogstatsd-client/src/lib.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index d878b3ff6..c40e566a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1612,6 +1612,7 @@ dependencies = [ "datadog-trace-utils", "ddcommon", "ddtelemetry", + "dogstatsd-client", "futures", "hashbrown 0.12.3", "http 0.2.12", @@ -1662,6 +1663,7 @@ dependencies = [ "ddcommon-ffi", "ddtelemetry", "ddtelemetry-ffi", + "dogstatsd-client", "hyper 0.14.28", "libc", "paste", @@ -1984,6 +1986,23 @@ dependencies = [ "ustr", ] +[[package]] +name = "dogstatsd-client" +version = "12.0.0" +dependencies = [ + "anyhow", + "cadence", + "datadog-ddsketch", + "datadog-trace-normalization", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", + "http 0.2.12", + "hyper 0.14.28", + "serde", + "tracing", +] + [[package]] name = "dunce" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index d4b3238b1..db00b0877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ members = [ "data-pipeline", "data-pipeline-ffi", "ddsketch", - "tinybytes", + "tinybytes", "dogstatsd-client", ] default-members = [ diff --git a/dogstatsd-client/Cargo.toml b/dogstatsd-client/Cargo.toml new file mode 100644 index 000000000..1e49792b4 --- /dev/null +++ b/dogstatsd-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "dogstatsd-client" +rust-version.workspace = true +edition.workspace = true +version.workspace = true +license.workspace = true + +[dependencies] +ddcommon = { path = "../ddcommon" } +datadog-trace-protobuf = { path = "../trace-protobuf" } +datadog-trace-utils = { path = "../trace-utils" } +datadog-trace-normalization = { path = "../trace-normalization" } +datadog-ddsketch = { path = "../ddsketch"} +cadence = "1.3.0" +serde = { version = "1.0", features = ["derive", "rc"] } +tracing = { version = "0.1", default-features = false } +anyhow = { version = "1.0" } +hyper = { version = "0.14", features = ["client"], default-features = false } +http = "0.2" diff --git a/sidecar/src/dogstatsd.rs b/dogstatsd-client/src/lib.rs similarity index 98% rename from sidecar/src/dogstatsd.rs rename to dogstatsd-client/src/lib.rs index b2b6777dd..b27a7b030 100644 --- a/sidecar/src/dogstatsd.rs +++ b/dogstatsd-client/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 use ddcommon::tag::Tag; @@ -122,7 +122,7 @@ fn create_client(endpoint: Option) -> anyhow::Result { None => return Err(anyhow!("no endpoint set")), }; - return match endpoint.url.scheme_str() { + match endpoint.url.scheme_str() { #[cfg(unix)] Some("unix") => { let socket = UnixDatagram::unbound()?; @@ -157,7 +157,7 @@ fn create_client(endpoint: Option) -> anyhow::Result { Ok(StatsdClient::from_sink("", sink)) } - }; + } } #[cfg(test)] @@ -245,4 +245,6 @@ mod test { ))); assert!(res.is_ok()); } + + } diff --git a/sidecar-ffi/Cargo.toml b/sidecar-ffi/Cargo.toml index 68cc0aeba..b0fa91da5 100644 --- a/sidecar-ffi/Cargo.toml +++ b/sidecar-ffi/Cargo.toml @@ -21,6 +21,7 @@ ddtelemetry-ffi = { path = "../ddtelemetry-ffi", default-features = false } datadog-remote-config = { path = "../remote-config" } paste = "1" libc = "0.2" +dogstatsd-client = { path = "../dogstatsd-client" } [dev-dependencies] hyper = { version = "0.14", default-features = false } diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index d786e95a5..a8a929c39 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -12,7 +12,7 @@ use datadog_sidecar::agent_remote_config::{ use datadog_sidecar::config; use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; -use datadog_sidecar::dogstatsd::DogStatsDAction; +use dogstatsd_client::DogStatsDAction; use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index eb39ca826..abeb6c796 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -26,6 +26,7 @@ datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } datadog-remote-config = { path = "../remote-config" } datadog-crashtracker = { path = "../crashtracker" } +dogstatsd-client = { path = "../dogstatsd-client" } futures = { version = "0.3", default-features = false } manual_future = "0.1.1" diff --git a/sidecar/src/lib.rs b/sidecar/src/lib.rs index fe2d8ba15..62349a761 100644 --- a/sidecar/src/lib.rs +++ b/sidecar/src/lib.rs @@ -3,7 +3,6 @@ pub mod agent_remote_config; pub mod config; pub mod crashtracker; -pub mod dogstatsd; mod dump; pub mod entry; #[cfg(feature = "tracing")] diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index f1c057689..fb9d8b4ef 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -5,7 +5,7 @@ use super::{ InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarInterfaceRequest, SidecarInterfaceResponse, }; -use crate::dogstatsd::DogStatsDAction; +use dogstatsd_client::DogStatsDAction; use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; use std::sync::Mutex; diff --git a/sidecar/src/service/session_info.rs b/sidecar/src/service/session_info.rs index 7acc3e816..4e1a20fd7 100644 --- a/sidecar/src/service/session_info.rs +++ b/sidecar/src/service/session_info.rs @@ -12,7 +12,7 @@ use futures::future; use tracing::{enabled, info, Level}; use crate::log::{MultiEnvFilterGuard, MultiWriterGuard}; -use crate::{dogstatsd, tracer}; +use crate::tracer; use crate::service::{InstanceId, RuntimeInfo}; /// `SessionInfo` holds information about a session. @@ -24,7 +24,7 @@ pub(crate) struct SessionInfo { runtimes: Arc>>, pub(crate) session_config: Arc>>, tracer_config: Arc>, - dogstatsd: Arc>, + dogstatsd: Arc>, remote_config_invariants: Arc>>, #[cfg(windows)] pub(crate) remote_config_notify_function: @@ -168,13 +168,13 @@ impl SessionInfo { f(&mut self.get_trace_config()); } - pub(crate) fn get_dogstatsd(&self) -> MutexGuard { + pub(crate) fn get_dogstatsd(&self) -> MutexGuard { self.dogstatsd.lock().unwrap() } pub(crate) fn configure_dogstatsd(&self, f: F) where - F: FnOnce(&mut dogstatsd::Flusher), + F: FnOnce(&mut dogstatsd_client::Flusher), { f(&mut self.get_dogstatsd()); } diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index eb8b2fbc9..d860ee8bd 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -1,7 +1,7 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::dogstatsd::DogStatsDAction; +use dogstatsd_client::DogStatsDAction; use crate::service::{ InstanceId, QueueId, RequestIdentification, RequestIdentifier, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index fb65646b3..52f9e7d3d 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -39,7 +39,7 @@ use serde::{Deserialize, Serialize}; use tokio::task::{JoinError, JoinHandle}; use crate::config::get_product_endpoint; -use crate::dogstatsd::DogStatsDAction; +use dogstatsd_client::DogStatsDAction; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::runtime_info::ActiveApplication; use crate::service::telemetry::enqueued_telemetry_stats::EnqueuedTelemetryStats; From ba57b1daaa305a042c9b2949bc773a12c5010449 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 10 Sep 2024 16:45:12 -0400 Subject: [PATCH 02/35] adding some initial stats and tests --- Cargo.lock | 2 + data-pipeline/Cargo.toml | 2 + data-pipeline/src/trace_exporter.rs | 136 ++++++++++++++++++++++++---- dogstatsd-client/src/lib.rs | 14 ++- 4 files changed, 134 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c40e566a1..551e4de32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1345,7 +1345,9 @@ dependencies = [ "datadog-trace-protobuf", "datadog-trace-utils", "ddcommon", + "dogstatsd-client", "futures", + "httpmock", "hyper 0.14.28", "log", "rand", diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index 556f38618..a3a9244ed 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -23,6 +23,7 @@ datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } datadog-ddsketch = { path = "../ddsketch"} +dogstatsd-client = { path = "../dogstatsd-client"} [lib] bench = false @@ -35,3 +36,4 @@ path = "benches/main.rs" [dev-dependencies] criterion = "0.5.1" rand = "0.8.5" +httpmock = "0.7.0" diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 7aa25cd9c..f58f5424c 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -12,6 +12,7 @@ use hyper::{Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; +use dogstatsd_client::{Flusher, DogStatsDAction}; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. @@ -113,6 +114,8 @@ pub struct TraceExporter { // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019 _response_callback: Option>, runtime: Runtime, + // None if dogstatsd is disabled + dogstatsd: Option, } impl TraceExporter { @@ -120,14 +123,14 @@ impl TraceExporter { TraceExporterBuilder::default() } - pub fn send(&self, data: &[u8], trace_count: usize) -> Result { + pub fn send(&mut self, data: &[u8], trace_count: usize) -> Result { match self.input_format { TraceExporterInputFormat::Proxy => self.send_proxy(data, trace_count), TraceExporterInputFormat::V04 => self.send_deser_ser(data), } } - fn send_proxy(&self, data: &[u8], trace_count: usize) -> Result { + fn send_proxy(&mut self, data: &[u8], trace_count: usize) -> Result { self.send_data_to_url( data, trace_count, @@ -136,7 +139,7 @@ impl TraceExporter { } fn send_data_to_url( - &self, + &mut self, data: &[u8], trace_count: usize, uri: Uri, @@ -187,17 +190,32 @@ impl TraceExporter { }) .or_else(|err| { error!("Error sending traces: {err}"); + if let Some(ref mut flusher) = self.dogstatsd { + flusher.send(vec![ + DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), + 1, + vec![])]); + } Ok(String::from("{}")) }) } - fn send_deser_ser(&self, data: &[u8]) -> Result { + fn emit_stat(&mut self, action: DogStatsDAction) { + if let Some(ref mut flusher) = self.dogstatsd { + flusher.send(vec![action]); + } + } + + fn send_deser_ser(&mut self, data: &[u8]) -> Result { let size = data.len(); // TODO base on input format let traces: Vec> = match rmp_serde::from_slice(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); + self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces.errors"), + 1, + vec![])); return Ok(String::from("{}")); } }; @@ -207,23 +225,36 @@ impl TraceExporter { return Ok(String::from("{}")); } + // todo: do we need to modify the client to allow for &str to avoid allocating a String? + // todo: what tags to attach + self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces"), + traces.len() as i64, + vec![])); + let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); match self.output_format { - TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces).map_or_else( - |err| { - error!("Error serializing traces: {err}"); - Ok(String::from("{}")) - }, - |res| { - self.send_data_to_url( - &res, - traces.len(), - self.output_format.add_path(&self.endpoint.url), + TraceExporterOutputFormat::V04 => + { + rmp_serde::to_vec_named(&traces).map_err( + |err| { + error!("Error serializing traces: {err}"); + self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.ser_traces.errors"), + 1, + vec![])); + String::from("{}") + }).and_then( + |res| { + self.send_data_to_url( + &res, + traces.len(), + self.output_format.add_path(&self.endpoint.url), + ) + }, ) - }, - ), - TraceExporterOutputFormat::V07 => { + } + + TraceExporterOutputFormat::V07 => { let tracer_payload = trace_utils::collect_trace_chunks( traces, &header_tags, @@ -243,11 +274,21 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); + if let Some(ref mut flusher) = self.dogstatsd { + flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), + 1, + vec![])]); + } Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); + if let Some(ref mut flusher) = self.dogstatsd { + flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), + 1, + vec![])]); + } Ok(String::from("{}")) } } @@ -267,6 +308,7 @@ pub struct TraceExporterBuilder { input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, response_callback: Option>, + dogstatsd_url: Option, } impl TraceExporterBuilder { @@ -275,6 +317,11 @@ impl TraceExporterBuilder { self } + pub fn set_dogstatsd_url(mut self, url: &str) -> Self { + self.dogstatsd_url = Some(url.to_owned()); + self + } + pub fn set_tracer_version(mut self, tracer_version: &str) -> Self { tracer_version.clone_into(&mut self.tracer_version); self @@ -315,6 +362,13 @@ impl TraceExporterBuilder { .enable_all() .build()?; + let dogstatsd = self.dogstatsd_url.and_then( + |u| { + let mut flusher = Flusher::default(); + flusher.set_endpoint(Endpoint::from_slice(&u)).map(|_| flusher).ok() // If we couldn't set the endpoint return None + } + ); + Ok(TraceExporter { endpoint: Endpoint::from_slice(self.url.as_deref().unwrap_or("http://127.0.0.1:8126")), tags: TracerTags { @@ -327,6 +381,7 @@ impl TraceExporterBuilder { output_format: self.output_format, _response_callback: self.response_callback, runtime, + dogstatsd, }) } } @@ -339,6 +394,9 @@ pub trait ResponseCallback { mod tests { use super::*; use std::collections::HashMap; + use std::net; + use std::time::Duration; + use httpmock::prelude::*; #[test] fn new() { @@ -429,4 +487,48 @@ mod tests { "rustc" ); } + + #[test] + fn health_metrics() { + let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); + let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); + + let fake_agent = MockServer::start(); + let _mock_traces = fake_agent.mock(|when, then| { + when.method(GET) + .path("/v0.4/traces"); + then.status(200) + .header("content-type", "application/json") + .body("{}"); + }); + + let builder = TraceExporterBuilder::default(); + let mut exporter = builder + .set_url(&fake_agent.url("/v0.4/traces")) + .set_dogstatsd_url(&stats_socket.local_addr().unwrap().to_string()) + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .build() + .unwrap(); + + let traces: Vec> = vec![vec![pb::Span{name: "test".to_string(), ..Default::default()}], vec![pb::Span{name: "test2".to_string(), ..Default::default()}]]; + let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); + let result = exporter.send(&*bytes, 1).expect("failed to send trace"); + + // flush so we don't have to wait + //todo: why does this test take so long still + exporter.dogstatsd.map(|mut d| d.flush()); + + fn read(socket: &net::UdpSocket) -> String { + let mut buf = [0; 1_000]; + socket.recv(&mut buf).expect("No data"); + let datagram = String::from_utf8_lossy(buf.as_ref()); + datagram.trim_matches(char::from(0)).to_string() + } + + assert_eq!("datadog.libdatadog.deser_traces:2|c", read(&stats_socket)); + assert_eq!("datadog.libdatadog.send.errors:1|c", read(&stats_socket)); + } } diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index b27a7b030..a98d0a6ea 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -11,7 +11,7 @@ use anyhow::anyhow; use cadence::prelude::*; #[cfg(unix)] use cadence::UnixMetricSink; -use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink}; +use cadence::{Metric, MetricBuilder, MetricResult, QueuingMetricSink, StatsdClient, UdpMetricSink}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_from_uri; use std::net::{ToSocketAddrs, UdpSocket}; @@ -57,6 +57,14 @@ impl Flusher { Ok(()) } + pub fn flush(&mut self) -> MetricResult<()> { + if let Ok(client) = self.get_client() { + client.flush() + } else { + Ok(()) + } + } + pub fn send(&mut self, actions: Vec) { if self.endpoint.is_none() { return; @@ -162,8 +170,8 @@ fn create_client(endpoint: Option) -> anyhow::Result { #[cfg(test)] mod test { - use crate::dogstatsd::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::dogstatsd::{create_client, Flusher}; + use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; + use crate::{create_client, Flusher}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_to_uri; use ddcommon::{tag, Endpoint}; From 581c2432a00f713886809460c8f9835a0006d021 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 10 Sep 2024 16:51:49 -0400 Subject: [PATCH 03/35] make traceexporter mut --- data-pipeline-ffi/src/trace_exporter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index 3786451bd..cdaa714da 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -89,7 +89,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_free(handle: Box) { /// * `trace_count` - The number of traces to send to the Datadog Agent. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_send( - handle: &TraceExporter, + handle: &mut TraceExporter, trace: ByteSlice, trace_count: usize, ) -> MaybeError { From 7411f46954c5f8a59b5c5d8ab53f315bbd475626 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 11 Sep 2024 10:55:15 -0400 Subject: [PATCH 04/35] miri can ignore unit test --- data-pipeline/src/trace_exporter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 4c0bddfae..607e458db 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -508,6 +508,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn health_metrics() { let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); From 651525007489985b0cbf03577d2fd0497adabf8b Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 11:16:17 -0400 Subject: [PATCH 05/35] refactor dogstatsd-client to not need mutability --- data-pipeline-ffi/src/trace_exporter.rs | 2 +- data-pipeline/src/trace_exporter.rs | 23 ++++--- dogstatsd-client/src/lib.rs | 80 +++++++++---------------- sidecar/src/service/sidecar_server.rs | 7 ++- 4 files changed, 43 insertions(+), 69 deletions(-) diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index cdaa714da..3786451bd 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -89,7 +89,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_free(handle: Box) { /// * `trace_count` - The number of traces to send to the Datadog Agent. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_send( - handle: &mut TraceExporter, + handle: &TraceExporter, trace: ByteSlice, trace_count: usize, ) -> MaybeError { diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 607e458db..5df25c482 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -12,7 +12,7 @@ use hyper::{Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; -use dogstatsd_client::{Flusher, DogStatsDAction}; +use dogstatsd_client::{Flusher, DogStatsDAction, new_flusher}; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. @@ -129,14 +129,14 @@ impl TraceExporter { } #[allow(missing_docs)] - pub fn send(&mut self, data: &[u8], trace_count: usize) -> Result { + pub fn send(&self, data: &[u8], trace_count: usize) -> Result { match self.input_format { TraceExporterInputFormat::Proxy => self.send_proxy(data, trace_count), TraceExporterInputFormat::V04 => self.send_deser_ser(data), } } - fn send_proxy(&mut self, data: &[u8], trace_count: usize) -> Result { + fn send_proxy(&self, data: &[u8], trace_count: usize) -> Result { self.send_data_to_url( data, trace_count, @@ -145,7 +145,7 @@ impl TraceExporter { } fn send_data_to_url( - &mut self, + &self, data: &[u8], trace_count: usize, uri: Uri, @@ -196,7 +196,7 @@ impl TraceExporter { }) .or_else(|err| { error!("Error sending traces: {err}"); - if let Some(ref mut flusher) = self.dogstatsd { + if let Some(flusher) = &self.dogstatsd { flusher.send(vec![ DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, @@ -206,13 +206,13 @@ impl TraceExporter { }) } - fn emit_stat(&mut self, action: DogStatsDAction) { - if let Some(ref mut flusher) = self.dogstatsd { + fn emit_stat(&self, action: DogStatsDAction) { + if let Some(flusher) = &self.dogstatsd { flusher.send(vec![action]); } } - fn send_deser_ser(&mut self, data: &[u8]) -> Result { + fn send_deser_ser(&self, data: &[u8]) -> Result { let size = data.len(); // TODO base on input format let traces: Vec> = match rmp_serde::from_slice(data) { @@ -280,7 +280,7 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); - if let Some(ref mut flusher) = self.dogstatsd { + if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, vec![])]); @@ -290,7 +290,7 @@ impl TraceExporter { }, Err(err) => { error!("Error sending traces: {err}"); - if let Some(ref mut flusher) = self.dogstatsd { + if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, vec![])]); @@ -381,8 +381,7 @@ impl TraceExporterBuilder { let dogstatsd = self.dogstatsd_url.and_then( |u| { - let mut flusher = Flusher::default(); - flusher.set_endpoint(Endpoint::from_slice(&u)).map(|_| flusher).ok() // If we couldn't set the endpoint return None + new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return None } ); diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index a98d0a6ea..f43cc31d5 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -5,13 +5,13 @@ use ddcommon::tag::Tag; use ddcommon::Endpoint; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; use anyhow::anyhow; use cadence::prelude::*; #[cfg(unix)] use cadence::UnixMetricSink; -use cadence::{Metric, MetricBuilder, MetricResult, QueuingMetricSink, StatsdClient, UdpMetricSink}; +use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_from_uri; use std::net::{ToSocketAddrs, UdpSocket}; @@ -35,49 +35,42 @@ pub enum DogStatsDAction { Set(String, i64, Vec), } +/// A dogstatsd-client that flushes stats to a given endpoint. +/// The default value has no address, use `new_flusher` or `set_endpoint` to configure an endpoint. #[derive(Default)] pub struct Flusher { - pub endpoint: Option, client: Option, } +pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result { + let mut f = Flusher::default(); + f.set_endpoint(endpoint)?; + Ok(f) +} + impl Flusher { + /// Set the destination for dogstatsd metrics, if an API Key is provided the client is disabled + /// as dogstatsd is not allowed in agentless mode. Returns an error if the provided endpoint + /// is invalid. pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> { - self.client = None; - self.endpoint = match endpoint.api_key { + self.client = match endpoint.api_key { Some(_) => { info!("DogStatsD is not available in agentless mode"); None } None => { debug!("Updating DogStatsD endpoint to {}", endpoint.url); - Some(endpoint) + Some(create_client(&endpoint)?) } }; Ok(()) } - pub fn flush(&mut self) -> MetricResult<()> { - if let Ok(client) = self.get_client() { - client.flush() - } else { - Ok(()) - } - } - - pub fn send(&mut self, actions: Vec) { - if self.endpoint.is_none() { + pub fn send(&self, actions: Vec) { + if self.client.is_none() { return; } - - let client = match self.get_client() { - Ok(client) => client, - Err(msg) => { - self.endpoint = None; - warn!("Cannot send DogStatsD metrics: {}", msg); - return; - } - }; + let client = self.client.as_ref().unwrap(); for action in actions { if let Err(err) = match action { @@ -101,16 +94,6 @@ impl Flusher { } } } - - fn get_client(&mut self) -> anyhow::Result<&StatsdClient> { - let opt = &mut self.client; - let client = match opt { - Some(client) => client, - None => opt.get_or_insert(create_client(self.endpoint.clone())?), - }; - - Ok(client) - } } fn do_send<'a, T>(mut builder: MetricBuilder<'a, '_, T>, tags: &'a Vec) -> anyhow::Result<()> @@ -124,12 +107,7 @@ where Ok(()) } -fn create_client(endpoint: Option) -> anyhow::Result { - let endpoint = match endpoint { - Some(endpoint) => endpoint, - None => return Err(anyhow!("no endpoint set")), - }; - +fn create_client(endpoint: &Endpoint) -> anyhow::Result { match endpoint.url.scheme_str() { #[cfg(unix)] Some("unix") => { @@ -219,22 +197,18 @@ mod test { #[test] #[cfg_attr(miri, ignore)] fn test_create_client_udp() { - let res = create_client(None); - assert!(res.is_err()); - assert_eq!("no endpoint set", res.unwrap_err().to_string().as_str()); - - let res = create_client(Some(Endpoint::default())); + let res = create_client(&Endpoint::default()); assert!(res.is_err()); assert_eq!("invalid host", res.unwrap_err().to_string().as_str()); - let res = create_client(Some(Endpoint::from_slice("localhost:99999"))); + let res = create_client(&Endpoint::from_slice("localhost:99999")); assert!(res.is_err()); assert_eq!("invalid port", res.unwrap_err().to_string().as_str()); - let res = create_client(Some(Endpoint::from_slice("localhost:80"))); + let res = create_client(&Endpoint::from_slice("localhost:80")); assert!(res.is_ok()); - let res = create_client(Some(Endpoint::from_slice("http://localhost:80"))); + let res = create_client(&Endpoint::from_slice("http://localhost:80")); assert!(res.is_ok()); } @@ -242,15 +216,15 @@ mod test { #[cfg(unix)] #[cfg_attr(miri, ignore)] fn test_create_client_unix_domain_socket() { - let res = create_client(Some(Endpoint::from_url( + let res = create_client(&Endpoint::from_url( "unix://localhost:80".parse::().unwrap(), - ))); + )); assert!(res.is_err()); assert_eq!("invalid url", res.unwrap_err().to_string().as_str()); - let res = create_client(Some(Endpoint::from_url( + let res = create_client(&Endpoint::from_url( socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(), - ))); + )); assert!(res.is_ok()); } diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 52f9e7d3d..02f107385 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -897,9 +897,10 @@ impl SidecarInterface for SidecarServer { session.modify_trace_config(|cfg| { update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); }); - session.configure_dogstatsd(|cfg| { - update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); - }); + // TODO: the dogstatsd-client doesn't support test_session tokens yet + // session.configure_dogstatsd(|cfg| { + // update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); + // }); no_response() } From a388c809d2bf62f042f49a3aeb3e33d000b5fae9 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 11:21:39 -0400 Subject: [PATCH 06/35] use vec default to avoid unneeded allocation --- data-pipeline/src/trace_exporter.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 5df25c482..5c0b5a4ea 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -200,7 +200,7 @@ impl TraceExporter { flusher.send(vec![ DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, - vec![])]); + Vec::default())]); } Ok(String::from("{}")) }) @@ -221,7 +221,7 @@ impl TraceExporter { error!("Error deserializing trace from request body: {err}"); self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces.errors"), 1, - vec![])); + Vec::default())); return Ok(String::from("{}")); } }; @@ -235,7 +235,7 @@ impl TraceExporter { // todo: what tags to attach self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces"), traces.len() as i64, - vec![])); + Vec::default())); let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); @@ -247,7 +247,7 @@ impl TraceExporter { error!("Error serializing traces: {err}"); self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.ser_traces.errors"), 1, - vec![])); + Vec::default())); String::from("{}") }).and_then( |res| { @@ -283,7 +283,7 @@ impl TraceExporter { if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, - vec![])]); + Vec::default())]); } Ok(String::from("{}")) } @@ -293,7 +293,7 @@ impl TraceExporter { if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), 1, - vec![])]); + Vec::default())]); } Ok(String::from("{}")) } From dfb1e51e131c5b2dd88c6ed2bf5de076fa4e4319 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 11:26:34 -0400 Subject: [PATCH 07/35] fmt --- data-pipeline/src/trace_exporter.rs | 104 +++++++++++++---------- dogstatsd-client/src/lib.rs | 2 - sidecar-ffi/src/lib.rs | 2 +- sidecar/src/service/blocking.rs | 2 +- sidecar/src/service/sidecar_interface.rs | 2 +- sidecar/src/service/sidecar_server.rs | 2 +- 6 files changed, 63 insertions(+), 51 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 5c0b5a4ea..d479e861a 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -7,12 +7,12 @@ use datadog_trace_utils::trace_utils::{self, SendData, TracerHeaderTags}; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceEncoding; use ddcommon::{connector, Endpoint}; +use dogstatsd_client::{new_flusher, DogStatsDAction, Flusher}; use hyper::http::uri::PathAndQuery; use hyper::{Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; -use dogstatsd_client::{Flusher, DogStatsDAction, new_flusher}; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. @@ -197,10 +197,11 @@ impl TraceExporter { .or_else(|err| { error!("Error sending traces: {err}"); if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![ - DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), - 1, - Vec::default())]); + flusher.send(vec![DogStatsDAction::Count( + String::from("datadog.libdatadog.send.errors"), + 1, + Vec::default(), + )]); } Ok(String::from("{}")) }) @@ -219,9 +220,11 @@ impl TraceExporter { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); - self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces.errors"), - 1, - Vec::default())); + self.emit_stat(DogStatsDAction::Count( + String::from("datadog.libdatadog.deser_traces.errors"), + 1, + Vec::default(), + )); return Ok(String::from("{}")); } }; @@ -233,34 +236,34 @@ impl TraceExporter { // todo: do we need to modify the client to allow for &str to avoid allocating a String? // todo: what tags to attach - self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.deser_traces"), - traces.len() as i64, - Vec::default())); + self.emit_stat(DogStatsDAction::Count( + String::from("datadog.libdatadog.deser_traces"), + traces.len() as i64, + Vec::default(), + )); let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); match self.output_format { - TraceExporterOutputFormat::V04 => - { - rmp_serde::to_vec_named(&traces).map_err( - |err| { - error!("Error serializing traces: {err}"); - self.emit_stat(DogStatsDAction::Count(String::from("datadog.libdatadog.ser_traces.errors"), - 1, - Vec::default())); - String::from("{}") - }).and_then( - |res| { - self.send_data_to_url( - &res, - traces.len(), - self.output_format.add_path(&self.endpoint.url), - ) - }, + TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces) + .map_err(|err| { + error!("Error serializing traces: {err}"); + self.emit_stat(DogStatsDAction::Count( + String::from("datadog.libdatadog.ser_traces.errors"), + 1, + Vec::default(), + )); + String::from("{}") + }) + .and_then(|res| { + self.send_data_to_url( + &res, + traces.len(), + self.output_format.add_path(&self.endpoint.url), ) - } + }), - TraceExporterOutputFormat::V07 => { + TraceExporterOutputFormat::V07 => { let tracer_payload = trace_utils::collect_trace_chunks( traces, &header_tags, @@ -281,9 +284,11 @@ impl TraceExporter { Err(err) => { error!("Error reading agent response body: {err}"); if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), - 1, - Vec::default())]); + flusher.send(vec![DogStatsDAction::Count( + String::from("datadog.libdatadog.send.errors"), + 1, + Vec::default(), + )]); } Ok(String::from("{}")) } @@ -291,9 +296,11 @@ impl TraceExporter { Err(err) => { error!("Error sending traces: {err}"); if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![DogStatsDAction::Count(String::from("datadog.libdatadog.send.errors"), - 1, - Vec::default())]); + flusher.send(vec![DogStatsDAction::Count( + String::from("datadog.libdatadog.send.errors"), + 1, + Vec::default(), + )]); } Ok(String::from("{}")) } @@ -379,11 +386,10 @@ impl TraceExporterBuilder { .enable_all() .build()?; - let dogstatsd = self.dogstatsd_url.and_then( - |u| { - new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return None - } - ); + let dogstatsd = self.dogstatsd_url.and_then(|u| { + new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return + // None + }); Ok(TraceExporter { endpoint: Endpoint::from_slice(self.url.as_deref().unwrap_or("http://127.0.0.1:8126")), @@ -411,10 +417,10 @@ pub trait ResponseCallback { #[cfg(test)] mod tests { use super::*; + use httpmock::prelude::*; use std::collections::HashMap; use std::net; use std::time::Duration; - use httpmock::prelude::*; #[test] fn new() { @@ -514,8 +520,7 @@ mod tests { let fake_agent = MockServer::start(); let _mock_traces = fake_agent.mock(|when, then| { - when.method(GET) - .path("/v0.4/traces"); + when.method(GET).path("/v0.4/traces"); then.status(200) .header("content-type", "application/json") .body("{}"); @@ -532,7 +537,16 @@ mod tests { .build() .unwrap(); - let traces: Vec> = vec![vec![pb::Span{name: "test".to_string(), ..Default::default()}], vec![pb::Span{name: "test2".to_string(), ..Default::default()}]]; + let traces: Vec> = vec![ + vec![pb::Span { + name: "test".to_string(), + ..Default::default() + }], + vec![pb::Span { + name: "test2".to_string(), + ..Default::default() + }], + ]; let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); let result = exporter.send(&*bytes, 1).expect("failed to send trace"); diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index f43cc31d5..5f8fa11ae 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -227,6 +227,4 @@ mod test { )); assert!(res.is_ok()); } - - } diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index a8a929c39..48b863dfb 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -12,7 +12,6 @@ use datadog_sidecar::agent_remote_config::{ use datadog_sidecar::config; use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; -use dogstatsd_client::DogStatsDAction; use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, @@ -28,6 +27,7 @@ use ddtelemetry::{ worker::{LifecycleAction, TelemetryActions}, }; use ddtelemetry_ffi::try_c; +use dogstatsd_client::DogStatsDAction; use ffi::slice::AsBytes; use libc::c_char; use std::ffi::c_void; diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index fb9d8b4ef..ad086a192 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -5,9 +5,9 @@ use super::{ InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarInterfaceRequest, SidecarInterfaceResponse, }; -use dogstatsd_client::DogStatsDAction; use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; +use dogstatsd_client::DogStatsDAction; use std::sync::Mutex; use std::{ borrow::Cow, diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index d860ee8bd..f66029234 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -1,7 +1,6 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use dogstatsd_client::DogStatsDAction; use crate::service::{ InstanceId, QueueId, RequestIdentification, RequestIdentifier, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, @@ -9,6 +8,7 @@ use crate::service::{ use anyhow::Result; use datadog_ipc::platform::ShmHandle; use datadog_ipc::tarpc; +use dogstatsd_client::DogStatsDAction; // This is a bit weird, but depending on the OS we're interested in different things... // and the macro expansion is not going to be happy with #[cfg()] instructions inside them. diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 02f107385..e94119100 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -39,7 +39,6 @@ use serde::{Deserialize, Serialize}; use tokio::task::{JoinError, JoinHandle}; use crate::config::get_product_endpoint; -use dogstatsd_client::DogStatsDAction; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::runtime_info::ActiveApplication; use crate::service::telemetry::enqueued_telemetry_stats::EnqueuedTelemetryStats; @@ -48,6 +47,7 @@ use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; +use dogstatsd_client::DogStatsDAction; type NoResponse = Ready<()>; From 062a891814a213cebb53355ad5274ffcfa7f4e34 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 13:03:42 -0400 Subject: [PATCH 08/35] add dogstatsd-client to docker build tool --- tools/docker/Dockerfile.build | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/docker/Dockerfile.build b/tools/docker/Dockerfile.build index 13cc2cfac..01b4ae2d9 100644 --- a/tools/docker/Dockerfile.build +++ b/tools/docker/Dockerfile.build @@ -78,6 +78,7 @@ COPY "ddtelemetry/Cargo.toml" "ddtelemetry/" COPY "ddtelemetry-ffi/Cargo.toml" "ddtelemetry-ffi/" COPY "ddsketch/Cargo.toml" "ddsketch/" COPY "dogstatsd/Cargo.toml" "dogstatsd/" +COPY "dogstatsd-client/Cargo.toml" "dogstatsd-client/" COPY "dynamic-configuration/Cargo.toml" "dynamic-configuration/" COPY "profiling/Cargo.toml" "profiling/" COPY "profiling-ffi/Cargo.toml" "profiling-ffi/" From d9d21ee86834b86be945730dd18dab51d5254642 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 13:21:55 -0400 Subject: [PATCH 09/35] clean up cargo.toml --- Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index db00b0877..977462d11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,8 @@ members = [ "data-pipeline", "data-pipeline-ffi", "ddsketch", - "tinybytes", "dogstatsd-client", + "tinybytes", + "dogstatsd-client", ] default-members = [ From 61d5f835f1fbc869b88013f29748e446257b0ea4 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 13:25:25 -0400 Subject: [PATCH 10/35] remove unneeded dep --- Cargo.lock | 1 - dogstatsd-client/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 551e4de32..4dff22600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1997,7 +1997,6 @@ dependencies = [ "datadog-ddsketch", "datadog-trace-normalization", "datadog-trace-protobuf", - "datadog-trace-utils", "ddcommon", "http 0.2.12", "hyper 0.14.28", diff --git a/dogstatsd-client/Cargo.toml b/dogstatsd-client/Cargo.toml index 1e49792b4..5be229170 100644 --- a/dogstatsd-client/Cargo.toml +++ b/dogstatsd-client/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true [dependencies] ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } -datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } datadog-ddsketch = { path = "../ddsketch"} cadence = "1.3.0" From 0bb2986523461a24270f5da45851dc0892622dd9 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 15:19:00 -0400 Subject: [PATCH 11/35] use AsRef for metric strings, improve perf and simplify usage --- data-pipeline/src/trace_exporter.rs | 25 +++++++++------ dogstatsd-client/src/lib.rs | 41 ++++++++++++------------ sidecar/src/service/blocking.rs | 2 +- sidecar/src/service/sidecar_interface.rs | 2 +- sidecar/src/service/sidecar_server.rs | 2 +- 5 files changed, 39 insertions(+), 33 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index d479e861a..0f072c741 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -39,6 +39,12 @@ pub enum TraceExporterOutputFormat { V07, } +// internal health metrics +const STAT_SEND_ERRORS: &str = "datadog.libdatadog.send.errors"; +const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces"; +const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors"; +const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors"; + impl TraceExporterOutputFormat { /// Add the agent trace endpoint path to the URL. fn add_path(&self, url: &Uri) -> Uri { @@ -198,7 +204,7 @@ impl TraceExporter { error!("Error sending traces: {err}"); if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count( - String::from("datadog.libdatadog.send.errors"), + STAT_SEND_ERRORS, 1, Vec::default(), )]); @@ -207,7 +213,7 @@ impl TraceExporter { }) } - fn emit_stat(&self, action: DogStatsDAction) { + fn emit_stat(&self, action: DogStatsDAction<&'static str>) { if let Some(flusher) = &self.dogstatsd { flusher.send(vec![action]); } @@ -221,7 +227,7 @@ impl TraceExporter { Err(err) => { error!("Error deserializing trace from request body: {err}"); self.emit_stat(DogStatsDAction::Count( - String::from("datadog.libdatadog.deser_traces.errors"), + STAT_DESER_TRACES_ERRORS, 1, Vec::default(), )); @@ -234,10 +240,9 @@ impl TraceExporter { return Ok(String::from("{}")); } - // todo: do we need to modify the client to allow for &str to avoid allocating a String? // todo: what tags to attach self.emit_stat(DogStatsDAction::Count( - String::from("datadog.libdatadog.deser_traces"), + STAT_DESER_TRACES, traces.len() as i64, Vec::default(), )); @@ -249,7 +254,7 @@ impl TraceExporter { .map_err(|err| { error!("Error serializing traces: {err}"); self.emit_stat(DogStatsDAction::Count( - String::from("datadog.libdatadog.ser_traces.errors"), + STAT_SER_TRACES_ERRORS, 1, Vec::default(), )); @@ -285,7 +290,7 @@ impl TraceExporter { error!("Error reading agent response body: {err}"); if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count( - String::from("datadog.libdatadog.send.errors"), + STAT_SEND_ERRORS, 1, Vec::default(), )]); @@ -297,7 +302,7 @@ impl TraceExporter { error!("Error sending traces: {err}"); if let Some(flusher) = &self.dogstatsd { flusher.send(vec![DogStatsDAction::Count( - String::from("datadog.libdatadog.send.errors"), + STAT_SEND_ERRORS, 1, Vec::default(), )]); @@ -527,7 +532,7 @@ mod tests { }); let builder = TraceExporterBuilder::default(); - let mut exporter = builder + let exporter = builder .set_url(&fake_agent.url("/v0.4/traces")) .set_dogstatsd_url(&stats_socket.local_addr().unwrap().to_string()) .set_tracer_version("v0.1") @@ -548,7 +553,7 @@ mod tests { }], ]; let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); - let result = exporter.send(&*bytes, 1).expect("failed to send trace"); + let _result = exporter.send(&*bytes, 1).expect("failed to send trace"); fn read(socket: &net::UdpSocket) -> String { let mut buf = [0; 1_000]; diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 5f8fa11ae..c2df9a0a8 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -22,17 +22,18 @@ use std::os::unix::net::UnixDatagram; // Queue with a maximum capacity of 32K elements const QUEUE_SIZE: usize = 32 * 1024; -/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. +/// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction { - Count(String, i64, Vec), - Distribution(String, f64, Vec), - Gauge(String, f64, Vec), - Histogram(String, f64, Vec), +pub enum DogStatsDAction> { + // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement + Count(T, i64, Vec), + Distribution(T, f64, Vec), + Gauge(T, f64, Vec), + Histogram(T, f64, Vec), // Cadence only support i64 type as value // but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) // and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) - Set(String, i64, Vec), + Set(T, i64, Vec), } /// A dogstatsd-client that flushes stats to a given endpoint. @@ -66,7 +67,7 @@ impl Flusher { Ok(()) } - pub fn send(&self, actions: Vec) { + pub fn send>(&self, actions: Vec>) { if self.client.is_none() { return; } @@ -75,19 +76,19 @@ impl Flusher { for action in actions { if let Err(err) = match action { DogStatsDAction::Count(metric, value, tags) => { - do_send(client.count_with_tags(metric.as_str(), value), &tags) + do_send(client.count_with_tags(metric.as_ref(), value), &tags) } DogStatsDAction::Distribution(metric, value, tags) => { - do_send(client.distribution_with_tags(metric.as_str(), value), &tags) + do_send(client.distribution_with_tags(metric.as_ref(), value), &tags) } DogStatsDAction::Gauge(metric, value, tags) => { - do_send(client.gauge_with_tags(metric.as_str(), value), &tags) + do_send(client.gauge_with_tags(metric.as_ref(), value), &tags) } DogStatsDAction::Histogram(metric, value, tags) => { - do_send(client.histogram_with_tags(metric.as_str(), value), &tags) + do_send(client.histogram_with_tags(metric.as_ref(), value), &tags) } DogStatsDAction::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_str(), value), &tags) + do_send(client.set_with_tags(metric.as_ref(), value), &tags) } } { error!("Error while sending metric: {}", err); @@ -169,13 +170,13 @@ mod test { socket.local_addr().unwrap().to_string().as_str(), )); flusher.send(vec![ - Count("test_count".to_string(), 3, vec![tag!("foo", "bar")]), - Count("test_neg_count".to_string(), -2, vec![]), - Distribution("test_distribution".to_string(), 4.2, vec![]), - Gauge("test_gauge".to_string(), 7.6, vec![]), - Histogram("test_histogram".to_string(), 8.0, vec![]), - Set("test_set".to_string(), 9, vec![tag!("the", "end")]), - Set("test_neg_set".to_string(), -1, vec![]), + Count("test_count", 3, vec![tag!("foo", "bar")]), + Count("test_neg_count", -2, vec![]), + Distribution("test_distribution", 4.2, vec![]), + Gauge("test_gauge", 7.6, vec![]), + Histogram("test_histogram", 8.0, vec![]), + Set("test_set", 9, vec![tag!("the", "end")]), + Set("test_neg_set", -1, vec![]), ]); fn read(socket: &net::UdpSocket) -> String { diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index ad086a192..67eaf9ddd 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -318,7 +318,7 @@ pub fn set_remote_config_data( pub fn send_dogstatsd_actions( transport: &mut SidecarTransport, instance_id: &InstanceId, - actions: Vec, + actions: Vec>, ) -> io::Result<()> { transport.send(SidecarInterfaceRequest::SendDogstatsdActions { instance_id: instance_id.clone(), diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index f66029234..0c38945ac 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -133,7 +133,7 @@ pub trait SidecarInterface { /// /// * `instance_id` - The ID of the instance. /// * `actions` - The DogStatsD actions to send. - async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); + async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec>); /// Flushes any outstanding traces queued for sending. async fn flush_traces(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index e94119100..29877e277 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -844,7 +844,7 @@ impl SidecarInterface for SidecarServer { self, _: Context, instance_id: InstanceId, - actions: Vec, + actions: Vec>, ) -> Self::SendDogstatsdActionsFut { tokio::spawn(async move { self.get_session(&instance_id.session_id) From 2961334ff396cbcf9f3839c3f17bda07e2203df1 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 15:20:24 -0400 Subject: [PATCH 12/35] simplify emitting stats in data-pipeline --- data-pipeline/src/trace_exporter.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 0f072c741..7e9088291 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -288,25 +288,21 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); - if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![DogStatsDAction::Count( - STAT_SEND_ERRORS, - 1, - Vec::default(), - )]); - } + self.emit_stat(DogStatsDAction::Count( + STAT_SEND_ERRORS, + 1, + Vec::default(), + )); Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); - if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![DogStatsDAction::Count( - STAT_SEND_ERRORS, - 1, - Vec::default(), - )]); - } + self.emit_stat(DogStatsDAction::Count( + STAT_SEND_ERRORS, + 1, + Vec::default(), + )); Ok(String::from("{}")) } } From b0b02564107ea068dd45ea46f9712d79db352ec2 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Thu, 12 Sep 2024 15:30:43 -0400 Subject: [PATCH 13/35] fix lint --- data-pipeline/src/trace_exporter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 7e9088291..a03456bf1 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -549,7 +549,7 @@ mod tests { }], ]; let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); - let _result = exporter.send(&*bytes, 1).expect("failed to send trace"); + let _result = exporter.send(&bytes, 1).expect("failed to send trace"); fn read(socket: &net::UdpSocket) -> String { let mut buf = [0; 1_000]; From 2f659460a9d591ee10f309a38bde37a96d56468f Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 13 Sep 2024 10:55:03 -0400 Subject: [PATCH 14/35] Remove copy of tags and pass to dogstatsd client by reference --- data-pipeline/src/trace_exporter.rs | 69 +++++++++++------------- dogstatsd-client/src/lib.rs | 46 ++++++++-------- sidecar/src/service/blocking.rs | 3 +- sidecar/src/service/sidecar_interface.rs | 6 ++- sidecar/src/service/sidecar_server.rs | 3 +- 5 files changed, 64 insertions(+), 63 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index a03456bf1..de7308e8a 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -6,7 +6,8 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self, SendData, TracerHeaderTags}; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceEncoding; -use ddcommon::{connector, Endpoint}; +use ddcommon::tag::Tag; +use ddcommon::{connector, tag, Endpoint}; use dogstatsd_client::{new_flusher, DogStatsDAction, Flusher}; use hyper::http::uri::PathAndQuery; use hyper::{Body, Client, Method, Uri}; @@ -115,6 +116,11 @@ impl<'a> From<&'a TracerTags> for HashMap<&'static str, String> { } } +enum HealthMetric { + Count(&'static str, i64), + //TODO: Add more DogStatsDAction as we need them +} + #[allow(missing_docs)] pub struct TraceExporter { endpoint: Endpoint, @@ -126,6 +132,7 @@ pub struct TraceExporter { runtime: Runtime, // None if dogstatsd is disabled dogstatsd: Option, + common_stats_tags: Vec, } impl TraceExporter { @@ -202,20 +209,20 @@ impl TraceExporter { }) .or_else(|err| { error!("Error sending traces: {err}"); - if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![DogStatsDAction::Count( - STAT_SEND_ERRORS, - 1, - Vec::default(), - )]); - } + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); Ok(String::from("{}")) }) } - fn emit_stat(&self, action: DogStatsDAction<&'static str>) { + fn emit_metric(&self, metric: HealthMetric) { if let Some(flusher) = &self.dogstatsd { - flusher.send(vec![action]); + match metric { + HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( + name, + c, + &self.common_stats_tags, + )]), + } } } @@ -226,11 +233,7 @@ impl TraceExporter { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); - self.emit_stat(DogStatsDAction::Count( - STAT_DESER_TRACES_ERRORS, - 1, - Vec::default(), - )); + self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES_ERRORS, 1)); return Ok(String::from("{}")); } }; @@ -240,12 +243,7 @@ impl TraceExporter { return Ok(String::from("{}")); } - // todo: what tags to attach - self.emit_stat(DogStatsDAction::Count( - STAT_DESER_TRACES, - traces.len() as i64, - Vec::default(), - )); + self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64)); let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); @@ -253,11 +251,7 @@ impl TraceExporter { TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces) .map_err(|err| { error!("Error serializing traces: {err}"); - self.emit_stat(DogStatsDAction::Count( - STAT_SER_TRACES_ERRORS, - 1, - Vec::default(), - )); + self.emit_metric(HealthMetric::Count(STAT_SER_TRACES_ERRORS, 1)); String::from("{}") }) .and_then(|res| { @@ -288,21 +282,13 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); - self.emit_stat(DogStatsDAction::Count( - STAT_SEND_ERRORS, - 1, - Vec::default(), - )); + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); - self.emit_stat(DogStatsDAction::Count( - STAT_SEND_ERRORS, - 1, - Vec::default(), - )); + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); Ok(String::from("{}")) } } @@ -392,6 +378,8 @@ impl TraceExporterBuilder { // None }); + let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); + Ok(TraceExporter { endpoint: Endpoint::from_slice(self.url.as_deref().unwrap_or("http://127.0.0.1:8126")), tags: TracerTags { @@ -405,6 +393,7 @@ impl TraceExporterBuilder { _response_callback: self.response_callback, runtime, dogstatsd, + common_stats_tags: vec![libdatadog_version], }) } } @@ -557,8 +546,10 @@ mod tests { let datagram = String::from_utf8_lossy(buf.as_ref()); datagram.trim_matches(char::from(0)).to_string() } - - assert_eq!("datadog.libdatadog.deser_traces:2|c", read(&stats_socket)); - assert_eq!("datadog.libdatadog.send.errors:1|c", read(&stats_socket)); + // Compare with the start of the metric to avoid breaking this test when the version changes + assert!(&read(&stats_socket) + .starts_with("datadog.libdatadog.deser_traces:2|c|#libdatadog_version:")); + assert!(&read(&stats_socket) + .starts_with("datadog.libdatadog.send.errors:1|c|#libdatadog_version:")); } } diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index c2df9a0a8..0aac7cb59 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -24,16 +24,16 @@ const QUEUE_SIZE: usize = 32 * 1024; /// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction> { +pub enum DogStatsDAction, V: AsRef<[Tag]>> { // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement - Count(T, i64, Vec), - Distribution(T, f64, Vec), - Gauge(T, f64, Vec), - Histogram(T, f64, Vec), + Count(T, i64, V), + Distribution(T, f64, V), + Gauge(T, f64, V), + Histogram(T, f64, V), // Cadence only support i64 type as value // but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) // and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) - Set(T, i64, Vec), + Set(T, i64, V), } /// A dogstatsd-client that flushes stats to a given endpoint. @@ -67,7 +67,7 @@ impl Flusher { Ok(()) } - pub fn send>(&self, actions: Vec>) { + pub fn send, V: AsRef<[Tag]>>(&self, actions: Vec>) { if self.client.is_none() { return; } @@ -75,20 +75,24 @@ impl Flusher { for action in actions { if let Err(err) = match action { - DogStatsDAction::Count(metric, value, tags) => { - do_send(client.count_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDAction::Distribution(metric, value, tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDAction::Gauge(metric, value, tags) => { - do_send(client.gauge_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDAction::Histogram(metric, value, tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), &tags) - } + DogStatsDAction::Count(metric, value, tags) => do_send( + client.count_with_tags(metric.as_ref(), value), + tags.as_ref(), + ), + DogStatsDAction::Distribution(metric, value, tags) => do_send( + client.distribution_with_tags(metric.as_ref(), value), + tags.as_ref(), + ), + DogStatsDAction::Gauge(metric, value, tags) => do_send( + client.gauge_with_tags(metric.as_ref(), value), + tags.as_ref(), + ), + DogStatsDAction::Histogram(metric, value, tags) => do_send( + client.histogram_with_tags(metric.as_ref(), value), + tags.as_ref(), + ), DogStatsDAction::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), &tags) + do_send(client.set_with_tags(metric.as_ref(), value), tags.as_ref()) } } { error!("Error while sending metric: {}", err); @@ -97,7 +101,7 @@ impl Flusher { } } -fn do_send<'a, T>(mut builder: MetricBuilder<'a, '_, T>, tags: &'a Vec) -> anyhow::Result<()> +fn do_send<'a, T>(mut builder: MetricBuilder<'a, '_, T>, tags: &'a [Tag]) -> anyhow::Result<()> where T: Metric + From, { diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 67eaf9ddd..9d071572d 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -7,6 +7,7 @@ use super::{ }; use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; +use ddcommon::tag::Tag; use dogstatsd_client::DogStatsDAction; use std::sync::Mutex; use std::{ @@ -318,7 +319,7 @@ pub fn set_remote_config_data( pub fn send_dogstatsd_actions( transport: &mut SidecarTransport, instance_id: &InstanceId, - actions: Vec>, + actions: Vec>>, ) -> io::Result<()> { transport.send(SidecarInterfaceRequest::SendDogstatsdActions { instance_id: instance_id.clone(), diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index 0c38945ac..5b0a33520 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -8,6 +8,7 @@ use crate::service::{ use anyhow::Result; use datadog_ipc::platform::ShmHandle; use datadog_ipc::tarpc; +use ddcommon::tag::Tag; use dogstatsd_client::DogStatsDAction; // This is a bit weird, but depending on the OS we're interested in different things... @@ -133,7 +134,10 @@ pub trait SidecarInterface { /// /// * `instance_id` - The ID of the instance. /// * `actions` - The DogStatsD actions to send. - async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec>); + async fn send_dogstatsd_actions( + instance_id: InstanceId, + actions: Vec>>, + ); /// Flushes any outstanding traces queued for sending. async fn flush_traces(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 29877e277..13069ca68 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -18,6 +18,7 @@ use datadog_ipc::transport::Transport; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceEncoding; +use ddcommon::tag::Tag; use ddcommon::Endpoint; use ddtelemetry::worker::{ LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerStats, @@ -844,7 +845,7 @@ impl SidecarInterface for SidecarServer { self, _: Context, instance_id: InstanceId, - actions: Vec>, + actions: Vec>>, ) -> Self::SendDogstatsdActionsFut { tokio::spawn(async move { self.get_session(&instance_id.session_id) From 4d25dc2384c1acfe123846e4f94d3481cbbc1f99 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 13 Sep 2024 12:36:49 -0400 Subject: [PATCH 15/35] Update licenses --- LICENSE-3rdparty.yml | 2 +- data-pipeline/src/trace_exporter.rs | 1 + dogstatsd-client/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 57b32ae6b..659d895c9 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-utils, ddcommon, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent, bin_tests +root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-utils, ddcommon, dogstatsd-client, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent, bin_tests third_party_libraries: - package_name: addr2line package_version: 0.21.0 diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index de7308e8a..16e7fe7d4 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -214,6 +214,7 @@ impl TraceExporter { }) } + /// Emit a health metric to dogstatsd fn emit_metric(&self, metric: HealthMetric) { if let Some(flusher) = &self.dogstatsd { match metric { diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 0aac7cb59..ca0f78c18 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -37,7 +37,7 @@ pub enum DogStatsDAction, V: AsRef<[Tag]>> { } /// A dogstatsd-client that flushes stats to a given endpoint. -/// The default value has no address, use `new_flusher` or `set_endpoint` to configure an endpoint. +/// The default value has no address and is thus disabled, use `new_flusher` or `set_endpoint` to configure an endpoint. #[derive(Default)] pub struct Flusher { client: Option, From 37fd00e34ffb6fe7d03e2b4ccff5ffe261fb1945 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 13 Sep 2024 15:18:56 -0400 Subject: [PATCH 16/35] Allow IntoIterator tags to support static and dynamic tags: WIP --- dogstatsd-client/src/lib.rs | 58 +++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index ca0f78c18..943465b57 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -24,7 +24,7 @@ const QUEUE_SIZE: usize = 32 * 1024; /// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction, V: AsRef<[Tag]>> { +pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement Count(T, i64, V), Distribution(T, f64, V), @@ -37,7 +37,8 @@ pub enum DogStatsDAction, V: AsRef<[Tag]>> { } /// A dogstatsd-client that flushes stats to a given endpoint. -/// The default value has no address and is thus disabled, use `new_flusher` or `set_endpoint` to configure an endpoint. +/// The default value has no address and is thus disabled, use `new_flusher` or `set_endpoint` to +/// configure an endpoint. #[derive(Default)] pub struct Flusher { client: Option, @@ -67,7 +68,10 @@ impl Flusher { Ok(()) } - pub fn send, V: AsRef<[Tag]>>(&self, actions: Vec>) { + pub fn send<'a, T: AsRef + 'a, V: IntoIterator>( + &self, + actions: Vec>, + ) { if self.client.is_none() { return; } @@ -75,24 +79,21 @@ impl Flusher { for action in actions { if let Err(err) = match action { - DogStatsDAction::Count(metric, value, tags) => do_send( - client.count_with_tags(metric.as_ref(), value), - tags.as_ref(), - ), - DogStatsDAction::Distribution(metric, value, tags) => do_send( - client.distribution_with_tags(metric.as_ref(), value), - tags.as_ref(), - ), - DogStatsDAction::Gauge(metric, value, tags) => do_send( - client.gauge_with_tags(metric.as_ref(), value), - tags.as_ref(), - ), - DogStatsDAction::Histogram(metric, value, tags) => do_send( - client.histogram_with_tags(metric.as_ref(), value), - tags.as_ref(), - ), + DogStatsDAction::Count(metric, value, tags) => { + let metric_builder = client.count_with_tags(metric.as_ref(), value); + do_send(metric_builder, tags) + } + DogStatsDAction::Distribution(metric, value, tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), tags) + } + DogStatsDAction::Gauge(metric, value, tags) => { + do_send(client.gauge_with_tags(metric.as_ref(), value), tags) + } + DogStatsDAction::Histogram(metric, value, tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), tags) + } DogStatsDAction::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), tags.as_ref()) + do_send(client.set_with_tags(metric.as_ref(), value), tags) } } { error!("Error while sending metric: {}", err); @@ -101,12 +102,19 @@ impl Flusher { } } -fn do_send<'a, T>(mut builder: MetricBuilder<'a, '_, T>, tags: &'a [Tag]) -> anyhow::Result<()> +fn do_send<'m, 't, T, V: IntoIterator>( + mut builder: MetricBuilder<'m, '_, T>, + tags: V, +) -> anyhow::Result<()> where T: Metric + From, + 't: 'm, { - for tag in tags { - builder = builder.with_tag_value(tag.as_ref()); + let mut tags_iter = tags.into_iter(); + let mut tag_opt = tags_iter.next(); + while tag_opt.is_some() { + builder = builder.with_tag_value(tag_opt.unwrap().as_ref()); + tag_opt = tags_iter.next(); } builder.try_send()?; Ok(()) @@ -174,12 +182,12 @@ mod test { socket.local_addr().unwrap().to_string().as_str(), )); flusher.send(vec![ - Count("test_count", 3, vec![tag!("foo", "bar")]), + Count("test_count", 3, vec![&tag!("foo", "bar")]), Count("test_neg_count", -2, vec![]), Distribution("test_distribution", 4.2, vec![]), Gauge("test_gauge", 7.6, vec![]), Histogram("test_histogram", 8.0, vec![]), - Set("test_set", 9, vec![tag!("the", "end")]), + Set("test_set", 9, vec![&tag!("the", "end")]), Set("test_neg_set", -1, vec![]), ]); From ad04e277485eaf29f92fcfb92c2546862c991d8d Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Tue, 17 Sep 2024 17:06:22 +0200 Subject: [PATCH 17/35] Fix lifetimes of DogstatsDAction Signed-off-by: Bob Weinand --- dogstatsd-client/src/lib.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 943465b57..e5e1cbb83 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -24,7 +24,10 @@ const QUEUE_SIZE: usize = 32 * 1024; /// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { +pub enum DogStatsDAction, V> +where + for<'a> &'a V: IntoIterator, +{ // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement Count(T, i64, V), Distribution(T, f64, V), @@ -68,10 +71,10 @@ impl Flusher { Ok(()) } - pub fn send<'a, T: AsRef + 'a, V: IntoIterator>( - &self, - actions: Vec>, - ) { + pub fn send, V>(&self, actions: Vec>) + where + for<'a> &'a V: IntoIterator, + { if self.client.is_none() { return; } @@ -79,20 +82,20 @@ impl Flusher { for action in actions { if let Err(err) = match action { - DogStatsDAction::Count(metric, value, tags) => { + DogStatsDAction::Count(metric, value, ref tags) => { let metric_builder = client.count_with_tags(metric.as_ref(), value); do_send(metric_builder, tags) } - DogStatsDAction::Distribution(metric, value, tags) => { + DogStatsDAction::Distribution(metric, value, ref tags) => { do_send(client.distribution_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Gauge(metric, value, tags) => { + DogStatsDAction::Gauge(metric, value, ref tags) => { do_send(client.gauge_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Histogram(metric, value, tags) => { + DogStatsDAction::Histogram(metric, value, ref tags) => { do_send(client.histogram_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Set(metric, value, tags) => { + DogStatsDAction::Set(metric, value, ref tags) => { do_send(client.set_with_tags(metric.as_ref(), value), tags) } } { From d30df0675885980fbc91ff51ecf9194c1d38410c Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Tue, 17 Sep 2024 17:50:26 +0200 Subject: [PATCH 18/35] Actually also allow &Vec and &[Tag] Signed-off-by: Bob Weinand --- dogstatsd-client/src/lib.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index e5e1cbb83..70714dd20 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -15,7 +15,6 @@ use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricS #[cfg(unix)] use ddcommon::connector::uds::socket_path_from_uri; use std::net::{ToSocketAddrs, UdpSocket}; - #[cfg(unix)] use std::os::unix::net::UnixDatagram; @@ -24,9 +23,9 @@ const QUEUE_SIZE: usize = 32 * 1024; /// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction, V> +pub enum DogStatsDAction, V: std::ops::Deref> where - for<'a> &'a V: IntoIterator, + for<'a> &'a ::Target: IntoIterator, { // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement Count(T, i64, V), @@ -71,9 +70,9 @@ impl Flusher { Ok(()) } - pub fn send, V>(&self, actions: Vec>) + pub fn send, V: std::ops::Deref>(&self, actions: Vec>) where - for<'a> &'a V: IntoIterator, + for<'a> &'a ::Target: IntoIterator, { if self.client.is_none() { return; @@ -84,19 +83,19 @@ impl Flusher { if let Err(err) = match action { DogStatsDAction::Count(metric, value, ref tags) => { let metric_builder = client.count_with_tags(metric.as_ref(), value); - do_send(metric_builder, tags) + do_send(metric_builder, tags.deref()) } DogStatsDAction::Distribution(metric, value, ref tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), tags) + do_send(client.distribution_with_tags(metric.as_ref(), value), tags.deref()) } DogStatsDAction::Gauge(metric, value, ref tags) => { - do_send(client.gauge_with_tags(metric.as_ref(), value), tags) + do_send(client.gauge_with_tags(metric.as_ref(), value), tags.deref()) } DogStatsDAction::Histogram(metric, value, ref tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), tags) + do_send(client.histogram_with_tags(metric.as_ref(), value), tags.deref()) } DogStatsDAction::Set(metric, value, ref tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), tags) + do_send(client.set_with_tags(metric.as_ref(), value), tags.deref()) } } { error!("Error while sending metric: {}", err); @@ -185,12 +184,12 @@ mod test { socket.local_addr().unwrap().to_string().as_str(), )); flusher.send(vec![ - Count("test_count", 3, vec![&tag!("foo", "bar")]), + Count("test_count", 3, vec![tag!("foo", "bar")]), Count("test_neg_count", -2, vec![]), Distribution("test_distribution", 4.2, vec![]), Gauge("test_gauge", 7.6, vec![]), Histogram("test_histogram", 8.0, vec![]), - Set("test_set", 9, vec![&tag!("the", "end")]), + Set("test_set", 9, vec![tag!("the", "end")]), Set("test_neg_set", -1, vec![]), ]); From 242224c69a0c8964553757ab530c843fb41515af Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 17 Sep 2024 14:12:36 -0400 Subject: [PATCH 19/35] cargo fmt --- dogstatsd-client/src/lib.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 70714dd20..1b68c0e17 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -85,15 +85,17 @@ impl Flusher { let metric_builder = client.count_with_tags(metric.as_ref(), value); do_send(metric_builder, tags.deref()) } - DogStatsDAction::Distribution(metric, value, ref tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), tags.deref()) - } + DogStatsDAction::Distribution(metric, value, ref tags) => do_send( + client.distribution_with_tags(metric.as_ref(), value), + tags.deref(), + ), DogStatsDAction::Gauge(metric, value, ref tags) => { do_send(client.gauge_with_tags(metric.as_ref(), value), tags.deref()) } - DogStatsDAction::Histogram(metric, value, ref tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), tags.deref()) - } + DogStatsDAction::Histogram(metric, value, ref tags) => do_send( + client.histogram_with_tags(metric.as_ref(), value), + tags.deref(), + ), DogStatsDAction::Set(metric, value, ref tags) => { do_send(client.set_with_tags(metric.as_ref(), value), tags.deref()) } From 93f6cb6780a857d173e7f340d08be6e3819f5216 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 17 Sep 2024 14:28:55 -0400 Subject: [PATCH 20/35] disable cargo bench in dogstatd-client, we use criterion --- dogstatsd-client/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dogstatsd-client/Cargo.toml b/dogstatsd-client/Cargo.toml index 5be229170..ba6164d72 100644 --- a/dogstatsd-client/Cargo.toml +++ b/dogstatsd-client/Cargo.toml @@ -5,6 +5,9 @@ edition.workspace = true version.workspace = true license.workspace = true +[lib] +bench = false + [dependencies] ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } From 8fecb9a56bea44312c3659b2fd4cdac5dd42e28b Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 17 Sep 2024 15:01:12 -0400 Subject: [PATCH 21/35] Try using iterators in data-pipeline for metrics --- data-pipeline/src/trace_exporter.rs | 44 +++++++++++++++++++---------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 16e7fe7d4..b61258e25 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -195,35 +195,49 @@ impl TraceExporter { let body_bytes = hyper::body::to_bytes(response.into_body()).await?; let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default(); + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); anyhow::bail!("Agent did not accept traces: {response_body}"); } match hyper::body::to_bytes(response.into_body()).await { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); anyhow::bail!("Error reading agent response body: {err}"); } } } - Err(err) => anyhow::bail!("Failed to send traces: {err}"), + Err(err) => { + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + anyhow::bail!("Failed to send traces: {err}") + }, } }) .or_else(|err| { error!("Error sending traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); Ok(String::from("{}")) }) } /// Emit a health metric to dogstatsd - fn emit_metric(&self, metric: HealthMetric) { + fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { if let Some(flusher) = &self.dogstatsd { - match metric { - HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( - name, - c, - &self.common_stats_tags, - )]), - } + if custom_tags.is_some() { + match metric { + HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( + name, + c, + &self.common_stats_tags.iter().chain(&custom_tags.unwrap()), + )]), + } + } else { + match metric { + HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( + name, + c, + &self.common_stats_tags, + )]), + } + }; } } @@ -234,7 +248,7 @@ impl TraceExporter { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); - self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES_ERRORS, 1)); + self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES_ERRORS, 1), None); return Ok(String::from("{}")); } }; @@ -244,7 +258,7 @@ impl TraceExporter { return Ok(String::from("{}")); } - self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64)); + self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64), None); let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); @@ -252,7 +266,7 @@ impl TraceExporter { TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces) .map_err(|err| { error!("Error serializing traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SER_TRACES_ERRORS, 1)); + self.emit_metric(HealthMetric::Count(STAT_SER_TRACES_ERRORS, 1), None); String::from("{}") }) .and_then(|res| { @@ -283,13 +297,13 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1)); + self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); Ok(String::from("{}")) } } From 1141243b8f0ecebb9feb73b9c257ecc02a1ce8f4 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 09:35:55 -0400 Subject: [PATCH 22/35] bump ver --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index bc5f2dcd5..d109dac17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1993,7 +1993,7 @@ dependencies = [ [[package]] name = "dogstatsd-client" -version = "12.0.0" +version = "13.0.0" dependencies = [ "anyhow", "cadence", From ab002f5cb5f118f3721ed4fea14de4086a34d787 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 09:36:13 -0400 Subject: [PATCH 23/35] more metrics --- data-pipeline/src/trace_exporter.rs | 155 +++++++++++++++++++++------- dogstatsd-client/src/lib.rs | 14 ++- 2 files changed, 121 insertions(+), 48 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index b61258e25..d3789a63d 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -10,7 +10,7 @@ use ddcommon::tag::Tag; use ddcommon::{connector, tag, Endpoint}; use dogstatsd_client::{new_flusher, DogStatsDAction, Flusher}; use hyper::http::uri::PathAndQuery; -use hyper::{Body, Client, Method, Uri}; +use hyper::{http, Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; @@ -41,7 +41,8 @@ pub enum TraceExporterOutputFormat { } // internal health metrics -const STAT_SEND_ERRORS: &str = "datadog.libdatadog.send.errors"; +const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces"; +const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors"; const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces"; const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors"; const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors"; @@ -191,25 +192,40 @@ impl TraceExporter { .await { Ok(response) => { - if response.status() != 200 { + let response_status = response.status(); + if response_status != http::StatusCode::OK { let body_bytes = hyper::body::to_bytes(response.into_body()).await?; let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default(); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), + Some(vec![ + Tag::new("response_code", response_status.as_str()).unwrap() + ]), + ); anyhow::bail!("Agent did not accept traces: {response_body}"); } match hyper::body::to_bytes(response.into_body()).await { - Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), + Ok(body) => { + self.emit_metric( + HealthMetric::Count(STAT_SEND_TRACES, trace_count as i64), + None, + ); + Ok(String::from_utf8_lossy(&body).to_string()) + } Err(err) => { - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), + None, + ); anyhow::bail!("Error reading agent response body: {err}"); } } } Err(err) => { - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None); anyhow::bail!("Failed to send traces: {err}") - }, + } } }) .or_else(|err| { @@ -223,11 +239,15 @@ impl TraceExporter { if let Some(flusher) = &self.dogstatsd { if custom_tags.is_some() { match metric { - HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( - name, - c, - &self.common_stats_tags.iter().chain(&custom_tags.unwrap()), - )]), + HealthMetric::Count(name, c) => { + let tags = self + .common_stats_tags + .clone() // TODO: how the heck do we get around needing a clone here... :( + .into_iter() + .chain(custom_tags.unwrap()) + .collect::>(); + flusher.send(vec![DogStatsDAction::Count(name, c, &tags)]) + } } } else { match metric { @@ -258,7 +278,10 @@ impl TraceExporter { return Ok(String::from("{}")); } - self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64), None); + self.emit_metric( + HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64), + None, + ); let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); @@ -297,13 +320,16 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + self.emit_metric( + HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), + None, + ); Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); - self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None); + self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None); Ok(String::from("{}")) } } @@ -423,7 +449,6 @@ pub trait ResponseCallback { mod tests { use super::*; use httpmock::prelude::*; - use std::collections::HashMap; use std::net; use std::time::Duration; @@ -517,6 +542,25 @@ mod tests { ); } + fn read(socket: &net::UdpSocket) -> String { + let mut buf = [0; 1_000]; + socket.recv(&mut buf).expect("No data"); + let datagram = String::from_utf8_lossy(buf.as_ref()); + datagram.trim_matches(char::from(0)).to_string() + } + + fn build_test_exporter(url: String, dogstatsd_url: String) -> TraceExporter { + TraceExporterBuilder::default() + .set_url(&url) + .set_dogstatsd_url(&dogstatsd_url) + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .build() + .unwrap() + } + #[test] #[cfg_attr(miri, ignore)] fn health_metrics() { @@ -524,23 +568,16 @@ mod tests { let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); let fake_agent = MockServer::start(); - let _mock_traces = fake_agent.mock(|when, then| { - when.method(GET).path("/v0.4/traces"); + let _mock_traces = fake_agent.mock(|_, then| { then.status(200) .header("content-type", "application/json") .body("{}"); }); - let builder = TraceExporterBuilder::default(); - let exporter = builder - .set_url(&fake_agent.url("/v0.4/traces")) - .set_dogstatsd_url(&stats_socket.local_addr().unwrap().to_string()) - .set_tracer_version("v0.1") - .set_language("nodejs") - .set_language_version("1.0") - .set_language_interpreter("v8") - .build() - .unwrap(); + let exporter = build_test_exporter( + fake_agent.url("/v0.4/traces"), + stats_socket.local_addr().unwrap().to_string(), + ); let traces: Vec> = vec![ vec![pb::Span { @@ -555,16 +592,54 @@ mod tests { let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); let _result = exporter.send(&bytes, 1).expect("failed to send trace"); - fn read(socket: &net::UdpSocket) -> String { - let mut buf = [0; 1_000]; - socket.recv(&mut buf).expect("No data"); - let datagram = String::from_utf8_lossy(buf.as_ref()); - datagram.trim_matches(char::from(0)).to_string() - } - // Compare with the start of the metric to avoid breaking this test when the version changes - assert!(&read(&stats_socket) - .starts_with("datadog.libdatadog.deser_traces:2|c|#libdatadog_version:")); - assert!(&read(&stats_socket) - .starts_with("datadog.libdatadog.send.errors:1|c|#libdatadog_version:")); + assert_eq!( + &format!( + "datadog.libdatadog.deser_traces:2|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + assert_eq!( + &format!( + "datadog.libdatadog.send.traces:2|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn health_metrics_error() { + let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); + let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); + + let fake_agent = MockServer::start(); + let _mock_traces = fake_agent.mock(|_, then| { + then.status(400) + .header("content-type", "application/json") + .body("{}"); + }); + + let exporter = build_test_exporter( + fake_agent.url("/v0.4/traces"), + stats_socket.local_addr().unwrap().to_string(), + ); + + let traces: Vec> = vec![vec![pb::Span { + name: "test".to_string(), + ..Default::default() + }]]; + let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); + let _result = exporter.send(&bytes, 1).expect("failed to send trace"); + + assert_eq!( + &format!( + "datadog.libdatadog.deser_traces:1|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + assert_eq!(&format!("datadog.libdatadog.send.traces.errors:1|c|#libdatadog_version:{},response_code:400", env!("CARGO_PKG_VERSION")), &read(&stats_socket)); } } diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 1b68c0e17..70714dd20 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -85,17 +85,15 @@ impl Flusher { let metric_builder = client.count_with_tags(metric.as_ref(), value); do_send(metric_builder, tags.deref()) } - DogStatsDAction::Distribution(metric, value, ref tags) => do_send( - client.distribution_with_tags(metric.as_ref(), value), - tags.deref(), - ), + DogStatsDAction::Distribution(metric, value, ref tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), tags.deref()) + } DogStatsDAction::Gauge(metric, value, ref tags) => { do_send(client.gauge_with_tags(metric.as_ref(), value), tags.deref()) } - DogStatsDAction::Histogram(metric, value, ref tags) => do_send( - client.histogram_with_tags(metric.as_ref(), value), - tags.deref(), - ), + DogStatsDAction::Histogram(metric, value, ref tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), tags.deref()) + } DogStatsDAction::Set(metric, value, ref tags) => { do_send(client.set_with_tags(metric.as_ref(), value), tags.deref()) } From aef310a79e04f16f109aaecc2c0e06f85e634c9b Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 10:27:05 -0400 Subject: [PATCH 24/35] Split dogstatsdaction type to simplify code --- dogstatsd-client/src/lib.rs | 97 +++++++++++++++++------- sidecar/src/service/blocking.rs | 4 +- sidecar/src/service/sidecar_interface.rs | 8 +- sidecar/src/service/sidecar_server.rs | 6 +- 4 files changed, 77 insertions(+), 38 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 70714dd20..548239741 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -21,12 +21,26 @@ use std::os::unix::net::UnixDatagram; // Queue with a maximum capacity of 32K elements const QUEUE_SIZE: usize = 32 * 1024; -/// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server. +/// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD +/// server. This type takes ownership of the relevant data to support the sidecar better +/// TODO: writeup why combining these types is FRAUGHT #[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction, V: std::ops::Deref> -where - for<'a> &'a ::Target: IntoIterator, -{ +pub enum DogStatsDActionOwned { + Count(String, i64, Vec), + Distribution(String, f64, Vec), + Gauge(String, f64, Vec), + Histogram(String, f64, Vec), + // Cadence only support i64 type as value + // but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) + // and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) + Set(String, i64, Vec), +} + +// TODO: is there a way to make sure both of these stay in sync easily? + +/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. +#[derive(Debug, Serialize, Deserialize)] +pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement Count(T, i64, V), Distribution(T, f64, V), @@ -70,10 +84,39 @@ impl Flusher { Ok(()) } - pub fn send, V: std::ops::Deref>(&self, actions: Vec>) - where - for<'a> &'a ::Target: IntoIterator, - { + pub fn send_owned(&self, actions: Vec) { + if self.client.is_none() { + return; + } + let client = self.client.as_ref().unwrap(); + + for action in actions { + if let Err(err) = match action { + DogStatsDActionOwned::Count(metric, value, tags) => { + do_send(client.count_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Distribution(metric, value, tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Gauge(metric, value, tags) => { + do_send(client.gauge_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Histogram(metric, value, tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Set(metric, value, tags) => { + do_send(client.set_with_tags(metric.as_ref(), value), &tags) + } + } { + error!("Error while sending metric: {}", err); + } + } + } + + pub fn send<'a, T: AsRef, V: IntoIterator>( + &self, + actions: Vec>, + ) { if self.client.is_none() { return; } @@ -81,21 +124,21 @@ impl Flusher { for action in actions { if let Err(err) = match action { - DogStatsDAction::Count(metric, value, ref tags) => { + DogStatsDAction::Count(metric, value, tags) => { let metric_builder = client.count_with_tags(metric.as_ref(), value); - do_send(metric_builder, tags.deref()) + do_send(metric_builder, tags) } - DogStatsDAction::Distribution(metric, value, ref tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), tags.deref()) + DogStatsDAction::Distribution(metric, value, tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Gauge(metric, value, ref tags) => { - do_send(client.gauge_with_tags(metric.as_ref(), value), tags.deref()) + DogStatsDAction::Gauge(metric, value, tags) => { + do_send(client.gauge_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Histogram(metric, value, ref tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), tags.deref()) + DogStatsDAction::Histogram(metric, value, tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), tags) } - DogStatsDAction::Set(metric, value, ref tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), tags.deref()) + DogStatsDAction::Set(metric, value, tags) => { + do_send(client.set_with_tags(metric.as_ref(), value), tags) } } { error!("Error while sending metric: {}", err); @@ -164,7 +207,7 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result { #[cfg(test)] mod test { use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::{create_client, Flusher}; + use crate::{create_client, DogStatsDActionOwned, Flusher}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_to_uri; use ddcommon::{tag, Endpoint}; @@ -184,13 +227,13 @@ mod test { socket.local_addr().unwrap().to_string().as_str(), )); flusher.send(vec![ - Count("test_count", 3, vec![tag!("foo", "bar")]), - Count("test_neg_count", -2, vec![]), - Distribution("test_distribution", 4.2, vec![]), - Gauge("test_gauge", 7.6, vec![]), - Histogram("test_histogram", 8.0, vec![]), - Set("test_set", 9, vec![tag!("the", "end")]), - Set("test_neg_set", -1, vec![]), + Count("test_count", 3, &vec![tag!("foo", "bar")]), + Count("test_neg_count", -2, &vec![]), + Distribution("test_distribution", 4.2, &vec![]), + Gauge("test_gauge", 7.6, &vec![]), + Histogram("test_histogram", 8.0, &vec![]), + Set("test_set", 9, &vec![tag!("the", "end")]), + Set("test_neg_set", -1, &vec![]), ]); fn read(socket: &net::UdpSocket) -> String { diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 9d071572d..1d2f6a356 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -8,7 +8,7 @@ use super::{ use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; use ddcommon::tag::Tag; -use dogstatsd_client::DogStatsDAction; +use dogstatsd_client::DogStatsDActionOwned; use std::sync::Mutex; use std::{ borrow::Cow, @@ -319,7 +319,7 @@ pub fn set_remote_config_data( pub fn send_dogstatsd_actions( transport: &mut SidecarTransport, instance_id: &InstanceId, - actions: Vec>>, + actions: Vec, ) -> io::Result<()> { transport.send(SidecarInterfaceRequest::SendDogstatsdActions { instance_id: instance_id.clone(), diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index 5b0a33520..dc198d67d 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -8,8 +8,7 @@ use crate::service::{ use anyhow::Result; use datadog_ipc::platform::ShmHandle; use datadog_ipc::tarpc; -use ddcommon::tag::Tag; -use dogstatsd_client::DogStatsDAction; +use dogstatsd_client::DogStatsDActionOwned; // This is a bit weird, but depending on the OS we're interested in different things... // and the macro expansion is not going to be happy with #[cfg()] instructions inside them. @@ -134,10 +133,7 @@ pub trait SidecarInterface { /// /// * `instance_id` - The ID of the instance. /// * `actions` - The DogStatsD actions to send. - async fn send_dogstatsd_actions( - instance_id: InstanceId, - actions: Vec>>, - ); + async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); /// Flushes any outstanding traces queued for sending. async fn flush_traces(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 13069ca68..5ed2316b0 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -48,7 +48,7 @@ use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; -use dogstatsd_client::DogStatsDAction; +use dogstatsd_client::DogStatsDActionOwned; type NoResponse = Ready<()>; @@ -845,12 +845,12 @@ impl SidecarInterface for SidecarServer { self, _: Context, instance_id: InstanceId, - actions: Vec>>, + actions: Vec, ) -> Self::SendDogstatsdActionsFut { tokio::spawn(async move { self.get_session(&instance_id.session_id) .get_dogstatsd() - .send(actions); + .send_owned(actions); }); no_response() From 1cb870ac5ffd1d77594f806317fab31c6f20d992 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 11:01:12 -0400 Subject: [PATCH 25/35] move to owned type for sidecar-ffi --- sidecar-ffi/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index 48b863dfb..895a1b9fb 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -27,7 +27,7 @@ use ddtelemetry::{ worker::{LifecycleAction, TelemetryActions}, }; use ddtelemetry_ffi::try_c; -use dogstatsd_client::DogStatsDAction; +use dogstatsd_client::DogStatsDActionOwned; use ffi::slice::AsBytes; use libc::c_char; use std::ffi::c_void; @@ -689,7 +689,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_count( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Count( + vec![DogStatsDActionOwned::Count( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -713,7 +713,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_distribution( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Distribution( + vec![DogStatsDActionOwned::Distribution( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -737,7 +737,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_gauge( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Gauge( + vec![DogStatsDActionOwned::Gauge( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -761,7 +761,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_histogram( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Histogram( + vec![DogStatsDActionOwned::Histogram( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -785,7 +785,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_set( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Set( + vec![DogStatsDActionOwned::Set( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) From 22b333dec3e8890f602ef5227800a9bbb986c9bc Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 11:07:27 -0400 Subject: [PATCH 26/35] remove unused import --- sidecar/src/service/blocking.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index 1d2f6a356..cd48d87be 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -7,7 +7,6 @@ use super::{ }; use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; -use ddcommon::tag::Tag; use dogstatsd_client::DogStatsDActionOwned; use std::sync::Mutex; use std::{ From 88732cf9efc14eaa25f7dc3ce160da02c6365132 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 11:18:00 -0400 Subject: [PATCH 27/35] more unused --- sidecar/src/service/sidecar_server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 5ed2316b0..de705f43a 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -18,7 +18,6 @@ use datadog_ipc::transport::Transport; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceEncoding; -use ddcommon::tag::Tag; use ddcommon::Endpoint; use ddtelemetry::worker::{ LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerStats, From d7b6cd4a05bf4daf635253494327f2ad44ccd9a1 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 11:39:13 -0400 Subject: [PATCH 28/35] Add variant check --- dogstatsd-client/src/lib.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 548239741..2b2bcd5ed 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +#![feature(variant_count)] use ddcommon::tag::Tag; use ddcommon::Endpoint; @@ -207,9 +208,10 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result { #[cfg(test)] mod test { use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::{create_client, DogStatsDActionOwned, Flusher}; + use crate::{create_client, DogStatsDAction, DogStatsDActionOwned, Flusher}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_to_uri; + use ddcommon::tag::Tag; use ddcommon::{tag, Endpoint}; #[cfg(unix)] use http::Uri; @@ -285,4 +287,13 @@ mod test { )); assert!(res.is_ok()); } + + #[test] + fn test_owned_sync() { + assert_eq!( + std::mem::variant_count::(), + std::mem::variant_count::>>(), + "DogStatsDActionOwned and DogStatsDAction should have the same number of variants, did you forget to update one?", + ); + } } From 226f7ab8f46be9e0d6c50e657a183683cd439f1d Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 13:40:07 -0400 Subject: [PATCH 29/35] add test that fails when dogstatsdAction is updated --- dogstatsd-client/src/lib.rs | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 2b2bcd5ed..272d753e4 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -208,10 +208,9 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result { #[cfg(test)] mod test { use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::{create_client, DogStatsDAction, DogStatsDActionOwned, Flusher}; + use crate::{create_client, DogStatsDActionOwned, Flusher}; #[cfg(unix)] use ddcommon::connector::uds::socket_path_to_uri; - use ddcommon::tag::Tag; use ddcommon::{tag, Endpoint}; #[cfg(unix)] use http::Uri; @@ -290,10 +289,32 @@ mod test { #[test] fn test_owned_sync() { - assert_eq!( - std::mem::variant_count::(), - std::mem::variant_count::>>(), - "DogStatsDActionOwned and DogStatsDAction should have the same number of variants, did you forget to update one?", - ); + // This test ensures that if a new variant is added to either `DogStatsDActionOwned` or + // `DogStatsDAction` this test will NOT COMPILE to act as a reminder that BOTH locations + // must be updated. + let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]); + match owned_act { + DogStatsDActionOwned::Count(_, _, _) => {} + DogStatsDActionOwned::Distribution(_, _, _) => {} + DogStatsDActionOwned::Gauge(_, _, _) => {} + DogStatsDActionOwned::Histogram(_, _, _) => {} + DogStatsDActionOwned::Set(_, _, _) => {} + } + + let act = Count("test".to_string(), 1, vec![]); + match act { + Count(_, _, _) => {} + Distribution(_, _, _) => {} + Gauge(_, _, _) => {} + Histogram(_, _, _) => {} + Set(_, _, _) => {} + } + + // TODO: when std::mem::variant_count is in stable we can do this instead + // assert_eq!( + // std::mem::variant_count::(), + // std::mem::variant_count::>>(), + // "DogStatsDActionOwned and DogStatsDAction should have the same number of variants, + // did you forget to update one?", ); } } From e4027aa1470013472fd0a7673c89e061779d0f6b Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 15:08:24 -0400 Subject: [PATCH 30/35] fix tests --- data-pipeline/src/trace_exporter.rs | 13 ++++--------- dogstatsd-client/src/lib.rs | 1 - 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index d3789a63d..779c65953 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -200,7 +200,7 @@ impl TraceExporter { self.emit_metric( HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), Some(vec![ - Tag::new("response_code", response_status.as_str()).unwrap() + &Tag::new("response_code", response_status.as_str()).unwrap() ]), ); anyhow::bail!("Agent did not accept traces: {response_body}"); @@ -235,18 +235,13 @@ impl TraceExporter { } /// Emit a health metric to dogstatsd - fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { + fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { if let Some(flusher) = &self.dogstatsd { if custom_tags.is_some() { match metric { HealthMetric::Count(name, c) => { - let tags = self - .common_stats_tags - .clone() // TODO: how the heck do we get around needing a clone here... :( - .into_iter() - .chain(custom_tags.unwrap()) - .collect::>(); - flusher.send(vec![DogStatsDAction::Count(name, c, &tags)]) + let tags = self.common_stats_tags.iter().chain(custom_tags.unwrap()); + flusher.send(vec![DogStatsDAction::Count(name, c, tags)]) } } } else { diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 272d753e4..9728713a2 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -1,6 +1,5 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -#![feature(variant_count)] use ddcommon::tag::Tag; use ddcommon::Endpoint; From 7945997da4ea5601968361675a4fa4f7e0231ebd Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 15:54:15 -0400 Subject: [PATCH 31/35] use either to simplify emit_metric code --- Cargo.lock | 5 +++-- data-pipeline/Cargo.toml | 1 + data-pipeline/src/trace_exporter.rs | 24 +++++++++--------------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d109dac17..a54639c24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1346,6 +1346,7 @@ dependencies = [ "datadog-trace-utils", "ddcommon", "dogstatsd-client", + "either", "futures", "httpmock", "hyper 0.14.28", @@ -2037,9 +2038,9 @@ dependencies = [ [[package]] name = "either" -version = "1.10.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "ena" diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index a3a9244ed..b9c927fdd 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -17,6 +17,7 @@ log = "0.4" rmp-serde = "1.1.1" bytes = "1.4" tokio = {version = "1.23", features = ["rt"], default-features = false} +either = "1.13.0" ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 779c65953..c169e3a9a 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -14,6 +14,7 @@ use hyper::{http, Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; +use either::Either; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. @@ -236,23 +237,16 @@ impl TraceExporter { /// Emit a health metric to dogstatsd fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { + let tags = match custom_tags { + None => Either::Left(&self.common_stats_tags), + Some(custom) => Either::Right(self.common_stats_tags.iter().chain(custom)), + }; if let Some(flusher) = &self.dogstatsd { - if custom_tags.is_some() { - match metric { - HealthMetric::Count(name, c) => { - let tags = self.common_stats_tags.iter().chain(custom_tags.unwrap()); - flusher.send(vec![DogStatsDAction::Count(name, c, tags)]) - } + match metric { + HealthMetric::Count(name, c) => { + flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())]) } - } else { - match metric { - HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count( - name, - c, - &self.common_stats_tags, - )]), - } - }; + } } } From 29a5f74d4d5a4c2713a325bb268236deb5ba700f Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Wed, 18 Sep 2024 15:58:06 -0400 Subject: [PATCH 32/35] resolve todo --- data-pipeline/src/trace_exporter.rs | 2 +- dogstatsd-client/src/lib.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index c169e3a9a..63cad841c 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -9,12 +9,12 @@ use datadog_trace_utils::tracer_payload::TraceEncoding; use ddcommon::tag::Tag; use ddcommon::{connector, tag, Endpoint}; use dogstatsd_client::{new_flusher, DogStatsDAction, Flusher}; +use either::Either; use hyper::http::uri::PathAndQuery; use hyper::{http, Body, Client, Method, Uri}; use log::error; use std::{borrow::Borrow, collections::HashMap, str::FromStr}; use tokio::runtime::Runtime; -use either::Either; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs index 9728713a2..8c0d233de 100644 --- a/dogstatsd-client/src/lib.rs +++ b/dogstatsd-client/src/lib.rs @@ -22,8 +22,12 @@ use std::os::unix::net::UnixDatagram; const QUEUE_SIZE: usize = 32 * 1024; /// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD -/// server. This type takes ownership of the relevant data to support the sidecar better -/// TODO: writeup why combining these types is FRAUGHT +/// server. This type takes ownership of the relevant data to support the sidecar better. +/// +/// Originally I attempted to combine this type with `DogStatsDAction` but this GREATLY complicates +/// the types to the point of insanity. I was unable to come up with a satisfactory approach that +/// allows both the data-pipeline and sidecar crates to use the same type. If a future rustacean +/// wants to take a stab and open a PR please do so! #[derive(Debug, Serialize, Deserialize)] pub enum DogStatsDActionOwned { Count(String, i64, Vec), @@ -36,8 +40,6 @@ pub enum DogStatsDActionOwned { Set(String, i64, Vec), } -// TODO: is there a way to make sure both of these stay in sync easily? - /// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. #[derive(Debug, Serialize, Deserialize)] pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { From a4a14617df2a4f2604221be07b694cf2ee96bb3c Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 19 Sep 2024 11:33:06 +0200 Subject: [PATCH 33/35] Increase iterations --- trace-obfuscation/benches/benchmarks/credit_cards_bench.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs index 1c6b27bf9..612e6f41d 100644 --- a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs +++ b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs @@ -20,7 +20,7 @@ fn is_card_number_no_luhn_bench(c: &mut Criterion) { fn bench_is_card_number(c: &mut Criterion, function_name: &str, validate_luhn: bool) { let mut group = c.benchmark_group("credit_card"); // Measure over a number of calls to minimize impact of OS noise - let elements = 1000; + let elements = 100000; group.throughput(Elements(elements)); // We only need to measure for a small time since the function is very fast group.warm_up_time(Duration::from_secs(1)); From cb66a836ed555f53a2ac4e3efe1d82715e8612c2 Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 19 Sep 2024 11:54:23 +0200 Subject: [PATCH 34/35] Increase samples --- trace-obfuscation/benches/benchmarks/credit_cards_bench.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs index 612e6f41d..550a22d6a 100644 --- a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs +++ b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs @@ -26,7 +26,7 @@ fn bench_is_card_number(c: &mut Criterion, function_name: &str, validate_luhn: b group.warm_up_time(Duration::from_secs(1)); group.measurement_time(Duration::from_secs(2)); group.sampling_mode(criterion::SamplingMode::Flat); - group.sample_size(200); + group.sample_size(2000); let ccs = [ "378282246310005", " 378282246310005", From 7f4735c9ac6edaebbc86c9102d8dde8c64780b1f Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 19 Sep 2024 14:00:07 +0200 Subject: [PATCH 35/35] Revert 1 commits cb66a83 'Increase samples' --- trace-obfuscation/benches/benchmarks/credit_cards_bench.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs index 550a22d6a..612e6f41d 100644 --- a/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs +++ b/trace-obfuscation/benches/benchmarks/credit_cards_bench.rs @@ -26,7 +26,7 @@ fn bench_is_card_number(c: &mut Criterion, function_name: &str, validate_luhn: b group.warm_up_time(Duration::from_secs(1)); group.measurement_time(Duration::from_secs(2)); group.sampling_mode(criterion::SamplingMode::Flat); - group.sample_size(2000); + group.sample_size(200); let ccs = [ "378282246310005", " 378282246310005",