From 10b636a1964a381ec65c45353db16fa296426849 Mon Sep 17 00:00:00 2001 From: neuronull Date: Mon, 7 Nov 2022 11:38:17 -0700 Subject: [PATCH] fix(datadog_traces sink): APM stats payloads are sent independent of trace payloads and at a set interval. (#15084) - APM stats are now "flushed" in a separate thread, detached from the sink's stream loop. - The stats flushing thread flushes the oldest bucket every 10 seconds, and caches the last two 10 second buckets. - When sink is shutting down, the APM stats thread flushes remaining buckets. --- .../docker-compose.datadog-traces.yml | 10 +- src/internal_events/datadog_traces.rs | 24 + .../datadog/traces/apm_stats/aggregation.rs | 423 +++++++++++ src/sinks/datadog/traces/apm_stats/bucket.rs | 187 +++++ src/sinks/datadog/traces/apm_stats/flusher.rs | 176 +++++ .../traces/apm_stats/integration_tests.rs | 427 +++++++++++ src/sinks/datadog/traces/apm_stats/mod.rs | 123 +++ src/sinks/datadog/traces/apm_stats/weight.rs | 95 +++ src/sinks/datadog/traces/config.rs | 46 +- src/sinks/datadog/traces/integration_tests.rs | 432 +---------- src/sinks/datadog/traces/mod.rs | 2 +- src/sinks/datadog/traces/request_builder.rs | 156 ++-- src/sinks/datadog/traces/sink.rs | 19 +- src/sinks/datadog/traces/stats.rs | 699 ------------------ src/sinks/datadog/traces/tests.rs | 7 +- 15 files changed, 1591 insertions(+), 1235 deletions(-) create mode 100644 src/sinks/datadog/traces/apm_stats/aggregation.rs create mode 100644 src/sinks/datadog/traces/apm_stats/bucket.rs create mode 100644 src/sinks/datadog/traces/apm_stats/flusher.rs create mode 100644 src/sinks/datadog/traces/apm_stats/integration_tests.rs create mode 100644 src/sinks/datadog/traces/apm_stats/mod.rs create mode 100644 src/sinks/datadog/traces/apm_stats/weight.rs delete mode 100644 src/sinks/datadog/traces/stats.rs diff --git a/scripts/integration/docker-compose.datadog-traces.yml b/scripts/integration/docker-compose.datadog-traces.yml index a0d47b671350b..cf4060493b72a 100644 --- a/scripts/integration/docker-compose.datadog-traces.yml +++ b/scripts/integration/docker-compose.datadog-traces.yml @@ -2,7 +2,9 @@ version: "3" services: datadog-trace-agent: - image: docker.io/datadog/agent:7.39.1 + # "latest" tag is always the most recent stable release + # this is a fallback in case "breaking" changes in the Agent aren't surfaced to Vector + image: docker.io/datadog/agent:latest networks: - backend environment: @@ -20,8 +22,11 @@ services: - DD_APM_ERROR_TPS=999999 - DD_APM_MAX_MEMORY=0 - DD_APM_MAX_CPU_PERCENT=0 + - DD_HOSTNAME=datadog-trace-agent datadog-trace-agent-to-vector: - image: docker.io/datadog/agent:7.39.1 + # "latest" tag is always the most recent stable release + # this is a fallback in case "breaking" changes in the Agent aren't surfaced to Vector + image: docker.io/datadog/agent:latest networks: - backend environment: @@ -39,6 +44,7 @@ services: - DD_APM_ERROR_TPS=999999 - DD_APM_MAX_MEMORY=0 - DD_APM_MAX_CPU_PERCENT=0 + - DD_HOSTNAME=datadog-trace-agent-to-vector runner: build: context: ${PWD} diff --git a/src/internal_events/datadog_traces.rs b/src/internal_events/datadog_traces.rs index eb58b0903b92c..e6724a9adb484 100644 --- a/src/internal_events/datadog_traces.rs +++ b/src/internal_events/datadog_traces.rs @@ -38,3 +38,27 @@ impl InternalEvent for DatadogTracesEncodingError { } } } + +#[derive(Debug)] +pub struct DatadogTracesAPMStatsError { + pub error: E, +} + +impl InternalEvent for DatadogTracesAPMStatsError { + fn emit(self) { + error!( + message = "Failed sending APM stats payload.", + error = %self.error, + error_type = error_type::WRITER_FAILED, + stage = error_stage::SENDING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::WRITER_FAILED, + "stage" => error_stage::SENDING, + ); + + // No dropped events because APM stats payloads are not considered events. + } +} diff --git a/src/sinks/datadog/traces/apm_stats/aggregation.rs b/src/sinks/datadog/traces/apm_stats/aggregation.rs new file mode 100644 index 0000000000000..b6a7551e2c77a --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/aggregation.rs @@ -0,0 +1,423 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use chrono::Utc; + +use super::{ + bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey, + BUCKET_DURATION_NANOSECONDS, +}; +use crate::event::{TraceEvent, Value}; + +const MEASURED_KEY: &str = "_dd.measured"; +const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; +const TAG_STATUS_CODE: &str = "http.status_code"; +const TAG_SYNTHETICS: &str = "synthetics"; +const TOP_LEVEL_KEY: &str = "_top_level"; + +/// The number of bucket durations to keep in memory before flushing them. +const BUCKET_WINDOW_LEN: u64 = 2; + +#[derive(PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct AggregationKey { + pub(crate) payload_key: PayloadAggregationKey, + pub(crate) bucket_key: BucketAggregationKey, +} + +impl AggregationKey { + fn new_aggregation_from_span( + span: &BTreeMap, + payload_key: PayloadAggregationKey, + synthetics: bool, + ) -> Self { + AggregationKey { + payload_key: payload_key.with_span_context(span), + bucket_key: BucketAggregationKey { + service: span + .get("service") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + name: span + .get("name") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + resource: span + .get("resource") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + ty: span + .get("type") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + status_code: span + .get("meta") + .and_then(|m| m.as_object()) + .and_then(|m| m.get(TAG_STATUS_CODE)) + // the meta field is supposed to be a string/string map + .and_then(|s| s.to_string_lossy().parse::().ok()) + .unwrap_or_default(), + synthetics, + }, + } + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct PayloadAggregationKey { + pub(crate) env: String, + pub(crate) hostname: String, + pub(crate) version: String, + pub(crate) container_id: String, +} + +impl PayloadAggregationKey { + fn with_span_context(self, span: &BTreeMap) -> Self { + PayloadAggregationKey { + env: span + .get("meta") + .and_then(|m| m.as_object()) + .and_then(|m| m.get("env")) + .map(|s| s.to_string_lossy()) + .unwrap_or(self.env), + hostname: self.hostname, + version: self.version, + container_id: self.container_id, + } + } +} + +#[derive(PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct BucketAggregationKey { + pub(crate) service: String, + pub(crate) name: String, + pub(crate) resource: String, + pub(crate) ty: String, + pub(crate) status_code: u32, + pub(crate) synthetics: bool, +} + +pub struct Aggregator { + /// The key represents the timestamp (in nanoseconds) of the beginning of the time window (that lasts 10 seconds) on + /// which the associated bucket will calculate statistics. + buckets: BTreeMap, + + /// The oldeest timestamp we will allow for the current time bucket. + oldest_timestamp: u64, + + /// Env asociated with the Agent. + agent_env: Option, + + /// Hostname associated with the Agent. + agent_hostname: Option, + + /// Version associated with the Agent. + agent_version: Option, + + /// API key associated with the Agent. + api_key: Option>, + + /// Default API key to use if api_key not set. + default_api_key: Arc, +} + +impl Aggregator { + pub fn new(default_api_key: Arc) -> Self { + Self { + buckets: BTreeMap::new(), + oldest_timestamp: align_timestamp(Utc::now().timestamp_nanos() as u64), + default_api_key, + // We can't know the below fields until have received a trace event + agent_env: None, + agent_hostname: None, + agent_version: None, + api_key: None, + } + } + + /// Updates cached properties from the Agent. + pub(crate) fn update_agent_properties(&mut self, partition_key: &PartitionKey) { + if self.agent_env.is_none() { + if let Some(env) = &partition_key.env { + self.agent_env = Some(env.clone()); + } + } + if self.agent_hostname.is_none() { + if let Some(hostname) = &partition_key.hostname { + self.agent_hostname = Some(hostname.clone()); + } + } + if self.agent_version.is_none() { + if let Some(version) = &partition_key.agent_version { + self.agent_version = Some(version.clone()); + } + } + if self.api_key.is_none() { + if let Some(api_key) = &partition_key.api_key { + self.api_key = Some(Arc::::clone(api_key)); + } + } + } + + pub(crate) fn get_agent_env(&self) -> String { + self.agent_env.clone().unwrap_or_default() + } + + pub(crate) fn get_agent_hostname(&self) -> String { + self.agent_hostname.clone().unwrap_or_default() + } + + pub(crate) fn get_agent_version(&self) -> String { + self.agent_version.clone().unwrap_or_default() + } + + pub(crate) fn get_api_key(&self) -> Arc { + self.api_key + .clone() + .unwrap_or_else(|| Arc::clone(&self.default_api_key)) + } + + /// Iterates over a trace's constituting spans and upon matching conditions it updates statistics (mostly using the top level span). + pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) { + // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184 + + let spans = match trace.get("spans") { + Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(), + _ => vec![], + }; + + let weight = super::weight::extract_weight_from_root_span(&spans); + let payload_aggkey = PayloadAggregationKey { + env: partition_key.env.clone().unwrap_or_default(), + hostname: partition_key.hostname.clone().unwrap_or_default(), + version: trace + .get("app_version") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + container_id: trace + .get("container_id") + .map(|v| v.to_string_lossy()) + .unwrap_or_default(), + }; + let synthetics = trace + .get("origin") + .map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS)) + .unwrap_or(false); + + spans.iter().for_each(|span| { + let is_top = has_top_level(span); + if !(is_top || is_measured(span) || is_partial_snapshot(span)) { + return; + } + + self.handle_span(span, weight, is_top, synthetics, payload_aggkey.clone()); + }); + } + + /// Aggregates statistics per key over 10 seconds windows. + /// The key is constructed from various span/trace properties (see `AggregationKey`). + fn handle_span( + &mut self, + span: &BTreeMap, + weight: f64, + is_top: bool, + synthetics: bool, + payload_aggkey: PayloadAggregationKey, + ) { + // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/statsraw.go#L147-L182 + + let aggkey = AggregationKey::new_aggregation_from_span(span, payload_aggkey, synthetics); + + let start = match span.get("start") { + Some(Value::Timestamp(val)) => val.timestamp_nanos() as u64, + _ => Utc::now().timestamp_nanos() as u64, + }; + + let duration = match span.get("duration") { + Some(Value::Integer(val)) => *val as u64, + None => 0, + _ => panic!("`duration` should be an i64"), + }; + + let end = start + duration; + + // 10 second bucket window + let mut btime = align_timestamp(end); + + // If too far in the past, use the oldest-allowed time bucket instead + if btime < self.oldest_timestamp { + btime = self.oldest_timestamp + } + + match self.buckets.get_mut(&btime) { + Some(b) => { + b.add(span, weight, is_top, aggkey); + } + None => { + let mut b = Bucket { + start: btime, + duration: BUCKET_DURATION_NANOSECONDS, + data: BTreeMap::new(), + }; + b.add(span, weight, is_top, aggkey); + + debug!("Created {} start_time bucket.", btime); + self.buckets.insert(btime, b); + } + } + } + + /// Flushes the bucket cache. + /// We cache and can compute stats only for the last `BUCKET_WINDOW_LEN * BUCKET_DURATION_NANOSECONDS` and after such time, + /// buckets are then flushed. This only applies to past buckets. Stats buckets in the future are cached with no restriction. + /// + /// # Arguments + /// + /// * `force` - If true, all cached buckets are flushed. + pub(crate) fn flush(&mut self, force: bool) -> Vec { + // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L38-L41 + // , and https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L195-L207 + + let now = Utc::now().timestamp_nanos() as u64; + + let flush_cutoff_time = if force { + // flush all the remaining buckets (the Vector process is exiting) + now + } else { + // maintain two buckets in the cache during normal operation + now - (BUCKET_DURATION_NANOSECONDS * BUCKET_WINDOW_LEN) + }; + + let client_stats_payloads = self.get_client_stats_payloads(flush_cutoff_time); + + // update the oldest_timestamp allowed, to prevent having stats for an already flushed + // bucket + let new_oldest_ts = + align_timestamp(now) - ((BUCKET_WINDOW_LEN - 1) * BUCKET_DURATION_NANOSECONDS); + + if new_oldest_ts > self.oldest_timestamp { + debug!("Updated oldest_timestamp to {}.", new_oldest_ts); + self.oldest_timestamp = new_oldest_ts; + } + + client_stats_payloads + } + + /// Builds the array of ClientStatsPayloads will be sent out as part of the StatsPayload. + /// + /// # Arguments + /// + /// * `flush_cutoff_time` - Timestamp in nanos to use to determine what buckets to keep in the cache and which to export. + fn get_client_stats_payloads(&mut self, flush_cutoff_time: u64) -> Vec { + let client_stats_buckets = self.export_buckets(flush_cutoff_time); + + client_stats_buckets + .into_iter() + .map(|(payload_aggkey, csb)| { + ClientStatsPayload { + env: payload_aggkey.env, + hostname: payload_aggkey.hostname, + container_id: payload_aggkey.container_id, + version: payload_aggkey.version, + stats: csb, + // All the following fields are left unset by the trace-agent: + // https://github.com/DataDog/datadog-agent/blob/42e72dd/pkg/trace/stats/concentrator.go#L216-L227 + service: "".to_string(), + agent_aggregation: "".to_string(), + sequence: 0, + runtime_id: "".to_string(), + lang: "".to_string(), + tracer_version: "".to_string(), + tags: vec![], + } + }) + .collect::>() + } + + /// Exports the buckets that began before `flush_cutoff_time` and purges them from the cache. + /// + /// # Arguments + /// + /// * `flush_cutoff_time` - Timestamp in nanos to use to determine what buckets to keep in the cache and which to export. + fn export_buckets( + &mut self, + flush_cutoff_time: u64, + ) -> BTreeMap> { + let mut m = BTreeMap::>::new(); + + self.buckets.retain(|&bucket_start, bucket| { + let retain = bucket_start > flush_cutoff_time; + + if !retain { + debug!("Flushing {} start_time bucket.", bucket_start); + + bucket.export().into_iter().for_each(|(payload_key, csb)| { + match m.get_mut(&payload_key) { + None => { + m.insert(payload_key.clone(), vec![csb]); + } + Some(s) => { + s.push(csb); + } + }; + }) + } + retain + }); + + m + } +} + +/// Returns the provided timestamp truncated to the bucket size. +/// This is the start time of the time bucket in which such timestamp falls. +const fn align_timestamp(start: u64) -> u64 { + // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L232-L234 + start - (start % BUCKET_DURATION_NANOSECONDS) +} + +/// Assumes that all metrics are all encoded as Value::Float. +/// Return the f64 of the specified key or None of key not present. +fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option { + span.get("metrics") + .and_then(|m| m.as_object()) + .map(|m| match m.get(key) { + Some(Value::Float(f)) => Some(f.into_inner()), + None => None, + _ => panic!("`metric` values should be all be f64"), + }) + .unwrap_or(None) +} + +/// Returns true if the value of this metric is equal to 1.0 +fn metric_value_is_1(span: &BTreeMap, key: &str) -> bool { + match get_metric_value_float(span, key) { + Some(f) => f == 1.0, + None => false, + } +} + +/// Returns true if span is top-level. +fn has_top_level(span: &BTreeMap) -> bool { + // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L28-L31 + + metric_value_is_1(span, TOP_LEVEL_KEY) +} + +/// Returns true if a span should be measured (i.e. it should get trace metrics calculated). +fn is_measured(span: &BTreeMap) -> bool { + // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L40-L43 + + metric_value_is_1(span, MEASURED_KEY) +} + +/// Returns true if the span is a partial snapshot. +/// These types of spans are partial images of long-running spans. +/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive integer. +/// The metric usually increases each time a new version of the same span is sent by the tracer +fn is_partial_snapshot(span: &BTreeMap) -> bool { + // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L49-L52 + + match get_metric_value_float(span, PARTIAL_VERSION_KEY) { + Some(f) => f >= 0.0, + None => false, + } +} diff --git a/src/sinks/datadog/traces/apm_stats/bucket.rs b/src/sinks/datadog/traces/apm_stats/bucket.rs new file mode 100644 index 0000000000000..25b2c4a17d01b --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/bucket.rs @@ -0,0 +1,187 @@ +use std::collections::BTreeMap; + +use prost::Message; + +use super::{ + aggregation::{AggregationKey, PayloadAggregationKey}, + ddsketch_full, ClientGroupedStats, ClientStatsBucket, +}; +use crate::{event::Value, metrics::AgentDDSketch}; + +pub(crate) struct GroupedStats { + hits: f64, + top_level_hits: f64, + errors: f64, + duration: f64, + ok_distribution: AgentDDSketch, + err_distribution: AgentDDSketch, +} + +impl GroupedStats { + fn new() -> Self { + GroupedStats { + hits: 0.0, + top_level_hits: 0.0, + errors: 0.0, + duration: 0.0, + ok_distribution: AgentDDSketch::with_agent_defaults(), + err_distribution: AgentDDSketch::with_agent_defaults(), + } + } + + fn export(&self, key: &AggregationKey) -> ClientGroupedStats { + ClientGroupedStats { + service: key.bucket_key.service.clone(), + name: key.bucket_key.name.clone(), + resource: key.bucket_key.resource.clone(), + http_status_code: key.bucket_key.status_code, + r#type: key.bucket_key.ty.clone(), + db_type: "".to_string(), + hits: self.hits.round() as u64, + errors: self.errors.round() as u64, + duration: self.duration.round() as u64, + ok_summary: encode_sketch(&self.ok_distribution), + error_summary: encode_sketch(&self.err_distribution), + synthetics: key.bucket_key.synthetics, + top_level_hits: self.top_level_hits.round() as u64, + } + } +} + +/// Convert agent sketch variant to ./proto/dd_sketch_full.proto +fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec { + // AgentDDSketch partitions the set of real numbers into intervals like [gamma^(n), gamma^(n+1)[, + let index_mapping = ddsketch_full::IndexMapping { + // This is the gamma value used to build the aforementioned partition scheme + gamma: agent_sketch.gamma(), + // This offset is applied to the powers of gamma to adjust sketch accuracy + index_offset: agent_sketch.bin_index_offset() as f64, + // Interpolation::None is the interpolation type as there is no interpolation when using the + // aforementioned partition scheme + interpolation: ddsketch_full::index_mapping::Interpolation::None as i32, + }; + + // zeroes depicts the number of values that falled around zero based on the sketch local accuracy + // positives and negatives stores are repectively storing positive and negative values using the + // exact same mechanism. + let (positives, negatives, zeroes) = convert_stores(agent_sketch); + let positives_store = ddsketch_full::Store { + bin_counts: positives, + contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None) + contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None) + }; + let negatives_store = ddsketch_full::Store { + bin_counts: negatives, + contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None) + contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None) + }; + ddsketch_full::DdSketch { + mapping: Some(index_mapping), + positive_values: Some(positives_store), + negative_values: Some(negatives_store), + zero_count: zeroes, + } + .encode_to_vec() +} + +/// Split negative and positive values from an AgentDDSketch, also extract the number of values +/// that were accounted as 0.0. +fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap, BTreeMap, f64) { + let mut positives = BTreeMap::::new(); + let mut negatives = BTreeMap::::new(); + let mut zeroes = 0.0; + let bin_map = agent_sketch.bin_map(); + bin_map + .keys + .into_iter() + .zip(bin_map.counts.into_iter()) + .for_each(|(k, n)| { + match k.signum() { + 0 => zeroes = n as f64, + 1 => { + positives.insert(k as i32, n as f64); + } + -1 => { + negatives.insert((-k) as i32, n as f64); + } + _ => {} + }; + }); + (positives, negatives, zeroes) +} + +/// Stores statistics for various `AggregationKey` in a given time window +pub(crate) struct Bucket { + pub(crate) start: u64, + pub(crate) duration: u64, + pub(crate) data: BTreeMap, +} + +impl Bucket { + pub(crate) fn export(&self) -> BTreeMap { + let mut m = BTreeMap::::new(); + self.data.iter().for_each(|(k, v)| { + let b = v.export(k); + match m.get_mut(&k.payload_key) { + None => { + let sb = ClientStatsBucket { + start: self.start, + duration: self.duration, + agent_time_shift: 0, + stats: vec![b], + }; + m.insert(k.payload_key.clone(), sb); + } + Some(s) => { + s.stats.push(b); + } + }; + }); + m + } + + pub(crate) fn add( + &mut self, + span: &BTreeMap, + weight: f64, + is_top: bool, + aggkey: AggregationKey, + ) { + match self.data.get_mut(&aggkey) { + Some(gs) => Bucket::update(span, weight, is_top, gs), + None => { + let mut gs = GroupedStats::new(); + Bucket::update(span, weight, is_top, &mut gs); + self.data.insert(aggkey, gs); + } + } + } + + /// Update a bucket with a new span. Computed statistics include the number of hits and the actual distribution of + /// execution time, with isolated measurements for spans flagged as errored and spans without error. + fn update(span: &BTreeMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { + is_top.then(|| { + gs.top_level_hits += weight; + }); + gs.hits += weight; + let error = match span.get("error") { + Some(Value::Integer(val)) => *val, + None => 0, + _ => panic!("`error` should be an i64"), + }; + if error != 0 { + gs.errors += weight; + } + let duration = match span.get("duration") { + Some(Value::Integer(val)) => *val, + None => 0, + _ => panic!("`duration` should be an i64"), + }; + gs.duration += (duration as f64) * weight; + if error != 0 { + gs.err_distribution.insert(duration as f64) + } else { + gs.ok_distribution.insert(duration as f64) + } + } +} diff --git a/src/sinks/datadog/traces/apm_stats/flusher.rs b/src/sinks/datadog/traces/apm_stats/flusher.rs new file mode 100644 index 0000000000000..0dcdbd885e34e --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/flusher.rs @@ -0,0 +1,176 @@ +use std::{ + io::Write, + sync::{Arc, Mutex}, +}; + +use bytes::Bytes; +use snafu::ResultExt; +use tokio::sync::oneshot::{Receiver, Sender}; +use vector_common::{finalization::EventFinalizers, request_metadata::RequestMetadata}; + +use super::{ + aggregation::Aggregator, build_request, DDTracesMetadata, DatadogTracesEndpoint, + DatadogTracesEndpointConfiguration, RequestBuilderError, StatsPayload, + BUCKET_DURATION_NANOSECONDS, +}; +use crate::{ + http::{BuildRequestSnafu, HttpClient}, + internal_events::DatadogTracesAPMStatsError, + sinks::util::{Compression, Compressor}, +}; + +/// Flushes cached APM stats buckets to Datadog on a 10 second interval. +/// When the sink signals this thread that it is shutting down, all remaining +/// buckets are flush before the thread exits. +/// +/// # arguments +/// +/// * `tripwire` - Receiver that the sink signals when shutting down. +/// * `client` - HttpClient to use in sending the stats payloads. +/// * `compression` - Compression to use when creating the HTTP requests. +/// * `endpoint_configuration` - Endpoint configuration to use when creating the HTTP requests. +/// * `aggregator` - The Aggregator object containing cached stats buckets. +pub async fn flush_apm_stats_thread( + mut tripwire: Receiver>, + client: HttpClient, + compression: Compression, + endpoint_configuration: DatadogTracesEndpointConfiguration, + aggregator: Arc>, +) { + let sender = ApmStatsSender { + client, + compression, + endpoint_configuration, + aggregator, + }; + + // flush on the same interval as the stats buckets + let mut interval = + tokio::time::interval(std::time::Duration::from_nanos(BUCKET_DURATION_NANOSECONDS)); + + debug!("Starting APM stats flushing thread."); + + loop { + tokio::select! { + + _ = interval.tick() => { + // flush the oldest bucket from the cache to Datadog + sender.flush_apm_stats(false).await; + }, + signal = &mut tripwire => match signal { + // sink has signaled us that the process is shutting down + Ok(sink_shutdown_ack_sender) => { + + debug!("APM stats flushing thread received exit condition. Flushing remaining stats before exiting."); + sender.flush_apm_stats(true).await; + + // signal the sink (who tripped the tripwire), that we are done flushing + let _ = sink_shutdown_ack_sender.send(()); + break; + } + Err(_) => { + error!( + internal_log_rate_limit = true, + message = "Tokio Sender unexpectedly dropped." + ); + break; + }, + } + } + } +} + +struct ApmStatsSender { + client: HttpClient, + compression: Compression, + endpoint_configuration: DatadogTracesEndpointConfiguration, + aggregator: Arc>, +} + +impl ApmStatsSender { + async fn flush_apm_stats(&self, force: bool) { + // explicit scope to minimize duration that the Aggregator is locked. + if let Some((payload, api_key)) = { + let mut aggregator = self.aggregator.lock().unwrap(); + let client_stats_payloads = aggregator.flush(force); + + if client_stats_payloads.is_empty() { + // no sense proceeding if no payloads to flush + None + } else { + let payload = StatsPayload { + agent_hostname: aggregator.get_agent_hostname(), + agent_env: aggregator.get_agent_env(), + stats: client_stats_payloads, + agent_version: aggregator.get_agent_version(), + client_computed: false, + }; + + Some((payload, aggregator.get_api_key())) + } + } { + if let Err(error) = self.compress_and_send(payload, api_key).await { + emit!(DatadogTracesAPMStatsError { error }); + } + } + } + + async fn compress_and_send( + &self, + payload: StatsPayload, + api_key: Arc, + ) -> Result<(), Box> { + let (metadata, compressed_payload) = self.build_apm_stats_request_data(api_key, payload)?; + + let request_metadata = RequestMetadata::default(); + let trace_api_request = build_request( + (metadata, request_metadata), + compressed_payload, + self.compression, + &self.endpoint_configuration, + ); + + let http_request = trace_api_request + .into_http_request() + .context(BuildRequestSnafu)?; + + self.client.send(http_request).await?; + + Ok(()) + } + + fn build_apm_stats_request_data( + &self, + api_key: Arc, + payload: StatsPayload, + ) -> Result<(DDTracesMetadata, Bytes), RequestBuilderError> { + let encoded_payload = + rmp_serde::to_vec_named(&payload).map_err(|e| RequestBuilderError::FailedToBuild { + message: "Encoding failed.", + reason: e.to_string(), + dropped_events: 0, + })?; + let uncompressed_size = encoded_payload.len(); + let metadata = DDTracesMetadata { + api_key, + endpoint: DatadogTracesEndpoint::APMStats, + finalizers: EventFinalizers::default(), + uncompressed_size, + content_type: "application/msgpack".to_string(), + }; + + let mut compressor = Compressor::from(self.compression); + match compressor.write_all(&encoded_payload) { + Ok(()) => { + let bytes = compressor.into_inner().freeze(); + + Ok((metadata, bytes)) + } + Err(e) => Err(RequestBuilderError::FailedToBuild { + message: "Compression failed.", + reason: e.to_string(), + dropped_events: 0, + }), + } + } +} diff --git a/src/sinks/datadog/traces/apm_stats/integration_tests.rs b/src/sinks/datadog/traces/apm_stats/integration_tests.rs new file mode 100644 index 0000000000000..46d4988b6d274 --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/integration_tests.rs @@ -0,0 +1,427 @@ +use axum::{ + body::Body, + extract::Extension, + http::{header::CONTENT_TYPE, Request}, + routing::{get, post}, + Router, +}; +use chrono::Utc; +use flate2::read::GzDecoder; +use indoc::indoc; +use rmp_serde; +use serde::Serialize; +use std::{collections::HashMap, io::Read, net::SocketAddr, sync::Arc}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::time::{sleep, Duration}; + +use crate::{ + config::ConfigBuilder, + sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig}, + sources::datadog_agent::DatadogAgentConfig, + test_util::{start_topology, trace_init}, + topology::RunningTopology, +}; + +/// The port on which the Agent will send traces to vector, and vector `datadog_agent` source will +/// listen on +fn vector_receive_port() -> u16 { + std::env::var("VECTOR_PORT") + .unwrap_or_else(|_| "8081".to_string()) + .parse::() + .unwrap() +} + +/// The port for the http server to receive data from the agent +fn server_port_for_agent() -> u16 { + std::env::var("AGENT_PORT") + .unwrap_or_else(|_| "8082".to_string()) + .parse::() + .unwrap() +} + +/// The port for the http server to receive data from vector +const fn server_port_for_vector() -> u16 { + 1234 +} + +/// The agent url to post traces to [Agent only] +fn trace_agent_only_url() -> String { + std::env::var("TRACE_AGENT_URL") + .unwrap_or_else(|_| "http://127.0.0.1:8126/v0.3/traces".to_owned()) +} + +/// The agent url to post traces to [Agent -> Vector]. +fn trace_agent_to_vector_url() -> String { + std::env::var("TRACE_AGENT_TO_VECTOR_URL") + .unwrap_or_else(|_| "http://127.0.0.1:8126/v0.3/traces".to_owned()) +} + +/// Shared state for the HTTP server +struct AppState { + name: String, + tx: Sender, +} + +/// Runs an HTTP server on the specified port. +async fn run_server(name: String, port: u16, tx: Sender) { + let state = Arc::new(AppState { + name: name.clone(), + tx, + }); + let app = Router::new() + .route("/api/v1/validate", get(validate)) + .route("/api/v0.2/traces", post(process_traces)) + .route("/api/v0.2/stats", post(process_stats)) + .layer(Extension(state)); + + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + + info!("HTTP server for `{}` listening on {}", name, addr); + + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +// Needed for the sink healthcheck +async fn validate() -> &'static str { + "" +} + +/// At a later time we could parse the trace payloads from vector and the agent and compare those +/// for consistency as well. +async fn process_traces(Extension(_state): Extension>, request: Request) { + let content_type_header = request.headers().get(CONTENT_TYPE); + let content_type = content_type_header.and_then(|value| value.to_str().ok()); + + if let Some(content_type) = content_type { + if content_type.starts_with("application/x-protobuf") { + debug!("Got trace payload."); + } + } +} + +/// Process a POST request from the stats endpoint. +/// De-compresses and De-serializes the payload, then forwards it on the Sender channel. +async fn process_stats(Extension(state): Extension>, mut request: Request) { + debug!( + "`{}` server process_stats request: {:?}", + state.name, request + ); + + let content_type_header = request.headers().get(CONTENT_TYPE); + let content_type = content_type_header.and_then(|value| value.to_str().ok()); + + if let Some(content_type) = content_type { + if content_type.starts_with("application/msgpack") { + debug!("`{}` server got stats payload.", state.name); + + let body = request.body_mut(); + let compressed_body_bytes = hyper::body::to_bytes(body) + .await + .expect("could not decode body into bytes"); + + let mut gz = GzDecoder::new(compressed_body_bytes.as_ref()); + let mut decompressed_body_bytes = vec![]; + gz.read_to_end(&mut decompressed_body_bytes) + .expect("unable to decompress gzip stats payload"); + + let payload: StatsPayload = rmp_serde::from_slice(&decompressed_body_bytes).unwrap(); + + info!( + "`{}` server received and deserialized stats payload.", + state.name + ); + debug!("{:?}", payload); + + state.tx.send(payload).await.unwrap(); + } + } +} + +#[derive(Serialize)] +struct Span { + duration: i64, + error: i32, + meta: HashMap, + metrics: HashMap, + name: String, + parent_id: u64, + resource: String, + service: String, + span_id: i64, + start: i64, + trace_id: u64, + r#type: String, +} + +fn build_traces_payload(start: i64, duration: i64, span_id: i64) -> Vec> { + let parent_id = 1; + let trace_id = 2; + let resource = "a_resource"; + let service = "a_service"; + + let span = Span { + duration, + error: 0, + meta: HashMap::from([("this_is".to_string(), "so_meta".to_string())]), + metrics: HashMap::from([("_top_level".to_string(), 1.0)]), + name: "a_name".to_string(), + parent_id, + resource: resource.to_string(), + service: service.to_string(), + span_id, + start, + trace_id, + r#type: "a_type".to_string(), + }; + + vec![vec![span]] +} + +/// Sends traces into the Agent containers. +/// Send two separate requests with thin the same bucket time window to invoke the aggregation logic in the Agent. +async fn send_agent_traces(urls: &Vec, start: i64, duration: i64, span_id: i64) { + // sends a trace to each of the urls + async fn send_trace(urls: &Vec, start: i64, duration: i64, span_id: i64) -> bool { + let traces_payload = build_traces_payload(start, duration, span_id); + let client = reqwest::Client::new(); + + for url in urls { + let res = client + .post(url) + .header(CONTENT_TYPE, "application/json") + .json(&traces_payload) + .send() + .await + .unwrap(); + + if res.status() != hyper::StatusCode::OK { + error!("Error sending traces to {}, res: {:?}.", url, res); + return false; + } + info!("Sent a trace to the Agent at {}.", url); + } + true + } + + // send first set of trace data + if !send_trace(urls, start, duration, span_id).await { + panic!("can't perform checks if traces aren't accepted by agent."); + } + + sleep(Duration::from_millis(100)).await; + + // send second set of trace data + if !send_trace(urls, start, duration, span_id + 1).await { + panic!("can't perform checks if traces aren't accepted by agent."); + } +} + +/// Receives the stats payloads from the Receiver channels from both of the server instances. +/// If either of the servers does not respond with a stats payload, the test will fail. +/// The lastest received stats payload is the only one considered. This is the same logic that the +/// Datadog UI follows. +/// Wait for up to 35 seconds for the stats payload to arrive. The Agent can take some time to send +/// the stats out. +/// TODO: Looking into if there is a way to configure the agent bucket interval to force the +/// flushing to occur faster (reducing the timeout we use and overall runtime of the test). +async fn receive_the_stats( + rx_agent_only: &mut Receiver, + rx_agent_vector: &mut Receiver, +) -> (StatsPayload, StatsPayload) { + let timeout = sleep(Duration::from_secs(35)); + tokio::pin!(timeout); + + let mut stats_agent_vector = None; + let mut stats_agent_only = None; + + // wait on the receive of stats payloads. expect one from agent, two from vector. + // The second payload from vector should be the aggregate. + loop { + tokio::select! { + d1 = rx_agent_vector.recv() => { + stats_agent_vector = d1; + if stats_agent_only.is_some() && stats_agent_vector.is_some() { + break; + } + }, + d2 = rx_agent_only.recv() => { + stats_agent_only = d2; + if stats_agent_vector.is_some() && stats_agent_only.is_some() { + break; + } + }, + _ = &mut timeout => break, + } + } + + assert!( + stats_agent_vector.is_some(), + "received no stats from vector" + ); + assert!(stats_agent_only.is_some(), "received no stats from agent"); + + (stats_agent_only.unwrap(), stats_agent_vector.unwrap()) +} + +/// Compares the stats payloads (specifically the bucket for the time window we sent events on) +/// between the Vector and Agent for consistency. +fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) { + let agent_bucket = agent_stats.stats.first().unwrap().stats.first().unwrap(); + + let vector_bucket = vector_stats.stats.first().unwrap().stats.first().unwrap(); + + // NOTE: intentionally not validating the bucket start times because due to the nature of the + // test the bucket start times might not align perfectly, but everything else should. + + assert!( + agent_bucket.duration == vector_bucket.duration, + "bucket durations do not match" + ); + assert!( + agent_bucket.stats.len() == vector_bucket.stats.len(), + "vector and agent reporting different number of buckets" + ); + + let agent_s = agent_bucket.stats.first().unwrap(); + let vector_s = vector_bucket.stats.first().unwrap(); + + info!("\nagent_stats : {:?}", agent_s); + info!("\nvector_stats : {:?}", vector_s); + + assert!(agent_s.service == vector_s.service); + assert!(agent_s.name == vector_s.name); + assert!(agent_s.resource == vector_s.resource); + assert!(agent_s.http_status_code == vector_s.http_status_code); + assert!(agent_s.r#type == vector_s.r#type); + assert!(agent_s.db_type == vector_s.db_type); + + // hit counts should match + assert!(agent_s.hits == vector_s.hits); + assert!(agent_s.hits == 2); + + assert!(agent_s.errors == vector_s.errors); + assert!(agent_s.duration == vector_s.duration); + assert!(agent_s.synthetics == vector_s.synthetics); + assert!(agent_s.top_level_hits == vector_s.top_level_hits); +} + +/// Starts the vector instance with a datadog agent source and a datadog traces sink. +/// The input to the source is one of the Agent containers. +/// The output of the sink is our HTTP server. +/// Each member (topology, shutdown) of the Return value of this function must be kept +/// in scope by the caller until the test is done. +/// +/// The sink is run with a max batch size of one. +/// The two trace events are intentionally configured within the same time bucket window. +/// This creates a scenario where the stats payload that is output by the sink after processing the +/// *second* batch of events (the second event) *should* contain the aggregated statistics of both +/// of the trace events. i.e , the hit count for that bucket should be equal to "2" , not "1". +async fn start_vector() -> (RunningTopology, tokio::sync::mpsc::UnboundedReceiver<()>) { + let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port()); + + let source_config = toml::from_str::(&format!( + indoc! { r#" + address = "{}" + multiple_outputs = true + "#}, + dd_agent_address, + )) + .unwrap(); + + let mut builder = ConfigBuilder::default(); + builder.add_source("in", source_config); + + let dd_traces_endpoint = format!("http://127.0.0.1:{}", server_port_for_vector()); + let cfg = format!( + indoc! { r#" + default_api_key = "atoken" + endpoint = "{}" + batch.max_events = 1 + "#}, + dd_traces_endpoint + ); + + let api_key = std::env::var("TEST_DATADOG_API_KEY") + .expect("couldn't find the Datatog api key in environment variables"); + assert!(!api_key.is_empty(), "TEST_DATADOG_API_KEY required"); + let cfg = cfg.replace("atoken", &api_key); + + let sink_config = toml::from_str::(&cfg).unwrap(); + + builder.add_sink("out", &["in.traces"], sink_config); + + let config = builder.build().expect("building config should not fail"); + + let (topology, shutdown) = start_topology(config, false).await; + info!("Started vector."); + + (topology, shutdown) +} + +/// An end-to-end test which validates the APM stats payloads output by of the `datadog_traces` sink are +/// correct by comparing them with the same APM stats payloads output by the Agent. +/// Two Agent containers are initialized, and fed the same trace data. +/// One Agent container feeds into an HTTP server where we parse the stats. The other Agent +/// container feeds traces into the Datadog Agent source of Vector, which outputs to the traces +/// sink and finally the same HTTP server to process stats payloads. +/// The fields of the stats payloads are then compared. +/// +/// This test specifically verifies the Aggregation of stats across multiple batches of events +/// through vector. Consumers of the Agent's stats payloads expect the Aggregation of stats to be +/// "bucketed" within `datadog::traces::apm_stats::BUCKET_DURATION_NANOSECONDS` (10 second) windows. +/// Each bucket must be sent only once. +/// These are requirements from the APM stats backend of Datadog. +#[tokio::test] +async fn apm_stats_e2e_test_dd_agent_to_vector_correctness() { + trace_init(); + + // channels for the servers to send us back data on + let (tx_agent_vector, mut rx_agent_vector) = mpsc::channel(32); + let (tx_agent_only, mut rx_agent_only) = mpsc::channel(32); + + // spawn the servers + { + // [vector -> the server] + tokio::spawn(async move { + run_server( + "vector".to_string(), + server_port_for_vector(), + tx_agent_vector, + ) + .await; + }); + + // [agent -> the server] + tokio::spawn(async move { + run_server("agent".to_string(), server_port_for_agent(), tx_agent_only).await; + }); + } + + // allow the Agent containers to start up + sleep(Duration::from_secs(8)).await; + + // starts the vector source and sink + // panics if vector errors during startup + let (_topology, _shutdown) = start_vector().await; + + // the URLs of the Agent trace endpoints that traces will be sent to + let urls = vec![trace_agent_only_url(), trace_agent_to_vector_url()]; + + let start = Utc::now().timestamp_nanos(); + let duration = 1; + let span_id = 3; + + // sends the traces through the agent containers + // panics if the HTTP post fails + send_agent_traces(&urls, start, duration, span_id).await; + + // receive the stats on the channel receivers from the servers + let (stats_agent, stats_vector) = + receive_the_stats(&mut rx_agent_only, &mut rx_agent_vector).await; + + // compare the stats from agent and vector for consistency + validate_stats(&stats_agent, &stats_vector); +} diff --git a/src/sinks/datadog/traces/apm_stats/mod.rs b/src/sinks/datadog/traces/apm_stats/mod.rs new file mode 100644 index 0000000000000..58ec6c26bbcae --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/mod.rs @@ -0,0 +1,123 @@ +//! APM stats +//! +//! This module contains the logic for computing APM statistics based on the incoming trace +//! events this sink receives. It is modelled closely to the trace-agent component of the +//! Datadog Agent, and sends out StatsPayload packets formatted and Aggregated by the same +//! algorithm, at ten second intervals, independently of the sink's trace payloads. + +use std::sync::{Arc, Mutex}; + +use serde::{Deserialize, Serialize}; +use vector_core::event::TraceEvent; + +pub use self::aggregation::Aggregator; +pub use self::flusher::flush_apm_stats_thread; + +pub(crate) use super::config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration}; +pub(crate) use super::request_builder::{build_request, DDTracesMetadata, RequestBuilderError}; +pub(crate) use super::sink::PartitionKey; + +mod aggregation; +mod bucket; +mod flusher; +mod weight; + +#[cfg(all(test, feature = "datadog-traces-integration-tests"))] +mod integration_tests; + +/// The duration of time in nanoseconds that a bucket covers. +pub(crate) const BUCKET_DURATION_NANOSECONDS: u64 = 10_000_000_000; + +#[allow(warnings, clippy::pedantic, clippy::nursery)] +pub(crate) mod ddsketch_full { + include!(concat!(env!("OUT_DIR"), "/ddsketch_full.rs")); +} + +// On the agent side APM Stats payload are encoded into the messagepack format using this +// go code https://github.com/DataDog/datadog-agent/blob/b5bed4d/pkg/trace/pb/stats_gen.go. +// Note that this code is generated from code itself generate from this .proto file +// https://github.com/DataDog/datadog-agent/blob/dc2f202/pkg/trace/pb/stats.proto. +// All the subsequent struct are dedicated to be used with rmp_serde and the fields names +// exactly match the ones of the go code. +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct StatsPayload { + pub(crate) agent_hostname: String, + pub(crate) agent_env: String, + pub(crate) stats: Vec, + pub(crate) agent_version: String, + pub(crate) client_computed: bool, +} + +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct ClientStatsPayload { + pub(crate) hostname: String, + pub(crate) env: String, + pub(crate) version: String, + pub(crate) stats: Vec, + pub(crate) lang: String, + pub(crate) tracer_version: String, + #[serde(rename = "RuntimeID")] + pub(crate) runtime_id: String, + pub(crate) sequence: u64, + pub(crate) agent_aggregation: String, + pub(crate) service: String, + #[serde(rename = "ContainerID")] + pub(crate) container_id: String, + pub(crate) tags: Vec, +} + +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct ClientStatsBucket { + pub(crate) start: u64, + pub(crate) duration: u64, + pub(crate) stats: Vec, + pub(crate) agent_time_shift: i64, +} + +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct ClientGroupedStats { + pub(crate) service: String, + pub(crate) name: String, + pub(crate) resource: String, + #[serde(rename = "HTTPStatusCode")] + pub(crate) http_status_code: u32, + pub(crate) r#type: String, + #[serde(rename = "DBType")] + pub(crate) db_type: String, + pub(crate) hits: u64, + pub(crate) errors: u64, + pub(crate) duration: u64, + #[serde(with = "serde_bytes")] + pub(crate) ok_summary: Vec, + #[serde(with = "serde_bytes")] + pub(crate) error_summary: Vec, + pub(crate) synthetics: bool, + pub(crate) top_level_hits: u64, +} + +/// Computes APM stats from the incoming trace events. +/// +/// # arguments +/// +/// * `key` - PartitionKey associated with this set of trace events. +/// * `aggregator` - Aggregator to use in computing and caching APM stats buckets. +/// * `trace_events` - Newly received trace events to process. +pub(crate) fn compute_apm_stats( + key: &PartitionKey, + aggregator: Arc>, + trace_events: &[TraceEvent], +) { + let mut aggregator = aggregator.lock().unwrap(); + + // store properties that are available only at runtime + aggregator.update_agent_properties(key); + + // process the incoming traces + trace_events + .iter() + .for_each(|t| aggregator.handle_trace(key, t)); +} diff --git a/src/sinks/datadog/traces/apm_stats/weight.rs b/src/sinks/datadog/traces/apm_stats/weight.rs new file mode 100644 index 0000000000000..dfce6fbdb9f57 --- /dev/null +++ b/src/sinks/datadog/traces/apm_stats/weight.rs @@ -0,0 +1,95 @@ +use std::collections::BTreeMap; + +use crate::event::Value; + +const SAMPLING_RATE_KEY: &str = "_sample_rate"; + +/// This extracts the relative weights from the top level span (i.e. the span that does not have a parent). +pub(crate) fn extract_weight_from_root_span(spans: &[&BTreeMap]) -> f64 { + // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/weight.go#L17-L26. + + // TODO this logic likely has a bug(s) that need to be root caused. The root span is not reliably found and defaults to "1.0" + // regularly for users even when sampling is disabled in the Agent. + // GH issue to track that: https://github.com/vectordotdev/vector/issues/14859 + + if spans.is_empty() { + return 1.0; + } + + let mut trace_id: Option = None; + + let mut parent_id_to_child_weight = BTreeMap::::new(); + let mut span_ids = Vec::::new(); + for s in spans.iter() { + // TODO these need to change to u64 when the following issue is fixed: + // https://github.com/vectordotdev/vector/issues/14687 + let parent_id = match s.get("parent_id") { + Some(Value::Integer(val)) => *val, + None => 0, + _ => panic!("`parent_id` should be an i64"), + }; + let span_id = match s.get("span_id") { + Some(Value::Integer(val)) => *val, + None => 0, + _ => panic!("`span_id` should be an i64"), + }; + if trace_id.is_none() { + trace_id = match s.get("trace_id") { + Some(Value::Integer(v)) => Some(*v as usize), + _ => panic!("`trace_id` should be an i64"), + } + } + let weight = s + .get("metrics") + .and_then(|m| m.as_object()) + .map(|m| match m.get(SAMPLING_RATE_KEY) { + Some(Value::Float(v)) => { + let sample_rate = v.into_inner(); + if sample_rate <= 0.0 || sample_rate > 1.0 { + 1.0 + } else { + 1.0 / sample_rate + } + } + _ => 1.0, + }) + .unwrap_or(1.0); + + // found root + if parent_id == 0 { + return weight; + } + + span_ids.push(span_id); + + parent_id_to_child_weight.insert(parent_id, weight); + } + + // Remove all spans that have a parent + span_ids.iter().for_each(|id| { + parent_id_to_child_weight.remove(id); + }); + + // There should be only one value remaining, the weight from the root span + if parent_id_to_child_weight.len() != 1 { + // TODO remove the debug print and emit the Error event as outlined in + // https://github.com/vectordotdev/vector/issues/14859 + debug!( + "Didn't reliably find the root span for weight calculation of trace_id {:?}.", + trace_id + ); + } + + *parent_id_to_child_weight + .values() + .next() + .unwrap_or_else(|| { + // TODO remove the debug print and emit the Error event as outlined in + // https://github.com/vectordotdev/vector/issues/14859 + debug!( + "Root span was not found. Defaulting to weight of 1.0 for trace_id {:?}.", + trace_id + ); + &1.0 + }) +} diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index cad88eebe190c..7393d0b849cf7 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -1,15 +1,19 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use futures::FutureExt; use http::Uri; use indoc::indoc; use snafu::ResultExt; +use tokio::sync::oneshot::{channel, Sender}; use tower::ServiceBuilder; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; use vector_core::config::{proxy::ProxyConfig, AcknowledgementsConfig}; -use super::service::TraceApiRetry; +use super::{ + apm_stats::{flush_apm_stats_thread, Aggregator}, + service::TraceApiRetry, +}; use crate::{ common::datadog::get_base_domain, config::{GenerateConfig, Input, SinkConfig, SinkContext}, @@ -116,6 +120,7 @@ pub enum DatadogTracesEndpoint { } /// Store traces & APM stats endpoints actual URIs. +#[derive(Clone)] pub struct DatadogTracesEndpointConfiguration { traces_endpoint: Uri, stats_endpoint: Uri, @@ -156,22 +161,49 @@ impl DatadogTracesConfig { let default_api_key: Arc = Arc::from(self.default_api_key.inner()); let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS); let endpoints = self.generate_traces_endpoint_configuration()?; + let batcher_settings = self .batch .validate()? .limit_max_bytes(BATCH_GOAL_BYTES)? .limit_max_events(BATCH_MAX_EVENTS)? .into_batcher_settings()?; + let service = ServiceBuilder::new() .settings(request_limits, TraceApiRetry) - .service(TraceApiService::new(client)); + .service(TraceApiService::new(client.clone())); + + // Object responsible for caching/processing APM stats from incoming trace events. + let apm_stats_aggregator = + Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key)))); + + let compression = self.compression.unwrap_or_else(Compression::gzip_default); + let request_builder = DatadogTracesRequestBuilder::new( Arc::clone(&default_api_key), - endpoints, - self.compression.unwrap_or_else(Compression::gzip_default), + endpoints.clone(), + compression, PAYLOAD_LIMIT, + Arc::clone(&apm_stats_aggregator), )?; - let sink = TracesSink::new(service, request_builder, batcher_settings); + + // shutdown= Sender that the sink signals when input stream is exhauseted. + // tripwire= Receiver that APM stats flush thread listens for exit signal on. + let (shutdown, tripwire) = channel::>(); + + let sink = TracesSink::new(service, request_builder, batcher_settings, shutdown); + + // Send the APM stats payloads independently of the sink framework. + // This is necessary to comply with what the APM stats backend of Datadog expects with + // respect to receiving stats payloads. + tokio::spawn(flush_apm_stats_thread( + tripwire, + client, + compression, + endpoints, + Arc::clone(&apm_stats_aggregator), + )); + Ok(VectorSink::from_event_streamsink(sink)) } @@ -226,7 +258,7 @@ fn build_uri(host: &str, endpoint: &str) -> crate::Result { #[cfg(test)] mod test { - use crate::sinks::datadog::traces::DatadogTracesConfig; + use super::DatadogTracesConfig; #[test] fn generate_config() { diff --git a/src/sinks/datadog/traces/integration_tests.rs b/src/sinks/datadog/traces/integration_tests.rs index 1d817e0c7cb3b..f944009a9aad8 100644 --- a/src/sinks/datadog/traces/integration_tests.rs +++ b/src/sinks/datadog/traces/integration_tests.rs @@ -1,446 +1,20 @@ -use axum::{ - body::Body, - extract::Extension, - http::{header::CONTENT_TYPE, Request}, - routing::{get, post}, - Router, -}; -use chrono::Utc; -use flate2::read::GzDecoder; use futures::stream; use indoc::indoc; -use rmp_serde; -use serde::Serialize; -use std::{collections::HashMap, io::Read, net::SocketAddr, sync::Arc}; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::time::{sleep, Duration}; use crate::{ - config::{ConfigBuilder, SinkConfig}, + config::SinkConfig, event::Event, sinks::{ - datadog::traces::{stats::StatsPayload, tests::simple_trace_event, DatadogTracesConfig}, + datadog::traces::{tests::simple_trace_event, DatadogTracesConfig}, util::test::load_sink, }, - sources::datadog_agent::DatadogAgentConfig, test_util::{ components::{assert_sink_compliance, SINK_TAGS}, - map_event_batch_stream, start_topology, trace_init, + map_event_batch_stream, }, - topology::RunningTopology, }; use vector_core::event::{BatchNotifier, BatchStatus}; -/// The port on which the Agent will send traces to vector, and vector `datadog_agent` source will -/// listen on -fn vector_receive_port() -> u16 { - std::env::var("VECTOR_PORT") - .unwrap_or_else(|_| "8081".to_string()) - .parse::() - .unwrap() -} - -/// The port for the http server to receive data from the agent -fn server_port_for_agent() -> u16 { - std::env::var("AGENT_PORT") - .unwrap_or_else(|_| "8082".to_string()) - .parse::() - .unwrap() -} - -/// The port for the http server to receive data from vector -const fn server_port_for_vector() -> u16 { - 1234 -} - -/// The agent url to post traces to [Agent only] -fn trace_agent_only_url() -> String { - std::env::var("TRACE_AGENT_URL") - .unwrap_or_else(|_| "http://127.0.0.1:8126/v0.3/traces".to_owned()) -} - -/// The agent url to post traces to [Agent -> Vector]. -fn trace_agent_to_vector_url() -> String { - std::env::var("TRACE_AGENT_TO_VECTOR_URL") - .unwrap_or_else(|_| "http://127.0.0.1:8126/v0.3/traces".to_owned()) -} - -/// Shared state for the HTTP server -struct AppState { - name: String, - tx: Sender, -} - -/// Runs an HTTP server on the specified port. -async fn run_server(name: String, port: u16, tx: Sender) { - let state = Arc::new(AppState { - name: name.clone(), - tx, - }); - let app = Router::new() - .route("/api/v1/validate", get(validate)) - .route("/api/v0.2/traces", post(process_traces)) - .route("/api/v0.2/stats", post(process_stats)) - .layer(Extension(state)); - - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - - info!("HTTP server for `{}` listening on {}", name, addr); - - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); -} - -// Needed for the sink healthcheck -async fn validate() -> &'static str { - "" -} - -/// At a later time we could parse the trace payloads from vector and the agent and compare those -/// for consistency as well. -async fn process_traces(Extension(_state): Extension>, request: Request) { - let content_type_header = request.headers().get(CONTENT_TYPE); - let content_type = content_type_header.and_then(|value| value.to_str().ok()); - - if let Some(content_type) = content_type { - if content_type.starts_with("application/x-protobuf") { - debug!("Got trace payload."); - } - } -} - -/// Process a POST request from the stats endpoint. -/// De-compresses and De-serializes the payload, then forwards it on the Sender channel. -async fn process_stats(Extension(state): Extension>, mut request: Request) { - debug!( - "`{}` server process_stats request: {:?}", - state.name, request - ); - - let content_type_header = request.headers().get(CONTENT_TYPE); - let content_type = content_type_header.and_then(|value| value.to_str().ok()); - - if let Some(content_type) = content_type { - if content_type.starts_with("application/msgpack") { - debug!("`{}` server got stats payload.", state.name); - - let body = request.body_mut(); - let compressed_body_bytes = hyper::body::to_bytes(body) - .await - .expect("could not decode body into bytes"); - - let mut gz = GzDecoder::new(compressed_body_bytes.as_ref()); - let mut decompressed_body_bytes = vec![]; - gz.read_to_end(&mut decompressed_body_bytes) - .expect("unable to decompress gzip stats payload"); - - let payload: StatsPayload = rmp_serde::from_slice(&decompressed_body_bytes).unwrap(); - - info!( - "`{}` server received and deserialized stats payload.", - state.name - ); - debug!("{:?}", payload); - - state.tx.send(payload).await.unwrap(); - } - } -} - -#[derive(Serialize)] -struct Span { - duration: i64, - error: i32, - meta: HashMap, - metrics: HashMap, - name: String, - parent_id: u64, - resource: String, - service: String, - span_id: i64, - start: i64, - trace_id: u64, - r#type: String, -} - -fn build_traces_payload(start: i64, duration: i64, span_id: i64) -> Vec> { - let parent_id = 1; - let trace_id = 2; - let resource = "a_resource"; - let service = "a_service"; - - let span = Span { - duration, - error: 0, - meta: HashMap::from([("this_is".to_string(), "so_meta".to_string())]), - metrics: HashMap::from([("_top_level".to_string(), 1.0)]), - name: "a_name".to_string(), - parent_id, - resource: resource.to_string(), - service: service.to_string(), - span_id, - start, - trace_id, - r#type: "a_type".to_string(), - }; - - vec![vec![span]] -} - -/// Sends traces into the Agent containers. -/// Send two separate requests with thin the same bucket time window to invoke the aggregation logic in the Agent. -async fn send_agent_traces(urls: &Vec, start: i64, duration: i64, span_id: i64) { - // sends a trace to each of the urls - async fn send_trace(urls: &Vec, start: i64, duration: i64, span_id: i64) -> bool { - let traces_payload = build_traces_payload(start, duration, span_id); - let client = reqwest::Client::new(); - - for url in urls { - let res = client - .post(url) - .header(CONTENT_TYPE, "application/json") - .json(&traces_payload) - .send() - .await - .unwrap(); - - if res.status() != hyper::StatusCode::OK { - error!("Error sending traces to {}, res: {:?}.", url, res); - return false; - } - info!("Sent a trace to the Agent at {}.", url); - } - true - } - - // send first set of trace data - if !send_trace(urls, start, duration, span_id).await { - panic!("can't perform checks if traces aren't accepted by agent."); - } - - sleep(Duration::from_millis(100)).await; - - // send second set of trace data - if !send_trace(urls, start, duration, span_id + 1).await { - panic!("can't perform checks if traces aren't accepted by agent."); - } -} - -/// Receives the stats payloads from the Receiver channels from both of the server instances. -/// If either of the servers does not respond with a stats payload, the test will fail. -/// The lastest received stats payload is the only one considered. This is the same logic that the -/// Datadog UI follows. -/// Wait for up to 30 seconds for the stats payload to arrive. The Agent can take some time to send -/// the stats out. -/// TODO: Looking into if there is a way to configure the agent bucket interval to force the -/// flushing to occur faster (reducing the timeout we use and overall runtime of the test). -async fn receive_the_stats( - rx_agent_only: &mut Receiver, - rx_agent_vector: &mut Receiver, -) -> (StatsPayload, StatsPayload) { - let timeout = sleep(Duration::from_secs(30)); - tokio::pin!(timeout); - - let mut stats_agent_vector = None; - let mut stats_agent_only = None; - - let mut n_payloads_from_vector = 0; - - // wait on the receive of stats payloads. expect one from agent, two from vector. - // The second payload from vector should be the aggregate. - loop { - tokio::select! { - d1 = rx_agent_vector.recv() => { - stats_agent_vector = d1; - if stats_agent_vector.is_some() { - n_payloads_from_vector += 1 - } - if stats_agent_only.is_some() && n_payloads_from_vector == 2 { - break; - } - }, - d2 = rx_agent_only.recv() => { - stats_agent_only = d2; - if stats_agent_only.is_some() && n_payloads_from_vector == 2 { - break; - } - }, - _ = &mut timeout => break, - } - } - - assert!( - stats_agent_vector.is_some(), - "received no stats from vector" - ); - assert!(stats_agent_only.is_some(), "received no stats from agent"); - assert!( - n_payloads_from_vector == 2, - "only received one payload from vector" - ); - - (stats_agent_only.unwrap(), stats_agent_vector.unwrap()) -} - -/// Compares the stats payload (specifically the bucket for the time window we sent events on) -/// between the Vector and Agent for consistency. -fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) { - let agent_bucket = agent_stats.stats.first().unwrap().stats.first().unwrap(); - - let vector_bucket = vector_stats.stats.first().unwrap().stats.first().unwrap(); - - // NOTE: intentionally not validating the bucket start times because due to the nature of the - // test the bucket start times might not align perfectly, but everything else should. - - assert!( - agent_bucket.duration == vector_bucket.duration, - "bucket durations do not match" - ); - assert!( - agent_bucket.stats.len() == vector_bucket.stats.len(), - "vector and agent reporting different number of buckets" - ); - - let agent_s = agent_bucket.stats.first().unwrap(); - let vector_s = vector_bucket.stats.first().unwrap(); - - info!("\nagent_stats : {:?}", agent_s); - info!("\nvector_stats : {:?}", vector_s); - - assert!(agent_s.service == vector_s.service); - assert!(agent_s.name == vector_s.name); - assert!(agent_s.resource == vector_s.resource); - assert!(agent_s.http_status_code == vector_s.http_status_code); - assert!(agent_s.r#type == vector_s.r#type); - assert!(agent_s.db_type == vector_s.db_type); - assert!(agent_s.hits == vector_s.hits); - - assert!(agent_s.hits == 2); - - assert!(agent_s.errors == vector_s.errors); - assert!(agent_s.duration == vector_s.duration); - assert!(agent_s.synthetics == vector_s.synthetics); - assert!(agent_s.top_level_hits == vector_s.top_level_hits); -} - -/// Starts the vector instance with a datadog agent source and a datadog traces sink. -/// The input to the source is one of the Agent containers. -/// The output of the sink is our HTTP server. -/// Each member (topology, shutdown) of the Return value of this function must be kept -/// in scope by the caller until the test is done. -/// -/// The sink is run with a max batch size of one. -/// The two trace events are intentionally configured within the same time bucket window. -/// This creates a scenario where the stats payload that is output by the sink after processing the -/// *second* batch of events (the second event) *should* contain the aggregated statistics of both -/// of the trace events. i.e , the hit count for that bucket should be equal to "2" , not "1". -async fn start_vector() -> (RunningTopology, tokio::sync::mpsc::UnboundedReceiver<()>) { - let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port()); - - let source_config = toml::from_str::(&format!( - indoc! { r#" - address = "{}" - multiple_outputs = true - "#}, - dd_agent_address, - )) - .unwrap(); - - let mut builder = ConfigBuilder::default(); - builder.add_source("in", source_config); - - let dd_traces_endpoint = format!("http://127.0.0.1:{}", server_port_for_vector()); - let cfg = format!( - indoc! { r#" - default_api_key = "atoken" - endpoint = "{}" - batch.max_events = 1 - "#}, - dd_traces_endpoint - ); - - let api_key = std::env::var("TEST_DATADOG_API_KEY") - .expect("couldn't find the Datatog api key in environment variables"); - assert!(!api_key.is_empty(), "TEST_DATADOG_API_KEY required"); - let cfg = cfg.replace("atoken", &api_key); - - let sink_config = toml::from_str::(&cfg).unwrap(); - - builder.add_sink("out", &["in.traces"], sink_config); - - let config = builder.build().expect("building config should not fail"); - - let (topology, shutdown) = start_topology(config, false).await; - info!("Started vector."); - - (topology, shutdown) -} - -/// An end-to-end test which validates the APM stats payloads output by of the `datadog_traces` sink are -/// correct by comparing them with the same APM stats payloads output by the Agent. -/// Two Agent containers are initialized, and fed the same trace data. -/// One Agent container feeds into an HTTP server where we parse the stats. The other Agent -/// container feeds traces into the Datadog Agent source of Vector, which outputs to the traces -/// sink and finally the same HTTP server to process stats payloads. -/// The fields of the stats payloads are then compared. -/// -/// This test specifically verifies the Aggregation of stats across multiple batches of events -/// through vector. Consumers of the Agent's stats payloads expect the Aggregation of stats for a -/// 20 second time window on any given stats payload within that time window. -#[tokio::test] -async fn apm_stats_e2e_test_dd_agent_to_vector_correctness() { - trace_init(); - - // channels for the servers to send us back data on - let (tx_agent_vector, mut rx_agent_vector) = mpsc::channel(32); - let (tx_agent_only, mut rx_agent_only) = mpsc::channel(32); - - // spawn the servers - { - // [vector -> the server] - tokio::spawn(async move { - run_server( - "vector".to_string(), - server_port_for_vector(), - tx_agent_vector, - ) - .await; - }); - - // [agent -> the server] - tokio::spawn(async move { - run_server("agent".to_string(), server_port_for_agent(), tx_agent_only).await; - }); - } - - // allow the Agent containers to start up - sleep(Duration::from_secs(8)).await; - - // starts the vector source and sink - // panics if vector errors during startup - let (_topology, _shutdown) = start_vector().await; - - // the URLs of the Agent trace endpoints that traces will be sent to - let urls = vec![trace_agent_only_url(), trace_agent_to_vector_url()]; - - let start = Utc::now().timestamp_nanos(); - let duration = 1; - let span_id = 3; - - // sends the traces through the agent containers - // panics if the HTTP post fails - send_agent_traces(&urls, start, duration, span_id).await; - - // receive the stats on the channel receivers from the servers - let (stats_agent, stats_vector) = - receive_the_stats(&mut rx_agent_only, &mut rx_agent_vector).await; - - // compare the stats from agent and vector for consistency - validate_stats(&stats_agent, &stats_vector); -} - #[tokio::test] async fn to_real_traces_endpoint() { assert_sink_compliance(&SINK_TAGS, async { diff --git a/src/sinks/datadog/traces/mod.rs b/src/sinks/datadog/traces/mod.rs index c0ee4fd6933bf..83e37fdfb8aab 100644 --- a/src/sinks/datadog/traces/mod.rs +++ b/src/sinks/datadog/traces/mod.rs @@ -11,11 +11,11 @@ mod integration_tests; #[cfg(test)] mod tests; +pub(crate) mod apm_stats; mod config; mod request_builder; mod service; mod sink; -mod stats; #[allow(warnings, clippy::pedantic, clippy::nursery)] pub(crate) mod ddsketch_full { diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 3f07c3f5d19a9..4665ab1bee8a2 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -7,17 +7,16 @@ use std::{ use bytes::Bytes; use prost::Message; -use rmp_serde; use snafu::Snafu; use vector_common::request_metadata::RequestMetadata; use vector_core::event::{EventFinalizers, Finalizable}; use super::{ + apm_stats::{compute_apm_stats, Aggregator}, config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration}, dd_proto, service::TraceApiRequest, sink::PartitionKey, - stats, }; use crate::{ event::{Event, TraceEvent, Value}, @@ -28,8 +27,12 @@ use crate::{ #[derive(Debug, Snafu)] pub enum RequestBuilderError { - #[snafu(display("Encoding of a request payload failed ({}, {})", message, reason))] - FailedToEncode { + #[snafu(display( + "Building an APM stats request payload failed ({}, {})", + message, + reason + ))] + FailedToBuild { message: &'static str, reason: String, dropped_events: u64, @@ -43,7 +46,7 @@ impl RequestBuilderError { #[allow(clippy::missing_const_for_fn)] // const cannot run destructor pub fn into_parts(self) -> (&'static str, String, u64) { match self { - Self::FailedToEncode { + Self::FailedToBuild { message, reason, dropped_events, @@ -62,7 +65,7 @@ pub struct DatadogTracesRequestBuilder { compression: Compression, trace_encoder: DatadogTracesEncoder, /// Contains the Aggregated stats across a time window. - stats_aggregator: Arc>, + stats_aggregator: Arc>, } impl DatadogTracesRequestBuilder { @@ -71,23 +74,24 @@ impl DatadogTracesRequestBuilder { endpoint_configuration: DatadogTracesEndpointConfiguration, compression: Compression, max_size: usize, + stats_aggregator: Arc>, ) -> Result { Ok(Self { api_key, endpoint_configuration, compression, trace_encoder: DatadogTracesEncoder { max_size }, - stats_aggregator: Arc::new(Mutex::new(stats::Aggregator::new())), + stats_aggregator, }) } } pub struct DDTracesMetadata { - api_key: Arc, - endpoint: DatadogTracesEndpoint, - finalizers: EventFinalizers, - uncompressed_size: usize, - content_type: String, + pub api_key: Arc, + pub endpoint: DatadogTracesEndpoint, + pub finalizers: EventFinalizers, + pub uncompressed_size: usize, + pub content_type: String, } impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequestBuilder { @@ -103,21 +107,17 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequ let (key, events) = input; let mut results = Vec::new(); let n = events.len(); - let traces_event = events + let trace_events = events .into_iter() .filter_map(|e| e.try_into_trace()) .collect::>(); - results.push(build_apm_stats_request( - &key, - &traces_event, - self.compression, - &self.api_key, - &self.stats_aggregator, - )); + // Compute APM stats from the incoming events. The stats payloads are sent out + // separately from the sink framework, by the thread `flush_apm_stats_thread()` + compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events); self.trace_encoder - .encode_trace(&key, traces_event) + .encode_trace(&key, trace_events) .into_iter() .for_each(|r| match r { Ok((payload, mut processed)) => { @@ -146,14 +146,14 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequ results.push(Ok(((metadata, request_metadata), bytes))) } - Err(e) => results.push(Err(RequestBuilderError::FailedToEncode { + Err(e) => results.push(Err(RequestBuilderError::FailedToBuild { message: "Payload compression failed.", reason: e.to_string(), dropped_events: n as u64, })), } } - Err(err) => results.push(Err(RequestBuilderError::FailedToEncode { + Err(err) => results.push(Err(RequestBuilderError::FailedToBuild { message: err.parts().0, reason: err.parts().1.into(), dropped_events: err.parts().2, @@ -163,26 +163,46 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequ } fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request { - let (ddtraces_metadata, request_metadata) = metadata; - let mut headers = BTreeMap::::new(); - headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type); - headers.insert( - "DD-API-KEY".to_string(), - ddtraces_metadata.api_key.to_string(), - ); - if let Some(ce) = self.compression.content_encoding() { - headers.insert("Content-Encoding".to_string(), ce.to_string()); - } - TraceApiRequest { - body: payload, - headers, - finalizers: ddtraces_metadata.finalizers, - uri: self - .endpoint_configuration - .get_uri_for_endpoint(ddtraces_metadata.endpoint), - uncompressed_size: ddtraces_metadata.uncompressed_size, - metadata: request_metadata, - } + build_request( + metadata, + payload, + self.compression, + &self.endpoint_configuration, + ) + } +} + +/// Builds the `TraceApiRequest` from inputs. +/// +/// # Arguments +/// +/// * `metadata` - Tuple of Datadog traces specific metadata and the generic `RequestMetadata`. +/// * `payload` - Compressed and encoded bytes to send. +/// * `compression` - `Compression` used to reference the Content-Encoding header. +/// * `endpoint_configuration` - Endpoint configuration to use when creating the HTTP requests. +pub fn build_request( + metadata: (DDTracesMetadata, RequestMetadata), + payload: Bytes, + compression: Compression, + endpoint_configuration: &DatadogTracesEndpointConfiguration, +) -> TraceApiRequest { + let (ddtraces_metadata, request_metadata) = metadata; + let mut headers = BTreeMap::::new(); + headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type); + headers.insert( + "DD-API-KEY".to_string(), + ddtraces_metadata.api_key.to_string(), + ); + if let Some(ce) = compression.content_encoding() { + headers.insert("Content-Encoding".to_string(), ce.to_string()); + } + TraceApiRequest { + body: payload, + headers, + finalizers: ddtraces_metadata.finalizers, + uri: endpoint_configuration.get_uri_for_endpoint(ddtraces_metadata.endpoint), + uncompressed_size: ddtraces_metadata.uncompressed_size, + metadata: request_metadata, } } @@ -429,53 +449,3 @@ impl DatadogTracesEncoder { } } } - -fn build_apm_stats_request( - key: &PartitionKey, - events: &[TraceEvent], - compression: Compression, - default_api_key: &Arc, - aggregator: &Arc>, -) -> Result<(RequestMetadata, Bytes), RequestBuilderError> { - let aggregator = Arc::clone(aggregator); - let mut aggregator = aggregator.lock().unwrap(); - let payload = stats::compute_apm_stats(key, &mut aggregator, events); - - let encoded_payload = - rmp_serde::to_vec_named(&payload).map_err(|e| RequestBuilderError::FailedToEncode { - message: "APM stats encoding failed.", - reason: e.to_string(), - dropped_events: 0, - })?; - let uncompressed_size = encoded_payload.len(); - let metadata = DDTracesMetadata { - api_key: key - .api_key - .clone() - .unwrap_or_else(|| Arc::clone(default_api_key)), - endpoint: DatadogTracesEndpoint::APMStats, - finalizers: EventFinalizers::default(), - uncompressed_size, - content_type: "application/msgpack".to_string(), - }; - - let mut compressor = Compressor::from(compression); - match compressor.write_all(&encoded_payload) { - Ok(()) => { - let bytes = compressor.into_inner().freeze(); - - // build RequestMetadata - let builder = RequestMetadataBuilder::new(0, uncompressed_size); - let bytes_len = - NonZeroUsize::new(bytes.len()).expect("payload should never be zero length"); - let request_metadata = builder.with_request_size(bytes_len); - - Ok(((metadata, request_metadata), bytes)) - } - Err(e) => Err(RequestBuilderError::FailedToEncode { - message: "APM stats payload compression failed.", - reason: e.to_string(), - dropped_events: 0, - }), - } -} diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 28592ecfb6566..6991ea505ea49 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -5,6 +5,7 @@ use futures_util::{ stream::{self, BoxStream}, StreamExt, }; +use tokio::sync::oneshot::{channel, Sender}; use tower::Service; use vector_core::{ config::log_schema, @@ -63,6 +64,7 @@ pub struct TracesSink { service: S, request_builder: DatadogTracesRequestBuilder, batch_settings: BatcherSettings, + shutdown: Sender>, } impl TracesSink @@ -76,11 +78,13 @@ where service: S, request_builder: DatadogTracesRequestBuilder, batch_settings: BatcherSettings, + shutdown: Sender>, ) -> Self { TracesSink { service, request_builder, batch_settings, + shutdown, } } @@ -105,7 +109,20 @@ where }) .into_driver(self.service); - sink.run().await + sink.run().await?; + + // Create a channel for the stats flushing thread to communicate back that it has flushed + // remaining stats. This is necessary so that we do not terminate the process while the + // stats flushing thread is trying to complete the HTTP request. + let (sender, receiver) = channel(); + + // Signal the stats thread task to flush remaining payloads and shutdown. + let _ = self.shutdown.send(sender); + + // The stats flushing thread has until the component shutdown grace period to end + // gracefully. Otherwise the sink + stats flushing thread will be killed and an error + // reported upstream. + receiver.await.map_err(|_| ()) } } diff --git a/src/sinks/datadog/traces/stats.rs b/src/sinks/datadog/traces/stats.rs deleted file mode 100644 index a75e9cdd55d9d..0000000000000 --- a/src/sinks/datadog/traces/stats.rs +++ /dev/null @@ -1,699 +0,0 @@ -use std::collections::BTreeMap; - -use chrono::Utc; -use prost::Message; -use serde::{Deserialize, Serialize}; -use serde_bytes; - -use super::{ddsketch_full, sink::PartitionKey}; -use crate::{ - event::{TraceEvent, Value}, - metrics::AgentDDSketch, -}; - -const MEASURED_KEY: &str = "_dd.measured"; -const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; -const SAMPLING_RATE_KEY: &str = "_sample_rate"; -const TAG_STATUS_CODE: &str = "http.status_code"; -const TAG_SYNTHETICS: &str = "synthetics"; -const TOP_LEVEL_KEY: &str = "_top_level"; - -/// The duration of time in nanoseconds that a bucket covers. -const BUCKET_DURATION_NANOSECONDS: u64 = 10_000_000_000; - -/// The number of bucket durations to keep in memory before flushing them. -const BUCKET_WINDOW_LEN: u64 = 2; - -#[derive(PartialEq, Eq, PartialOrd, Ord)] -struct AggregationKey { - payload_key: PayloadAggregationKey, - bucket_key: BucketAggregationKey, -} - -impl AggregationKey { - fn new_aggregation_from_span( - span: &BTreeMap, - payload_key: PayloadAggregationKey, - synthetics: bool, - ) -> Self { - AggregationKey { - payload_key: payload_key.with_span_context(span), - bucket_key: BucketAggregationKey { - service: span - .get("service") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - name: span - .get("name") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - resource: span - .get("resource") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - ty: span - .get("type") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - status_code: span - .get("meta") - .and_then(|m| m.as_object()) - .and_then(|m| m.get(TAG_STATUS_CODE)) - // the meta field is supposed to be a string/string map - .and_then(|s| s.to_string_lossy().parse::().ok()) - .unwrap_or_default(), - synthetics, - }, - } - } -} - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] -struct PayloadAggregationKey { - env: String, - hostname: String, - version: String, - container_id: String, -} - -impl PayloadAggregationKey { - fn with_span_context(self, span: &BTreeMap) -> Self { - PayloadAggregationKey { - env: span - .get("meta") - .and_then(|m| m.as_object()) - .and_then(|m| m.get("env")) - .map(|s| s.to_string_lossy()) - .unwrap_or(self.env), - hostname: self.hostname, - version: self.version, - container_id: self.container_id, - } - } -} - -#[derive(PartialEq, Eq, PartialOrd, Ord)] -struct BucketAggregationKey { - service: String, - name: String, - resource: String, - ty: String, - status_code: u32, - synthetics: bool, -} - -struct GroupedStats { - hits: f64, - top_level_hits: f64, - errors: f64, - duration: f64, - ok_distribution: AgentDDSketch, - err_distribution: AgentDDSketch, -} - -impl GroupedStats { - fn new() -> Self { - GroupedStats { - hits: 0.0, - top_level_hits: 0.0, - errors: 0.0, - duration: 0.0, - ok_distribution: AgentDDSketch::with_agent_defaults(), - err_distribution: AgentDDSketch::with_agent_defaults(), - } - } - - fn export(&self, key: &AggregationKey) -> ClientGroupedStats { - ClientGroupedStats { - service: key.bucket_key.service.clone(), - name: key.bucket_key.name.clone(), - resource: key.bucket_key.resource.clone(), - http_status_code: key.bucket_key.status_code, - r#type: key.bucket_key.ty.clone(), - db_type: "".to_string(), - hits: self.hits.round() as u64, - errors: self.errors.round() as u64, - duration: self.duration.round() as u64, - ok_summary: encode_sketch(&self.ok_distribution), - error_summary: encode_sketch(&self.err_distribution), - synthetics: key.bucket_key.synthetics, - top_level_hits: self.top_level_hits.round() as u64, - } - } -} - -/// Convert agent sketch variant to ./proto/dd_sketch_full.proto -fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec { - // AgentDDSketch partitions the set of real numbers into intervals like [gamma^(n), gamma^(n+1)[, - let index_mapping = ddsketch_full::IndexMapping { - // This is the gamma value used to build the aforementioned partition scheme - gamma: agent_sketch.gamma(), - // This offset is applied to the powers of gamma to adjust sketch accuracy - index_offset: agent_sketch.bin_index_offset() as f64, - // Interpolation::None is the interpolation type as there is no interpolation when using the - // aforementioned partition scheme - interpolation: ddsketch_full::index_mapping::Interpolation::None as i32, - }; - - // zeroes depicts the number of values that falled around zero based on the sketch local accuracy - // positives and negatives stores are repectively storing positive and negative values using the - // exact same mechanism. - let (positives, negatives, zeroes) = convert_stores(agent_sketch); - let positives_store = ddsketch_full::Store { - bin_counts: positives, - contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None) - contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None) - }; - let negatives_store = ddsketch_full::Store { - bin_counts: negatives, - contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None) - contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None) - }; - ddsketch_full::DdSketch { - mapping: Some(index_mapping), - positive_values: Some(positives_store), - negative_values: Some(negatives_store), - zero_count: zeroes, - } - .encode_to_vec() -} - -/// Split negative and positive values from an AgentDDSketch, also extract the number of values -/// that were accounted as 0.0. -fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap, BTreeMap, f64) { - let mut positives = BTreeMap::::new(); - let mut negatives = BTreeMap::::new(); - let mut zeroes = 0.0; - let bin_map = agent_sketch.bin_map(); - bin_map - .keys - .into_iter() - .zip(bin_map.counts.into_iter()) - .for_each(|(k, n)| { - match k.signum() { - 0 => zeroes = n as f64, - 1 => { - positives.insert(k as i32, n as f64); - } - -1 => { - negatives.insert((-k) as i32, n as f64); - } - _ => {} - }; - }); - (positives, negatives, zeroes) -} - -/// Stores statistics for various `AggregationKey` in a given time window -struct Bucket { - start: u64, - duration: u64, - data: BTreeMap, -} - -impl Bucket { - fn export(&self) -> BTreeMap { - let mut m = BTreeMap::::new(); - self.data.iter().for_each(|(k, v)| { - let b = v.export(k); - match m.get_mut(&k.payload_key) { - None => { - let sb = ClientStatsBucket { - start: self.start, - duration: self.duration, - agent_time_shift: 0, - stats: vec![b], - }; - m.insert(k.payload_key.clone(), sb); - } - Some(s) => { - s.stats.push(b); - } - }; - }); - m - } - - fn add( - &mut self, - span: &BTreeMap, - weight: f64, - is_top: bool, - aggkey: AggregationKey, - ) { - match self.data.get_mut(&aggkey) { - Some(gs) => Bucket::update(span, weight, is_top, gs), - None => { - let mut gs = GroupedStats::new(); - Bucket::update(span, weight, is_top, &mut gs); - self.data.insert(aggkey, gs); - } - } - } - - /// Update a bucket with a new span. Computed statistics include the number of hits and the actual distribution of - /// execution time, with isolated measurements for spans flagged as errored and spans without error. - fn update(span: &BTreeMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { - is_top.then(|| { - gs.top_level_hits += weight; - }); - gs.hits += weight; - let error = match span.get("error") { - Some(Value::Integer(val)) => *val, - None => 0, - _ => panic!("`error` should be an i64"), - }; - if error != 0 { - gs.errors += weight; - } - let duration = match span.get("duration") { - Some(Value::Integer(val)) => *val, - None => 0, - _ => panic!("`duration` should be an i64"), - }; - gs.duration += (duration as f64) * weight; - if error != 0 { - gs.err_distribution.insert(duration as f64) - } else { - gs.ok_distribution.insert(duration as f64) - } - } -} - -pub(crate) struct Aggregator { - /// The key represents the timestamp (in nanoseconds) of the beginning of the time window (that lasts 10 seconds) on - /// which the associated bucket will calculate statistics. - buckets: BTreeMap, - /// The oldeest timestamp we will allow for the current time bucket. - oldest_timestamp: u64, -} - -impl Aggregator { - pub fn new() -> Self { - Self { - buckets: BTreeMap::new(), - oldest_timestamp: align_timestamp(Utc::now().timestamp_nanos() as u64), - } - } - - /// Iterates over a trace's constituting spans and upon matching conditions it updates statistics (mostly using the top level span). - fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) { - // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184 - - let spans = match trace.get("spans") { - Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(), - _ => vec![], - }; - - let weight = extract_weight_from_root_span(&spans); - let payload_aggkey = PayloadAggregationKey { - env: partition_key.env.clone().unwrap_or_default(), - hostname: partition_key.hostname.clone().unwrap_or_default(), - version: trace - .get("app_version") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - container_id: trace - .get("container_id") - .map(|v| v.to_string_lossy()) - .unwrap_or_default(), - }; - let synthetics = trace - .get("origin") - .map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS)) - .unwrap_or(false); - - spans.iter().for_each(|span| { - let is_top = has_top_level(span); - if !(is_top || is_measured(span) || is_partial_snapshot(span)) { - return; - } - - self.handle_span(span, weight, is_top, synthetics, payload_aggkey.clone()); - }); - } - - /// Aggregates statistics per key over 10 seconds windows. - /// The key is constructed from various span/trace properties (see `AggregationKey`). - fn handle_span( - &mut self, - span: &BTreeMap, - weight: f64, - is_top: bool, - synthetics: bool, - payload_aggkey: PayloadAggregationKey, - ) { - // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/statsraw.go#L147-L182 - - let aggkey = AggregationKey::new_aggregation_from_span(span, payload_aggkey, synthetics); - - let start = match span.get("start") { - Some(Value::Timestamp(val)) => val.timestamp_nanos() as u64, - _ => Utc::now().timestamp_nanos() as u64, - }; - - let duration = match span.get("duration") { - Some(Value::Integer(val)) => *val as u64, - None => 0, - _ => panic!("`duration` should be an i64"), - }; - - let end = start + duration; - - // 10 second bucket window - let mut btime = align_timestamp(end); - - // If too far in the past, use the oldest-allowed time bucket instead - if btime < self.oldest_timestamp { - btime = self.oldest_timestamp - } - - match self.buckets.get_mut(&btime) { - Some(b) => { - b.add(span, weight, is_top, aggkey); - } - None => { - let mut b = Bucket { - start: btime, - duration: BUCKET_DURATION_NANOSECONDS, - data: BTreeMap::new(), - }; - b.add(span, weight, is_top, aggkey); - self.buckets.insert(btime, b); - } - } - } - - fn get_client_stats_payload(&self) -> Vec { - let client_stats_buckets = self.export_buckets(); - - client_stats_buckets - .into_iter() - .map(|(payload_aggkey, csb)| { - ClientStatsPayload { - env: payload_aggkey.env, - hostname: payload_aggkey.hostname, - container_id: payload_aggkey.container_id, - version: payload_aggkey.version, - stats: csb, - // All the following fields are left unset by the trace-agent: - // https://github.com/DataDog/datadog-agent/blob/42e72dd/pkg/trace/stats/concentrator.go#L216-L227 - service: "".to_string(), - agent_aggregation: "".to_string(), - sequence: 0, - runtime_id: "".to_string(), - lang: "".to_string(), - tracer_version: "".to_string(), - tags: vec![], - } - }) - .collect::>() - } - - fn export_buckets(&self) -> BTreeMap> { - let mut m = BTreeMap::>::new(); - self.buckets.values().for_each(|b| { - b.export().into_iter().for_each(|(payload_key, csb)| { - match m.get_mut(&payload_key) { - None => { - m.insert(payload_key.clone(), vec![csb]); - } - Some(s) => { - s.push(csb); - } - }; - }) - }); - m - } - - /// Flushes the bucket cache of stale entries. - /// It means that we cache and can compute stats only for the last `BUCKET_WINDOW_LEN * BUCKET_DURATION_NANOSECONDS` and after such time, - /// buckets are then flushed. This only applies to past buckets. Stats buckets in the future are cached with no restriction. - fn flush(&mut self) { - // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L38-L41 - // , and https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L195-L207 - - // NOTE: The Agent flushes on a specific time interval. We are currently flushing per batch - // input to the sink. - // If this becomes problematic for users, we will have to spin up a separate thread that - // flushes on a specific time interval. - // In theory the existing approach should work, and would just manifest as overall more stats payloads - // being output by Vector, but the payloads themselves should be correct. - // https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L83-L108 - - let now = Utc::now().timestamp_nanos() as u64; - let flush_cutoff_time = now - (BUCKET_DURATION_NANOSECONDS * BUCKET_WINDOW_LEN); - - // remove entries from the cache if the start time is outside the retaining window - self.buckets.retain(|&bucket_start, _bucket| { - let retain = bucket_start > flush_cutoff_time; - - if !retain { - debug!("Flushing {} start_time bucket.", bucket_start); - } - retain - }); - - // update the oldest_timestamp allowed, to prevent having stats for an already flushed - // bucket - let new_oldest_ts = - align_timestamp(now) - ((BUCKET_WINDOW_LEN - 1) * BUCKET_DURATION_NANOSECONDS); - - if new_oldest_ts > self.oldest_timestamp { - debug!("Updated oldest_timestamp to {}.", new_oldest_ts); - self.oldest_timestamp = new_oldest_ts; - } - } -} - -/// Returns the provided timestamp truncated to the bucket size. -/// This is the start time of the time bucket in which such timestamp falls. -const fn align_timestamp(start: u64) -> u64 { - // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L232-L234 - start - (start % BUCKET_DURATION_NANOSECONDS) -} - -/// Assumes that all metrics are all encoded as Value::Float. -/// Return the f64 of the specified key or None of key not present. -fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option { - span.get("metrics") - .and_then(|m| m.as_object()) - .map(|m| match m.get(key) { - Some(Value::Float(f)) => Some(f.into_inner()), - None => None, - _ => panic!("`metric` values should be all be f64"), - }) - .unwrap_or(None) -} - -/// Returns true if the value of this metric is equal to 1.0 -fn metric_value_is_1(span: &BTreeMap, key: &str) -> bool { - match get_metric_value_float(span, key) { - Some(f) => f == 1.0, - None => false, - } -} - -/// Returns true if span is top-level. -fn has_top_level(span: &BTreeMap) -> bool { - // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L28-L31 - - metric_value_is_1(span, TOP_LEVEL_KEY) -} - -/// Returns true if a span should be measured (i.e. it should get trace metrics calculated). -fn is_measured(span: &BTreeMap) -> bool { - // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L40-L43 - - metric_value_is_1(span, MEASURED_KEY) -} - -/// Returns true if the span is a partial snapshot. -/// These types of spans are partial images of long-running spans. -/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive integer. -/// The metric usually increases each time a new version of the same span is sent by the tracer -fn is_partial_snapshot(span: &BTreeMap) -> bool { - // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L49-L52 - - match get_metric_value_float(span, PARTIAL_VERSION_KEY) { - Some(f) => f >= 0.0, - None => false, - } -} - -/// This extracts the relative weights from the top level span (i.e. the span that does not have a parent). -fn extract_weight_from_root_span(spans: &[&BTreeMap]) -> f64 { - // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/weight.go#L17-L26. - - // TODO this logic likely has a bug(s) that need to be root caused. The root span is not reliably found and defaults to "1.0" - // regularly for users even when sampling is disabled in the Agent. - // GH issue to track that: https://github.com/vectordotdev/vector/issues/14859 - - if spans.is_empty() { - return 1.0; - } - - let mut trace_id: Option = None; - - let mut parent_id_to_child_weight = BTreeMap::::new(); - let mut span_ids = Vec::::new(); - for s in spans.iter() { - // TODO these need to change to u64 when the following issue is fixed: - // https://github.com/vectordotdev/vector/issues/14687 - let parent_id = match s.get("parent_id") { - Some(Value::Integer(val)) => *val, - None => 0, - _ => panic!("`parent_id` should be an i64"), - }; - let span_id = match s.get("span_id") { - Some(Value::Integer(val)) => *val, - None => 0, - _ => panic!("`span_id` should be an i64"), - }; - if trace_id.is_none() { - trace_id = match s.get("trace_id") { - Some(Value::Integer(v)) => Some(*v as usize), - _ => panic!("`trace_id` should be an i64"), - } - } - let weight = s - .get("metrics") - .and_then(|m| m.as_object()) - .map(|m| match m.get(SAMPLING_RATE_KEY) { - Some(Value::Float(v)) => { - let sample_rate = v.into_inner(); - if sample_rate <= 0.0 || sample_rate > 1.0 { - 1.0 - } else { - 1.0 / sample_rate - } - } - _ => 1.0, - }) - .unwrap_or(1.0); - - // found root - if parent_id == 0 { - return weight; - } - - span_ids.push(span_id); - - parent_id_to_child_weight.insert(parent_id, weight); - } - - // Remove all spans that have a parent - span_ids.iter().for_each(|id| { - parent_id_to_child_weight.remove(id); - }); - - // There should be only one value remaining, the weight from the root span - if parent_id_to_child_weight.len() != 1 { - // TODO remove the debug print and emit the Error event as outlined in - // https://github.com/vectordotdev/vector/issues/14859 - debug!( - "Didn't reliably find the root span for weight calculation of trace_id {:?}.", - trace_id - ); - } - - *parent_id_to_child_weight - .values() - .next() - .unwrap_or_else(|| { - // TODO remove the debug print and emit the Error event as outlined in - // https://github.com/vectordotdev/vector/issues/14859 - debug!( - "Root span was not found. Defaulting to weight of 1.0 for trace_id {:?}.", - trace_id - ); - &1.0 - }) -} - -pub(crate) fn compute_apm_stats( - key: &PartitionKey, - aggregator: &mut Aggregator, - traces: &[TraceEvent], -) -> StatsPayload { - // flush stale entries from the cache - aggregator.flush(); - - // process the incoming traces - traces.iter().for_each(|t| aggregator.handle_trace(key, t)); - - StatsPayload { - agent_hostname: key.hostname.clone().unwrap_or_default(), - agent_env: key.env.clone().unwrap_or_default(), - stats: aggregator.get_client_stats_payload(), - agent_version: key.agent_version.clone().unwrap_or_default(), - client_computed: false, - } -} - -// On the agent side APM Stats payload are encoded into the messagepack format using this -// go code https://github.com/DataDog/datadog-agent/blob/b5bed4d/pkg/trace/pb/stats_gen.go. -// Note that this code is generated from code itself generate from this .proto file -// https://github.com/DataDog/datadog-agent/blob/dc2f202/pkg/trace/pb/stats.proto. -// All the subsequent struct are dedicated to be used with rmp_serde and the fields names -// exactly match the ones of the go code. -#[derive(Debug, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) struct StatsPayload { - pub(crate) agent_hostname: String, - pub(crate) agent_env: String, - pub(crate) stats: Vec, - pub(crate) agent_version: String, - pub(crate) client_computed: bool, -} - -#[derive(Debug, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) struct ClientStatsPayload { - pub(crate) hostname: String, - pub(crate) env: String, - pub(crate) version: String, - pub(crate) stats: Vec, - pub(crate) lang: String, - pub(crate) tracer_version: String, - #[serde(rename = "RuntimeID")] - pub(crate) runtime_id: String, - pub(crate) sequence: u64, - pub(crate) agent_aggregation: String, - pub(crate) service: String, - #[serde(rename = "ContainerID")] - pub(crate) container_id: String, - pub(crate) tags: Vec, -} - -#[derive(Debug, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) struct ClientStatsBucket { - pub(crate) start: u64, - pub(crate) duration: u64, - pub(crate) stats: Vec, - pub(crate) agent_time_shift: i64, -} - -#[derive(Debug, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "PascalCase")] -pub(crate) struct ClientGroupedStats { - pub(crate) service: String, - pub(crate) name: String, - pub(crate) resource: String, - #[serde(rename = "HTTPStatusCode")] - pub(crate) http_status_code: u32, - pub(crate) r#type: String, - #[serde(rename = "DBType")] - pub(crate) db_type: String, - pub(crate) hits: u64, - pub(crate) errors: u64, - pub(crate) duration: u64, - #[serde(with = "serde_bytes")] - pub(crate) ok_summary: Vec, - #[serde(with = "serde_bytes")] - pub(crate) error_summary: Vec, - pub(crate) synthetics: bool, - pub(crate) top_level_hits: u64, -} diff --git a/src/sinks/datadog/traces/tests.rs b/src/sinks/datadog/traces/tests.rs index 730233d0deb7c..1ba7468c7d9bb 100644 --- a/src/sinks/datadog/traces/tests.rs +++ b/src/sinks/datadog/traces/tests.rs @@ -10,7 +10,8 @@ use prost::Message; use rmp_serde; use vector_core::event::{BatchNotifier, BatchStatus, Event}; -use super::{dd_proto, ddsketch_full, stats::StatsPayload, DatadogTracesConfig}; +use super::{apm_stats::StatsPayload, dd_proto, ddsketch_full, DatadogTracesConfig}; + use crate::{ config::SinkConfig, event::{TraceEvent, Value}, @@ -135,8 +136,8 @@ async fn smoke() { // encoded & emitted in the same payload but we also get an APM stats payload let mut output = rx.take(2).collect::>().await; - let trace = output.pop(); let stats = output.pop(); + let trace = output.pop(); assert!(trace.is_some()); assert!(stats.is_some()); @@ -246,8 +247,8 @@ async fn multiple_traces() { let mut output = rx.take(2).collect::>().await; - let trace = output.pop(); let stats = output.pop(); + let trace = output.pop(); assert!(trace.is_some()); assert!(stats.is_some());