Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgajg1134 committed Sep 20, 2024
1 parent ea71062 commit 9e78eef
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ default-members = [
"data-pipeline-ffi",
"ddsketch",
"tinybytes",
"dogstatsd-client",
]

# https://doc.rust-lang.org/cargo/reference/resolver.html#feature-resolver-version-2
Expand Down
11 changes: 11 additions & 0 deletions data-pipeline/src/health_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/// health_metrics holds data to emit info about the health of the data-pipeline

pub(crate) const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces";
pub(crate) const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors";
pub(crate) const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces";
pub(crate) const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors";
pub(crate) const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors";

pub(crate) enum HealthMetric {
Count(&'static str, i64),
}
1 change: 1 addition & 0 deletions data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! project at this state is to provide a basic API in order to test its viability and integration
//! in different languages.

mod health_metrics;
#[allow(missing_docs)]
pub mod span_concentrator;
#[allow(missing_docs)]
Expand Down
76 changes: 47 additions & 29 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use crate::{span_concentrator::SpanConcentrator, stats_exporter};
use crate::{
health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator,
stats_exporter,
};
use bytes::Bytes;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::{
Expand Down Expand Up @@ -46,13 +49,6 @@ pub enum TraceExporterOutputFormat {
V07,
}

// internal health metrics
const STAT_SEND_TRACES: &str = "datadog.libdatadog.send.traces";
const STAT_SEND_TRACES_ERRORS: &str = "datadog.libdatadog.send.traces.errors";
const STAT_DESER_TRACES: &str = "datadog.libdatadog.deser_traces";
const STAT_DESER_TRACES_ERRORS: &str = "datadog.libdatadog.deser_traces.errors";
const STAT_SER_TRACES_ERRORS: &str = "datadog.libdatadog.ser_traces.errors";

impl TraceExporterOutputFormat {
/// Add the agent trace endpoint path to the URL.
fn add_path(&self, url: &Uri) -> Uri {
Expand Down Expand Up @@ -158,11 +154,6 @@ impl<'a> From<&'a TracerTags> for HashMap<&'static str, String> {
}
}

enum HealthMetric {
Count(&'static str, i64),
//TODO: Add more DogStatsDAction as we need them
}

enum StatsComputationStatus {
StatsDisabled,
StatsEnabled {
Expand Down Expand Up @@ -199,7 +190,7 @@ pub struct TraceExporter {
// TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019
_response_callback: Option<Box<dyn ResponseCallback>>,
runtime: Runtime,
// None if dogstatsd is disabled
/// None if dogstatsd is disabled
dogstatsd: Option<Flusher>,
common_stats_tags: Vec<Tag>,
client_computed_top_level: bool,
Expand Down Expand Up @@ -295,37 +286,55 @@ impl TraceExporter {
{
Ok(response) => {
let response_status = response.status();
if response_status != http::StatusCode::OK {
if !response_status.is_success() {
let body_bytes = hyper::body::to_bytes(response.into_body()).await?;
let response_body =
String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
self.emit_metric(
HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1),
Some(vec![
&Tag::new("response_code", response_status.as_str()).unwrap()
]),
);
let resp_tag_res = &Tag::new("response_code", response_status.as_str());
match resp_tag_res {
Ok(resp_tag) => {
self.emit_metric(
HealthMetric::Count(
health_metrics::STAT_SEND_TRACES_ERRORS,
1,
),
Some(vec![&resp_tag]),
);
}
Err(tag_err) => {
// This should really never happen as response_status is a
// `NonZeroU16`, but if the response status or tag requirements
// ever change in the future we still don't want to panic.
error!("Failed to serialize response_code to tag {}", tag_err)
}
}
anyhow::bail!("Agent did not accept traces: {response_body}");
}
match hyper::body::to_bytes(response.into_body()).await {
Ok(body) => {
self.emit_metric(
HealthMetric::Count(STAT_SEND_TRACES, trace_count as i64),
HealthMetric::Count(
health_metrics::STAT_SEND_TRACES,
trace_count as i64,
),
None,
);
Ok(String::from_utf8_lossy(&body).to_string())
}
Err(err) => {
self.emit_metric(
HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1),
HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1),
None,
);
anyhow::bail!("Error reading agent response body: {err}");
}
}
}
Err(err) => {
self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None);
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1),
None,
);
anyhow::bail!("Failed to send traces: {err}")
}
}
Expand Down Expand Up @@ -375,7 +384,10 @@ impl TraceExporter {
Ok(res) => res,
Err(err) => {
error!("Error deserializing trace from request body: {err}");
self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES_ERRORS, 1), None);
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1),
None,
);
return Ok(String::from("{}"));
}
};
Expand All @@ -386,7 +398,7 @@ impl TraceExporter {
}

self.emit_metric(
HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64),
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64),
None,
);

Expand All @@ -409,7 +421,10 @@ impl TraceExporter {
TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces)
.map_err(|err| {
error!("Error serializing traces: {err}");
self.emit_metric(HealthMetric::Count(STAT_SER_TRACES_ERRORS, 1), None);
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_SER_TRACES_ERRORS, 1),
None,
);
String::from("{}")
})
.and_then(|res| {
Expand Down Expand Up @@ -440,15 +455,18 @@ impl TraceExporter {
Err(err) => {
error!("Error reading agent response body: {err}");
self.emit_metric(
HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1),
HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1),
None,
);
Ok(String::from("{}"))
}
},
Err(err) => {
error!("Error sending traces: {err}");
self.emit_metric(HealthMetric::Count(STAT_SEND_TRACES_ERRORS, 1), None);
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1),
None,
);
Ok(String::from("{}"))
}
}
Expand Down
35 changes: 13 additions & 22 deletions dogstatsd-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,17 @@ pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
Set(T, i64, V),
}

/// A dogstatsd-client that flushes stats to a given endpoint.
/// The default value has no address and is thus disabled, use `new_flusher` or `set_endpoint` to
/// configure an endpoint.
#[derive(Default)]
/// A dogstatsd-client that flushes stats to a given endpoint. Use `new_flusher` to build one.
pub struct Flusher {
client: Option<StatsdClient>,
client: StatsdClient,
}

/// Build a new flusher instance pointed at the provided endpoint.
/// Returns error if the provided endpoint is not valid.
pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result<Flusher> {
let mut f = Flusher::default();
f.set_endpoint(endpoint)?;
Ok(f)
Ok(Flusher {
client: create_client(&endpoint)?,
})
}

impl Flusher {
Expand All @@ -91,11 +88,11 @@ impl Flusher {
self.client = match endpoint.api_key {
Some(_) => {
info!("DogStatsD is not available in agentless mode");
None
anyhow::bail!("DogStatsD is not available in agentless mode");
}
None => {
debug!("Updating DogStatsD endpoint to {}", endpoint.url);
Some(create_client(&endpoint)?)
create_client(&endpoint)?
}
};
Ok(())
Expand All @@ -104,10 +101,7 @@ impl Flusher {
/// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned"
/// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details.
pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
if self.client.is_none() {
return;
}
let client = self.client.as_ref().unwrap();
let client = &self.client;

for action in actions {
if let Err(err) = match action {
Expand Down Expand Up @@ -138,10 +132,7 @@ impl Flusher {
&self,
actions: Vec<DogStatsDAction<'a, T, V>>,
) {
if self.client.is_none() {
return;
}
let client = self.client.as_ref().unwrap();
let client = &self.client;

for action in actions {
if let Err(err) = match action {
Expand Down Expand Up @@ -228,7 +219,7 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
#[cfg(test)]
mod test {
use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
use crate::{create_client, DogStatsDActionOwned, Flusher};
use crate::{create_client, new_flusher, DogStatsDActionOwned};
#[cfg(unix)]
use ddcommon::connector::uds::socket_path_to_uri;
use ddcommon::{tag, Endpoint};
Expand All @@ -243,10 +234,10 @@ mod test {
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));

let mut flusher = Flusher::default();
_ = flusher.set_endpoint(Endpoint::from_slice(
let flusher = new_flusher(Endpoint::from_slice(
socket.local_addr().unwrap().to_string().as_str(),
));
))
.unwrap();
flusher.send(vec![
Count("test_count", 3, &vec![tag!("foo", "bar")]),
Count("test_neg_count", -2, &vec![]),
Expand Down
6 changes: 3 additions & 3 deletions sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) struct SessionInfo {
runtimes: Arc<Mutex<HashMap<String, RuntimeInfo>>>,
pub(crate) session_config: Arc<Mutex<Option<ddtelemetry::config::Config>>>,
tracer_config: Arc<Mutex<tracer::Config>>,
dogstatsd: Arc<Mutex<dogstatsd_client::Flusher>>,
dogstatsd: Arc<Mutex<Option<dogstatsd_client::Flusher>>>,
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
#[cfg(windows)]
pub(crate) remote_config_notify_function:
Expand Down Expand Up @@ -168,13 +168,13 @@ impl SessionInfo {
f(&mut self.get_trace_config());
}

pub(crate) fn get_dogstatsd(&self) -> MutexGuard<dogstatsd_client::Flusher> {
pub(crate) fn get_dogstatsd(&self) -> MutexGuard<Option<dogstatsd_client::Flusher>> {
self.dogstatsd.lock().unwrap()
}

pub(crate) fn configure_dogstatsd<F>(&self, f: F)
where
F: FnOnce(&mut dogstatsd_client::Flusher),
F: FnOnce(&mut Option<dogstatsd_client::Flusher>),
{
f(&mut self.get_dogstatsd());
}
Expand Down
10 changes: 5 additions & 5 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use datadog_ipc::platform::FileBackedHandle;
use datadog_ipc::tarpc::server::{Channel, InFlightRequest};
use datadog_remote_config::fetch::ConfigInvariants;
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
use dogstatsd_client::DogStatsDActionOwned;
use dogstatsd_client::{new_flusher, DogStatsDActionOwned};
use tinybytes;

type NoResponse = Ready<()>;
Expand Down Expand Up @@ -670,9 +670,8 @@ impl SidecarInterface for SidecarServer {
cfg.set_endpoint(endpoint).ok();
});
session.configure_dogstatsd(|dogstatsd| {
dogstatsd
.set_endpoint(config.dogstatsd_endpoint.clone())
.ok();
let d = new_flusher(config.dogstatsd_endpoint.clone()).ok();
*dogstatsd = d;
});
session.set_remote_config_invariants(ConfigInvariants {
language: config.language,
Expand Down Expand Up @@ -853,7 +852,8 @@ impl SidecarInterface for SidecarServer {
tokio::spawn(async move {
self.get_session(&instance_id.session_id)
.get_dogstatsd()
.send_owned(actions);
.as_ref()
.inspect(|f| f.send_owned(actions));
});

no_response()
Expand Down

0 comments on commit 9e78eef

Please sign in to comment.