diff --git a/.codecov.yml b/.codecov.yml index 25fe4e459..7690ca044 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -31,6 +31,10 @@ component_management: name: ddtelemetry # this is a display name, and can be changed freely paths: - ddtelemetry + - component_id: dogstatsd-client # this is an identifier that should not be changed + name: dogstatsd-client # this is a display name, and can be changed freely + paths: + - dogstatsd-client - component_id: ipc # this is an identifier that should not be changed name: ipc # this is a display name, and can be changed freely paths: diff --git a/Cargo.lock b/Cargo.lock index ff32eb484..a479b7a94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1346,6 +1346,8 @@ dependencies = [ "datadog-trace-protobuf", "datadog-trace-utils", "ddcommon", + "dogstatsd-client", + "either", "futures", "httpmock", "hyper 0.14.28", @@ -1620,6 +1622,7 @@ dependencies = [ "datadog-trace-utils", "ddcommon", "ddtelemetry", + "dogstatsd-client", "futures", "hashbrown 0.12.3", "http 0.2.12", @@ -1671,6 +1674,7 @@ dependencies = [ "ddcommon-ffi", "ddtelemetry", "ddtelemetry-ffi", + "dogstatsd-client", "hyper 0.14.28", "libc", "paste", @@ -1995,6 +1999,22 @@ dependencies = [ "ustr", ] +[[package]] +name = "dogstatsd-client" +version = "13.0.0" +dependencies = [ + "anyhow", + "cadence", + "datadog-ddsketch", + "datadog-trace-normalization", + "datadog-trace-protobuf", + "ddcommon", + "http 0.2.12", + "hyper 0.14.28", + "serde", + "tracing", +] + [[package]] name = "dunce" version = "1.0.4" @@ -2025,9 +2045,9 @@ dependencies = [ [[package]] name = "either" -version = "1.10.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "ena" diff --git a/Cargo.toml b/Cargo.toml index a44b5f4d9..8f7d7687f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "data-pipeline-ffi", "ddsketch", "tinybytes", + "dogstatsd-client", ] # https://doc.rust-lang.org/cargo/reference/resolver.html#feature-resolver-version-2 diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 08be29e5f..75940d939 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-obfuscation, datadog-trace-utils, ddcommon, tinybytes, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent +root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-obfuscation, datadog-trace-utils, ddcommon, tinybytes, dogstatsd-client, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent third_party_libraries: - package_name: addr2line package_version: 0.21.0 @@ -10338,7 +10338,7 @@ third_party_libraries: OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - package_name: either - package_version: 1.10.0 + package_version: 1.13.0 repository: https://github.com/rayon-rs/either license: MIT OR Apache-2.0 licenses: diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index df5825917..8f39c853d 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -16,14 +16,16 @@ hyper = {version = "0.14", features = ["client"], default-features = false} log = "0.4" rmp-serde = "1.1.1" bytes = "1.4" +either = "1.13.0" tokio = { version = "1.23", features = ["rt", "test-util", "time"], default-features = false } ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } +datadog-ddsketch = { path = "../ddsketch"} +dogstatsd-client = { path = "../dogstatsd-client"} datadog-trace-obfuscation = { path = "../trace-obfuscation" } -datadog-ddsketch = { path = "../ddsketch" } uuid = { version = "1.10.0", features = ["v4"] } tokio-util = "0.7.11" diff --git a/data-pipeline/src/health_metrics.rs b/data-pipeline/src/health_metrics.rs new file mode 100644 index 000000000..12d34a86c --- /dev/null +++ b/data-pipeline/src/health_metrics.rs @@ -0,0 +1,14 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +/// health_metrics holds data to emit info about the health of the data-pipeline + +pub(crate) const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces"; +pub(crate) const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors"; +pub(crate) const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces"; +pub(crate) const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors"; +pub(crate) const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors"; + +pub(crate) enum HealthMetric { + Count(&'static str, i64), +} diff --git a/data-pipeline/src/lib.rs b/data-pipeline/src/lib.rs index 9886087e4..05db3236d 100644 --- a/data-pipeline/src/lib.rs +++ b/data-pipeline/src/lib.rs @@ -6,6 +6,7 @@ //! project at this state is to provide a basic API in order to test its viability and integration //! in different languages. +mod health_metrics; #[allow(missing_docs)] pub mod span_concentrator; #[allow(missing_docs)] diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 236bd8d25..48f667693 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -1,6 +1,9 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::{span_concentrator::SpanConcentrator, stats_exporter}; +use crate::{ + health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator, + stats_exporter, +}; use bytes::Bytes; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{ @@ -8,9 +11,12 @@ use datadog_trace_utils::trace_utils::{ }; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceCollection; -use ddcommon::{connector, Endpoint}; +use ddcommon::tag::Tag; +use ddcommon::{connector, tag, Endpoint}; +use dogstatsd_client::{new_flusher, Client, DogStatsDAction}; +use either::Either; use hyper::http::uri::PathAndQuery; -use hyper::{Body, Client, Method, Uri}; +use hyper::{Body, Method, Uri}; use log::error; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -18,6 +24,14 @@ use std::{borrow::Borrow, collections::HashMap, str::FromStr, time}; use tokio::{runtime::Runtime, task::JoinHandle}; use tokio_util::sync::CancellationToken; +macro_rules! emit_metric { + ($self:ident, $name:expr, $tags:expr) => { + if $self.dogstatsd.is_some() { + $self.emit_metric($name, $tags); + } + }; +} + /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. #[derive(Copy, Clone, Debug, Default, PartialEq)] @@ -184,6 +198,9 @@ pub struct TraceExporter { // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019 _response_callback: Option>, runtime: Runtime, + /// None if dogstatsd is disabled + dogstatsd: Option, + common_stats_tags: Vec, client_computed_top_level: bool, stats: StatsComputationStatus, } @@ -270,26 +287,60 @@ impl TraceExporter { .body(Body::from(Bytes::copy_from_slice(data))) .unwrap(); - match Client::builder() + match hyper::Client::builder() .build(connector::Connector::default()) .request(req) .await { Ok(response) => { - if response.status() != 200 { + let response_status = response.status(); + if !response_status.is_success() { let body_bytes = hyper::body::to_bytes(response.into_body()).await?; let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default(); + let resp_tag_res = &Tag::new("response_code", response_status.as_str()); + match resp_tag_res { + Ok(resp_tag) => { + emit_metric!(self, HealthMetric::Count( + health_metrics::STAT_SEND_TRACES_ERRORS, + 1, + ), + Some(vec![&resp_tag])); + } + Err(tag_err) => { + // This should really never happen as response_status is a + // `NonZeroU16`, but if the response status or tag requirements + // ever change in the future we still don't want to panic. + error!("Failed to serialize response_code to tag {}", tag_err) + } + } anyhow::bail!("Agent did not accept traces: {response_body}"); } match hyper::body::to_bytes(response.into_body()).await { - Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), + Ok(body) => { + self.emit_metric( + HealthMetric::Count( + health_metrics::STAT_SEND_TRACES, + trace_count as i64, + ), + None, + ); + Ok(String::from_utf8_lossy(&body).to_string()) + } Err(err) => { + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None, + ); anyhow::bail!("Error reading agent response body: {err}"); } } } - Err(err) => anyhow::bail!("Failed to send traces: {err}"), + Err(err) => { + emit_metric!(self, HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None); + anyhow::bail!("Failed to send traces: {err}") + } } }) .or_else(|err| { @@ -298,6 +349,21 @@ impl TraceExporter { }) } + /// Emit a health metric to dogstatsd + fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { + if let Some(flusher) = &self.dogstatsd { + let tags = match custom_tags { + None => Either::Left(&self.common_stats_tags), + Some(custom) => Either::Right(self.common_stats_tags.iter().chain(custom)), + }; + match metric { + HealthMetric::Count(name, c) => { + flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())]) + } + } + } + } + /// Add all spans from the given iterator into the stats concentrator /// # Panic /// Will panic if another thread panicked will holding the lock on `stats_concentrator` @@ -322,6 +388,10 @@ impl TraceExporter { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1), + None, + ); return Ok(String::from("{}")); } }; @@ -331,6 +401,11 @@ impl TraceExporter { return Ok(String::from("{}")); } + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64), + None, + ); + // Stats computation if let StatsComputationStatus::StatsEnabled { .. } = &self.stats { if !self.client_computed_top_level { @@ -347,19 +422,23 @@ impl TraceExporter { let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); match self.output_format { - TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces).map_or_else( - |err| { + TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces) + .map_err(|err| { error!("Error serializing traces: {err}"); - Ok(String::from("{}")) - }, - |res| { + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SER_TRACES_ERRORS, 1), + None, + ); + String::from("{}") + }) + .and_then(|res| { self.send_data_to_url( &res, traces.len(), self.output_format.add_path(&self.endpoint.url), ) - }, - ), + }), + TraceExporterOutputFormat::V07 => { let tracer_payload = trace_utils::collect_trace_chunks( TraceCollection::V07(traces), @@ -379,11 +458,19 @@ impl TraceExporter { Ok(body) => Ok(String::from_utf8_lossy(&body).to_string()), Err(err) => { error!("Error reading agent response body: {err}"); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None, + ); Ok(String::from("{}")) } }, Err(err) => { error!("Error sending traces: {err}"); + self.emit_metric( + HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), + None, + ); Ok(String::from("{}")) } } @@ -410,6 +497,7 @@ pub struct TraceExporterBuilder { input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, response_callback: Option>, + dogstatsd_url: Option, client_computed_stats: bool, client_computed_top_level: bool, @@ -428,6 +516,12 @@ impl TraceExporterBuilder { self } + /// Set the URL to communicate with a dogstatsd server + pub fn set_dogstatsd_url(mut self, url: &str) -> Self { + self.dogstatsd_url = Some(url.to_owned()); + self + } + pub fn set_hostname(mut self, hostname: &str) -> Self { hostname.clone_into(&mut self.hostname); self @@ -530,6 +624,12 @@ impl TraceExporterBuilder { .enable_all() .build()?; + let dogstatsd = self.dogstatsd_url.and_then(|u| { + new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return + // None + }); + + let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); let mut stats = StatsComputationStatus::StatsDisabled; // Proxy mode does not support stats @@ -594,6 +694,8 @@ impl TraceExporterBuilder { _response_callback: self.response_callback, client_computed_top_level: self.client_computed_top_level, runtime, + dogstatsd, + common_stats_tags: vec![libdatadog_version], stats, }) } @@ -611,7 +713,8 @@ mod tests { use httpmock::prelude::*; use httpmock::MockServer; use std::collections::HashMap; - use time::Duration; + use std::net; + use std::time::Duration; #[test] fn new() { @@ -923,4 +1026,105 @@ mod tests { mock_traces.assert(); } + + fn read(socket: &net::UdpSocket) -> String { + let mut buf = [0; 1_000]; + socket.recv(&mut buf).expect("No data"); + let datagram = String::from_utf8_lossy(buf.as_ref()); + datagram.trim_matches(char::from(0)).to_string() + } + + fn build_test_exporter(url: String, dogstatsd_url: String) -> TraceExporter { + TraceExporterBuilder::default() + .set_url(&url) + .set_dogstatsd_url(&dogstatsd_url) + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .build() + .unwrap() + } + + #[test] + #[cfg_attr(miri, ignore)] + fn health_metrics() { + let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); + let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); + + let fake_agent = MockServer::start(); + let _mock_traces = fake_agent.mock(|_, then| { + then.status(200) + .header("content-type", "application/json") + .body("{}"); + }); + + let exporter = build_test_exporter( + fake_agent.url("/v0.4/traces"), + stats_socket.local_addr().unwrap().to_string(), + ); + + let traces: Vec> = vec![ + vec![pb::Span { + name: "test".to_string(), + ..Default::default() + }], + vec![pb::Span { + name: "test2".to_string(), + ..Default::default() + }], + ]; + let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); + let _result = exporter.send(&bytes, 1).expect("failed to send trace"); + + assert_eq!( + &format!( + "datadog.libdatadog.deser_traces:2|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + assert_eq!( + &format!( + "datadog.libdatadog.send.traces:2|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn health_metrics_error() { + let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); + let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500))); + + let fake_agent = MockServer::start(); + let _mock_traces = fake_agent.mock(|_, then| { + then.status(400) + .header("content-type", "application/json") + .body("{}"); + }); + + let exporter = build_test_exporter( + fake_agent.url("/v0.4/traces"), + stats_socket.local_addr().unwrap().to_string(), + ); + + let traces: Vec> = vec![vec![pb::Span { + name: "test".to_string(), + ..Default::default() + }]]; + let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); + let _result = exporter.send(&bytes, 1).expect("failed to send trace"); + + assert_eq!( + &format!( + "datadog.libdatadog.deser_traces:1|c|#libdatadog_version:{}", + env!("CARGO_PKG_VERSION") + ), + &read(&stats_socket) + ); + assert_eq!(&format!("datadog.libdatadog.send.traces.errors:1|c|#libdatadog_version:{},response_code:400", env!("CARGO_PKG_VERSION")), &read(&stats_socket)); + } } diff --git a/dogstatsd-client/Cargo.toml b/dogstatsd-client/Cargo.toml new file mode 100644 index 000000000..84bc0914d --- /dev/null +++ b/dogstatsd-client/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "dogstatsd-client" +description = "A dogstatsd client for sending metrics to a dogstatsd server." +rust-version.workspace = true +edition.workspace = true +version.workspace = true +license.workspace = true + +[lib] +bench = false + +[dependencies] +ddcommon = { path = "../ddcommon" } +datadog-trace-protobuf = { path = "../trace-protobuf" } +datadog-trace-normalization = { path = "../trace-normalization" } +datadog-ddsketch = { path = "../ddsketch"} +cadence = "1.3.0" +serde = { version = "1.0", features = ["derive", "rc"] } +tracing = { version = "0.1", default-features = false } +anyhow = { version = "1.0" } +hyper = { version = "0.14", features = ["client"], default-features = false } +http = "0.2" diff --git a/dogstatsd-client/README.md b/dogstatsd-client/README.md new file mode 100644 index 000000000..730c4425b --- /dev/null +++ b/dogstatsd-client/README.md @@ -0,0 +1,5 @@ +# dogstatsd-client + +## Status +This client provides rust methods to interact with a dogstatsd server. It is mainly used in the sidecar and data-pipeline +crates, but should be capable of being used elsewhere. See the crate docs for usage details. \ No newline at end of file diff --git a/dogstatsd-client/src/lib.rs b/dogstatsd-client/src/lib.rs new file mode 100644 index 000000000..326a039a2 --- /dev/null +++ b/dogstatsd-client/src/lib.rs @@ -0,0 +1,331 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +#![deny(missing_docs)] + +//! dogstatsd-client implements a client to emit metrics to a dogstatsd server. +//! This is made use of in at least the data-pipeline and sidecar crates. + +use ddcommon::tag::Tag; +use ddcommon::Endpoint; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use tracing::{debug, error, info}; + +use anyhow::anyhow; +use cadence::prelude::*; +#[cfg(unix)] +use cadence::UnixMetricSink; +use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink}; +#[cfg(unix)] +use ddcommon::connector::uds::socket_path_from_uri; +use std::net::{ToSocketAddrs, UdpSocket}; +#[cfg(unix)] +use std::os::unix::net::UnixDatagram; + +// Queue with a maximum capacity of 32K elements +const QUEUE_SIZE: usize = 32 * 1024; + +/// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD +/// server. This type takes ownership of the relevant data to support the sidecar better. +/// For documentation on the dogstatsd metric types: https://docs.datadoghq.com/metrics/types/?tab=count#metric-types +/// +/// Originally I attempted to combine this type with `DogStatsDAction` but this GREATLY complicates +/// the types to the point of insanity. I was unable to come up with a satisfactory approach that +/// allows both the data-pipeline and sidecar crates to use the same type. If a future rustacean +/// wants to take a stab and open a PR please do so! +#[derive(Debug, Serialize, Deserialize)] +pub enum DogStatsDActionOwned { + #[allow(missing_docs)] + Count(String, i64, Vec), + #[allow(missing_docs)] + Distribution(String, f64, Vec), + #[allow(missing_docs)] + Gauge(String, f64, Vec), + #[allow(missing_docs)] + Histogram(String, f64, Vec), + /// Cadence only support i64 type as value + /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) + /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) + Set(String, i64, Vec), +} + +/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. +#[derive(Debug, Serialize, Deserialize)] +pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { + // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement + #[allow(missing_docs)] + Count(T, i64, V), + #[allow(missing_docs)] + Distribution(T, f64, V), + #[allow(missing_docs)] + Gauge(T, f64, V), + #[allow(missing_docs)] + Histogram(T, f64, V), + /// Cadence only support i64 type as value + /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) + /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) + Set(T, i64, V), +} + +/// A dogstatsd-client that flushes stats to a given endpoint. Use `new_flusher` to build one. +pub struct Client { + client: StatsdClient, +} + +/// Build a new flusher instance pointed at the provided endpoint. +/// Returns error if the provided endpoint is not valid. +pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result { + Ok(Client { + client: create_client(&endpoint)?, + }) +} + +impl Client { + /// Set the destination for dogstatsd metrics, if an API Key is provided the client is disabled + /// as dogstatsd is not allowed in agentless mode. Returns an error if the provided endpoint + /// is invalid. + pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> { + self.client = match endpoint.api_key { + Some(_) => { + info!("DogStatsD is not available in agentless mode"); + anyhow::bail!("DogStatsD is not available in agentless mode"); + } + None => { + debug!("Updating DogStatsD endpoint to {}", endpoint.url); + create_client(&endpoint)? + } + }; + Ok(()) + } + + /// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned" + /// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details. + pub fn send_owned(&self, actions: Vec) { + let client = &self.client; + + for action in actions { + if let Err(err) = match action { + DogStatsDActionOwned::Count(metric, value, tags) => { + do_send(client.count_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Distribution(metric, value, tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Gauge(metric, value, tags) => { + do_send(client.gauge_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Histogram(metric, value, tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), &tags) + } + DogStatsDActionOwned::Set(metric, value, tags) => { + do_send(client.set_with_tags(metric.as_ref(), value), &tags) + } + } { + error!("Error while sending metric: {}", err); + } + } + } + + /// Send a vector of DogStatsDAction, this is the same as `send_owned` except it only borrows + /// the provided values.See the docs for DogStatsDActionOwned for details. + pub fn send<'a, T: AsRef, V: IntoIterator>( + &self, + actions: Vec>, + ) { + let client = &self.client; + + for action in actions { + if let Err(err) = match action { + DogStatsDAction::Count(metric, value, tags) => { + let metric_builder = client.count_with_tags(metric.as_ref(), value); + do_send(metric_builder, tags) + } + DogStatsDAction::Distribution(metric, value, tags) => { + do_send(client.distribution_with_tags(metric.as_ref(), value), tags) + } + DogStatsDAction::Gauge(metric, value, tags) => { + do_send(client.gauge_with_tags(metric.as_ref(), value), tags) + } + DogStatsDAction::Histogram(metric, value, tags) => { + do_send(client.histogram_with_tags(metric.as_ref(), value), tags) + } + DogStatsDAction::Set(metric, value, tags) => { + do_send(client.set_with_tags(metric.as_ref(), value), tags) + } + } { + error!("Error while sending metric: {}", err); + } + } + } +} + +fn do_send<'m, 't, T, V: IntoIterator>( + mut builder: MetricBuilder<'m, '_, T>, + tags: V, +) -> anyhow::Result<()> +where + T: Metric + From, + 't: 'm, +{ + let mut tags_iter = tags.into_iter(); + let mut tag_opt = tags_iter.next(); + while tag_opt.is_some() { + builder = builder.with_tag_value(tag_opt.unwrap().as_ref()); + tag_opt = tags_iter.next(); + } + builder.try_send()?; + Ok(()) +} + +fn create_client(endpoint: &Endpoint) -> anyhow::Result { + match endpoint.url.scheme_str() { + #[cfg(unix)] + Some("unix") => { + let socket = UnixDatagram::unbound()?; + socket.set_nonblocking(true)?; + let sink = QueuingMetricSink::with_capacity( + UnixMetricSink::from(socket_path_from_uri(&endpoint.url)?, socket), + QUEUE_SIZE, + ); + + Ok(StatsdClient::from_sink("", sink)) + } + _ => { + let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?; + let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16(); + + let server_address = (host, port) + .to_socket_addrs()? + .next() + .ok_or(anyhow!("invalid address"))?; + + let socket = if server_address.is_ipv4() { + UdpSocket::bind("0.0.0.0:0")? + } else { + UdpSocket::bind("[::]:0")? + }; + socket.set_nonblocking(true)?; + + let sink = QueuingMetricSink::with_capacity( + UdpMetricSink::from((host, port), socket)?, + QUEUE_SIZE, + ); + + Ok(StatsdClient::from_sink("", sink)) + } + } +} + +#[cfg(test)] +mod test { + use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; + use crate::{create_client, new_flusher, DogStatsDActionOwned}; + #[cfg(unix)] + use ddcommon::connector::uds::socket_path_to_uri; + use ddcommon::{tag, Endpoint}; + #[cfg(unix)] + use http::Uri; + use std::net; + use std::time::Duration; + + #[test] + #[cfg_attr(miri, ignore)] + fn test_flusher() { + let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); + let _ = socket.set_read_timeout(Some(Duration::from_millis(500))); + + let flusher = new_flusher(Endpoint::from_slice( + socket.local_addr().unwrap().to_string().as_str(), + )) + .unwrap(); + flusher.send(vec![ + Count("test_count", 3, &vec![tag!("foo", "bar")]), + Count("test_neg_count", -2, &vec![]), + Distribution("test_distribution", 4.2, &vec![]), + Gauge("test_gauge", 7.6, &vec![]), + Histogram("test_histogram", 8.0, &vec![]), + Set("test_set", 9, &vec![tag!("the", "end")]), + Set("test_neg_set", -1, &vec![]), + ]); + + fn read(socket: &net::UdpSocket) -> String { + let mut buf = [0; 100]; + socket.recv(&mut buf).expect("No data"); + let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap()); + datagram.trim_matches(char::from(0)).to_string() + } + + assert_eq!("test_count:3|c|#foo:bar", read(&socket)); + assert_eq!("test_neg_count:-2|c", read(&socket)); + assert_eq!("test_distribution:4.2|d", read(&socket)); + assert_eq!("test_gauge:7.6|g", read(&socket)); + assert_eq!("test_histogram:8|h", read(&socket)); + assert_eq!("test_set:9|s|#the:end", read(&socket)); + assert_eq!("test_neg_set:-1|s", read(&socket)); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_create_client_udp() { + let res = create_client(&Endpoint::default()); + assert!(res.is_err()); + assert_eq!("invalid host", res.unwrap_err().to_string().as_str()); + + let res = create_client(&Endpoint::from_slice("localhost:99999")); + assert!(res.is_err()); + assert_eq!("invalid port", res.unwrap_err().to_string().as_str()); + + let res = create_client(&Endpoint::from_slice("localhost:80")); + assert!(res.is_ok()); + + let res = create_client(&Endpoint::from_slice("http://localhost:80")); + assert!(res.is_ok()); + } + + #[test] + #[cfg(unix)] + #[cfg_attr(miri, ignore)] + fn test_create_client_unix_domain_socket() { + let res = create_client(&Endpoint::from_url( + "unix://localhost:80".parse::().unwrap(), + )); + assert!(res.is_err()); + assert_eq!("invalid url", res.unwrap_err().to_string().as_str()); + + let res = create_client(&Endpoint::from_url( + socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(), + )); + assert!(res.is_ok()); + } + + #[test] + fn test_owned_sync() { + // This test ensures that if a new variant is added to either `DogStatsDActionOwned` or + // `DogStatsDAction` this test will NOT COMPILE to act as a reminder that BOTH locations + // must be updated. + let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]); + match owned_act { + DogStatsDActionOwned::Count(_, _, _) => {} + DogStatsDActionOwned::Distribution(_, _, _) => {} + DogStatsDActionOwned::Gauge(_, _, _) => {} + DogStatsDActionOwned::Histogram(_, _, _) => {} + DogStatsDActionOwned::Set(_, _, _) => {} + } + + let act = Count("test".to_string(), 1, vec![]); + match act { + Count(_, _, _) => {} + Distribution(_, _, _) => {} + Gauge(_, _, _) => {} + Histogram(_, _, _) => {} + Set(_, _, _) => {} + } + + // TODO: when std::mem::variant_count is in stable we can do this instead + // assert_eq!( + // std::mem::variant_count::(), + // std::mem::variant_count::>>(), + // "DogStatsDActionOwned and DogStatsDAction should have the same number of variants, + // did you forget to update one?", ); + } +} diff --git a/sidecar-ffi/Cargo.toml b/sidecar-ffi/Cargo.toml index 68cc0aeba..b0fa91da5 100644 --- a/sidecar-ffi/Cargo.toml +++ b/sidecar-ffi/Cargo.toml @@ -21,6 +21,7 @@ ddtelemetry-ffi = { path = "../ddtelemetry-ffi", default-features = false } datadog-remote-config = { path = "../remote-config" } paste = "1" libc = "0.2" +dogstatsd-client = { path = "../dogstatsd-client" } [dev-dependencies] hyper = { version = "0.14", default-features = false } diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index d786e95a5..895a1b9fb 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -12,7 +12,6 @@ use datadog_sidecar::agent_remote_config::{ use datadog_sidecar::config; use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; -use datadog_sidecar::dogstatsd::DogStatsDAction; use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, @@ -28,6 +27,7 @@ use ddtelemetry::{ worker::{LifecycleAction, TelemetryActions}, }; use ddtelemetry_ffi::try_c; +use dogstatsd_client::DogStatsDActionOwned; use ffi::slice::AsBytes; use libc::c_char; use std::ffi::c_void; @@ -689,7 +689,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_count( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Count( + vec![DogStatsDActionOwned::Count( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -713,7 +713,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_distribution( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Distribution( + vec![DogStatsDActionOwned::Distribution( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -737,7 +737,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_gauge( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Gauge( + vec![DogStatsDActionOwned::Gauge( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -761,7 +761,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_histogram( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Histogram( + vec![DogStatsDActionOwned::Histogram( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) @@ -785,7 +785,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_set( try_c!(blocking::send_dogstatsd_actions( transport, instance_id, - vec![DogStatsDAction::Set( + vec![DogStatsDActionOwned::Set( metric.to_utf8_lossy().into_owned(), value, tags.map(|tags| tags.iter().cloned().collect()) diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index 05eded757..d60525e77 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -26,6 +26,7 @@ datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } datadog-remote-config = { path = "../remote-config" } datadog-crashtracker = { path = "../crashtracker" } +dogstatsd-client = { path = "../dogstatsd-client" } tinybytes = { path = "../tinybytes" } futures = { version = "0.3", default-features = false } diff --git a/sidecar/src/dogstatsd.rs b/sidecar/src/dogstatsd.rs deleted file mode 100644 index b2b6777dd..000000000 --- a/sidecar/src/dogstatsd.rs +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use ddcommon::tag::Tag; -use ddcommon::Endpoint; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use tracing::{debug, error, info, warn}; - -use anyhow::anyhow; -use cadence::prelude::*; -#[cfg(unix)] -use cadence::UnixMetricSink; -use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink}; -#[cfg(unix)] -use ddcommon::connector::uds::socket_path_from_uri; -use std::net::{ToSocketAddrs, UdpSocket}; - -#[cfg(unix)] -use std::os::unix::net::UnixDatagram; - -// Queue with a maximum capacity of 32K elements -const QUEUE_SIZE: usize = 32 * 1024; - -/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. -#[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction { - Count(String, i64, Vec), - Distribution(String, f64, Vec), - Gauge(String, f64, Vec), - Histogram(String, f64, Vec), - // Cadence only support i64 type as value - // but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) - // and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) - Set(String, i64, Vec), -} - -#[derive(Default)] -pub struct Flusher { - pub endpoint: Option, - client: Option, -} - -impl Flusher { - pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> { - self.client = None; - self.endpoint = match endpoint.api_key { - Some(_) => { - info!("DogStatsD is not available in agentless mode"); - None - } - None => { - debug!("Updating DogStatsD endpoint to {}", endpoint.url); - Some(endpoint) - } - }; - Ok(()) - } - - pub fn send(&mut self, actions: Vec) { - if self.endpoint.is_none() { - return; - } - - let client = match self.get_client() { - Ok(client) => client, - Err(msg) => { - self.endpoint = None; - warn!("Cannot send DogStatsD metrics: {}", msg); - return; - } - }; - - for action in actions { - if let Err(err) = match action { - DogStatsDAction::Count(metric, value, tags) => { - do_send(client.count_with_tags(metric.as_str(), value), &tags) - } - DogStatsDAction::Distribution(metric, value, tags) => { - do_send(client.distribution_with_tags(metric.as_str(), value), &tags) - } - DogStatsDAction::Gauge(metric, value, tags) => { - do_send(client.gauge_with_tags(metric.as_str(), value), &tags) - } - DogStatsDAction::Histogram(metric, value, tags) => { - do_send(client.histogram_with_tags(metric.as_str(), value), &tags) - } - DogStatsDAction::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_str(), value), &tags) - } - } { - error!("Error while sending metric: {}", err); - } - } - } - - fn get_client(&mut self) -> anyhow::Result<&StatsdClient> { - let opt = &mut self.client; - let client = match opt { - Some(client) => client, - None => opt.get_or_insert(create_client(self.endpoint.clone())?), - }; - - Ok(client) - } -} - -fn do_send<'a, T>(mut builder: MetricBuilder<'a, '_, T>, tags: &'a Vec) -> anyhow::Result<()> -where - T: Metric + From, -{ - for tag in tags { - builder = builder.with_tag_value(tag.as_ref()); - } - builder.try_send()?; - Ok(()) -} - -fn create_client(endpoint: Option) -> anyhow::Result { - let endpoint = match endpoint { - Some(endpoint) => endpoint, - None => return Err(anyhow!("no endpoint set")), - }; - - return match endpoint.url.scheme_str() { - #[cfg(unix)] - Some("unix") => { - let socket = UnixDatagram::unbound()?; - socket.set_nonblocking(true)?; - let sink = QueuingMetricSink::with_capacity( - UnixMetricSink::from(socket_path_from_uri(&endpoint.url)?, socket), - QUEUE_SIZE, - ); - - Ok(StatsdClient::from_sink("", sink)) - } - _ => { - let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?; - let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16(); - - let server_address = (host, port) - .to_socket_addrs()? - .next() - .ok_or(anyhow!("invalid address"))?; - - let socket = if server_address.is_ipv4() { - UdpSocket::bind("0.0.0.0:0")? - } else { - UdpSocket::bind("[::]:0")? - }; - socket.set_nonblocking(true)?; - - let sink = QueuingMetricSink::with_capacity( - UdpMetricSink::from((host, port), socket)?, - QUEUE_SIZE, - ); - - Ok(StatsdClient::from_sink("", sink)) - } - }; -} - -#[cfg(test)] -mod test { - use crate::dogstatsd::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::dogstatsd::{create_client, Flusher}; - #[cfg(unix)] - use ddcommon::connector::uds::socket_path_to_uri; - use ddcommon::{tag, Endpoint}; - #[cfg(unix)] - use http::Uri; - use std::net; - use std::time::Duration; - - #[test] - #[cfg_attr(miri, ignore)] - fn test_flusher() { - let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); - let _ = socket.set_read_timeout(Some(Duration::from_millis(500))); - - let mut flusher = Flusher::default(); - _ = flusher.set_endpoint(Endpoint::from_slice( - socket.local_addr().unwrap().to_string().as_str(), - )); - flusher.send(vec![ - Count("test_count".to_string(), 3, vec![tag!("foo", "bar")]), - Count("test_neg_count".to_string(), -2, vec![]), - Distribution("test_distribution".to_string(), 4.2, vec![]), - Gauge("test_gauge".to_string(), 7.6, vec![]), - Histogram("test_histogram".to_string(), 8.0, vec![]), - Set("test_set".to_string(), 9, vec![tag!("the", "end")]), - Set("test_neg_set".to_string(), -1, vec![]), - ]); - - fn read(socket: &net::UdpSocket) -> String { - let mut buf = [0; 100]; - socket.recv(&mut buf).expect("No data"); - let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap()); - datagram.trim_matches(char::from(0)).to_string() - } - - assert_eq!("test_count:3|c|#foo:bar", read(&socket)); - assert_eq!("test_neg_count:-2|c", read(&socket)); - assert_eq!("test_distribution:4.2|d", read(&socket)); - assert_eq!("test_gauge:7.6|g", read(&socket)); - assert_eq!("test_histogram:8|h", read(&socket)); - assert_eq!("test_set:9|s|#the:end", read(&socket)); - assert_eq!("test_neg_set:-1|s", read(&socket)); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_create_client_udp() { - let res = create_client(None); - assert!(res.is_err()); - assert_eq!("no endpoint set", res.unwrap_err().to_string().as_str()); - - let res = create_client(Some(Endpoint::default())); - assert!(res.is_err()); - assert_eq!("invalid host", res.unwrap_err().to_string().as_str()); - - let res = create_client(Some(Endpoint::from_slice("localhost:99999"))); - assert!(res.is_err()); - assert_eq!("invalid port", res.unwrap_err().to_string().as_str()); - - let res = create_client(Some(Endpoint::from_slice("localhost:80"))); - assert!(res.is_ok()); - - let res = create_client(Some(Endpoint::from_slice("http://localhost:80"))); - assert!(res.is_ok()); - } - - #[test] - #[cfg(unix)] - #[cfg_attr(miri, ignore)] - fn test_create_client_unix_domain_socket() { - let res = create_client(Some(Endpoint::from_url( - "unix://localhost:80".parse::().unwrap(), - ))); - assert!(res.is_err()); - assert_eq!("invalid url", res.unwrap_err().to_string().as_str()); - - let res = create_client(Some(Endpoint::from_url( - socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(), - ))); - assert!(res.is_ok()); - } -} diff --git a/sidecar/src/lib.rs b/sidecar/src/lib.rs index fe2d8ba15..62349a761 100644 --- a/sidecar/src/lib.rs +++ b/sidecar/src/lib.rs @@ -3,7 +3,6 @@ pub mod agent_remote_config; pub mod config; pub mod crashtracker; -pub mod dogstatsd; mod dump; pub mod entry; #[cfg(feature = "tracing")] diff --git a/sidecar/src/service/blocking.rs b/sidecar/src/service/blocking.rs index f1c057689..cd48d87be 100644 --- a/sidecar/src/service/blocking.rs +++ b/sidecar/src/service/blocking.rs @@ -5,9 +5,9 @@ use super::{ InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarInterfaceRequest, SidecarInterfaceResponse, }; -use crate::dogstatsd::DogStatsDAction; use datadog_ipc::platform::{Channel, ShmHandle}; use datadog_ipc::transport::blocking::BlockingTransport; +use dogstatsd_client::DogStatsDActionOwned; use std::sync::Mutex; use std::{ borrow::Cow, @@ -318,7 +318,7 @@ pub fn set_remote_config_data( pub fn send_dogstatsd_actions( transport: &mut SidecarTransport, instance_id: &InstanceId, - actions: Vec, + actions: Vec, ) -> io::Result<()> { transport.send(SidecarInterfaceRequest::SendDogstatsdActions { instance_id: instance_id.clone(), diff --git a/sidecar/src/service/session_info.rs b/sidecar/src/service/session_info.rs index 7acc3e816..f060b1207 100644 --- a/sidecar/src/service/session_info.rs +++ b/sidecar/src/service/session_info.rs @@ -12,7 +12,7 @@ use futures::future; use tracing::{enabled, info, Level}; use crate::log::{MultiEnvFilterGuard, MultiWriterGuard}; -use crate::{dogstatsd, tracer}; +use crate::tracer; use crate::service::{InstanceId, RuntimeInfo}; /// `SessionInfo` holds information about a session. @@ -24,7 +24,7 @@ pub(crate) struct SessionInfo { runtimes: Arc>>, pub(crate) session_config: Arc>>, tracer_config: Arc>, - dogstatsd: Arc>, + dogstatsd: Arc>>, remote_config_invariants: Arc>>, #[cfg(windows)] pub(crate) remote_config_notify_function: @@ -168,13 +168,13 @@ impl SessionInfo { f(&mut self.get_trace_config()); } - pub(crate) fn get_dogstatsd(&self) -> MutexGuard { + pub(crate) fn get_dogstatsd(&self) -> MutexGuard> { self.dogstatsd.lock().unwrap() } pub(crate) fn configure_dogstatsd(&self, f: F) where - F: FnOnce(&mut dogstatsd::Flusher), + F: FnOnce(&mut Option), { f(&mut self.get_dogstatsd()); } diff --git a/sidecar/src/service/sidecar_interface.rs b/sidecar/src/service/sidecar_interface.rs index eb8b2fbc9..dc198d67d 100644 --- a/sidecar/src/service/sidecar_interface.rs +++ b/sidecar/src/service/sidecar_interface.rs @@ -1,7 +1,6 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::dogstatsd::DogStatsDAction; use crate::service::{ InstanceId, QueueId, RequestIdentification, RequestIdentifier, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, @@ -9,6 +8,7 @@ use crate::service::{ use anyhow::Result; use datadog_ipc::platform::ShmHandle; use datadog_ipc::tarpc; +use dogstatsd_client::DogStatsDActionOwned; // This is a bit weird, but depending on the OS we're interested in different things... // and the macro expansion is not going to be happy with #[cfg()] instructions inside them. @@ -133,7 +133,7 @@ pub trait SidecarInterface { /// /// * `instance_id` - The ID of the instance. /// * `actions` - The DogStatsD actions to send. - async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); + async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); /// Flushes any outstanding traces queued for sending. async fn flush_traces(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index e7ed49f13..88cd38275 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -38,7 +38,6 @@ use serde::{Deserialize, Serialize}; use tokio::task::{JoinError, JoinHandle}; use crate::config::get_product_endpoint; -use crate::dogstatsd::DogStatsDAction; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::runtime_info::ActiveApplication; use crate::service::telemetry::enqueued_telemetry_stats::EnqueuedTelemetryStats; @@ -47,6 +46,7 @@ use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; +use dogstatsd_client::{new_flusher, DogStatsDActionOwned}; use tinybytes; type NoResponse = Ready<()>; @@ -670,9 +670,8 @@ impl SidecarInterface for SidecarServer { cfg.set_endpoint(endpoint).ok(); }); session.configure_dogstatsd(|dogstatsd| { - dogstatsd - .set_endpoint(config.dogstatsd_endpoint.clone()) - .ok(); + let d = new_flusher(config.dogstatsd_endpoint.clone()).ok(); + *dogstatsd = d; }); session.set_remote_config_invariants(ConfigInvariants { language: config.language, @@ -848,12 +847,13 @@ impl SidecarInterface for SidecarServer { self, _: Context, instance_id: InstanceId, - actions: Vec, + actions: Vec, ) -> Self::SendDogstatsdActionsFut { tokio::spawn(async move { self.get_session(&instance_id.session_id) .get_dogstatsd() - .send(actions); + .as_ref() + .inspect(|f| f.send_owned(actions)); }); no_response() @@ -901,9 +901,10 @@ impl SidecarInterface for SidecarServer { session.modify_trace_config(|cfg| { update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); }); - session.configure_dogstatsd(|cfg| { - update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); - }); + // TODO(APMSP-1377): the dogstatsd-client doesn't support test_session tokens yet + // session.configure_dogstatsd(|cfg| { + // update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); + // }); no_response() } diff --git a/tools/docker/Dockerfile.build b/tools/docker/Dockerfile.build index 13cc2cfac..01b4ae2d9 100644 --- a/tools/docker/Dockerfile.build +++ b/tools/docker/Dockerfile.build @@ -78,6 +78,7 @@ COPY "ddtelemetry/Cargo.toml" "ddtelemetry/" COPY "ddtelemetry-ffi/Cargo.toml" "ddtelemetry-ffi/" COPY "ddsketch/Cargo.toml" "ddsketch/" COPY "dogstatsd/Cargo.toml" "dogstatsd/" +COPY "dogstatsd-client/Cargo.toml" "dogstatsd-client/" COPY "dynamic-configuration/Cargo.toml" "dynamic-configuration/" COPY "profiling/Cargo.toml" "profiling/" COPY "profiling-ffi/Cargo.toml" "profiling-ffi/"