diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index 01ec1742a381e..633630100c622 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -64,7 +64,7 @@ pub struct DatadogTracesRequestBuilder {
     api_key: Arc<str>,
     endpoint_configuration: DatadogTracesEndpointConfiguration,
     compression: Compression,
-    trace_encoder: DatadogTracesEncoder,
+    max_size: usize,
     /// Contains the Aggregated stats across a time window.
     stats_aggregator: Arc<Mutex<Aggregator>>,
 }
@@ -81,7 +81,7 @@ impl DatadogTracesRequestBuilder {
             api_key,
             endpoint_configuration,
             compression,
-            trace_encoder: DatadogTracesEncoder { max_size },
+            max_size,
             stats_aggregator,
         })
     }
@@ -106,8 +106,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
         input: (PartitionKey, Vec<Event>),
     ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
         let (key, events) = input;
-        let mut results = Vec::new();
-        let n = events.len();
         let trace_events = events
             .into_iter()
             .filter_map(|e| e.try_into_trace())
@@ -117,11 +115,10 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
         // separately from the sink framework, by the thread `flush_apm_stats_thread()`
         compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);

-        self.trace_encoder
-            .encode_trace(&key, trace_events)
+        encode_traces(&key, trace_events, self.max_size)
             .into_iter()
-            .for_each(|r| match r {
-                Ok((payload, mut processed)) => {
+            .map(|result| {
+                result.and_then(|(payload, mut processed)| {
                     let uncompressed_size = payload.len();
                     let metadata = DDTracesMetadata {
                         api_key: key
@@ -146,22 +143,17 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
                                 .expect("payload should never be zero length");
                             let request_metadata = builder.with_request_size(bytes_len);

-                            results.push(Ok(((metadata, request_metadata), bytes)))
+                            Ok(((metadata, request_metadata), bytes))
                         }
-                        Err(e) => results.push(Err(RequestBuilderError::FailedToBuild {
+                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                             message: "Payload compression failed.",
                             reason: e.to_string(),
-                            dropped_events: n as u64,
-                        })),
+                            dropped_events: processed.len() as u64,
+                        }),
                     }
-                }
-                Err(err) => results.push(Err(RequestBuilderError::FailedToBuild {
-                    message: err.parts().0,
-                    reason: err.parts().1.into(),
-                    dropped_events: err.parts().2,
-                })),
-            });
-        results
+                })
+            })
+            .collect()
     }

     fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
@@ -208,248 +200,335 @@ pub fn build_request(
     }
 }

-pub struct DatadogTracesEncoder {
+fn encode_traces(
+    key: &PartitionKey,
+    trace_events: Vec<TraceEvent>,
     max_size: usize,
-}
-
-#[derive(Debug, Snafu)]
-pub enum EncoderError {
-    #[snafu(display("Unable to split payload into small enough chunks"))]
-    UnableToSplit {
-        dropped_events: u64,
-        error_code: &'static str,
-    },
-}
-
-impl EncoderError {
-    pub const fn parts(&self) -> (&'static str, &'static str, u64) {
-        match self {
-            Self::UnableToSplit {
-                dropped_events: n,
-                error_code,
-            } => ("unable to split into small chunks", error_code, *n),
-        }
-    }
-}
-
-impl DatadogTracesEncoder {
-    fn encode_trace(
-        &self,
-        key: &PartitionKey,
-        events: Vec<TraceEvent>,
-    ) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), EncoderError>> {
-        let mut encoded_payloads = Vec::new();
-        let payload = DatadogTracesEncoder::trace_into_payload(key, &events);
-        let encoded_payload = payload.encode_to_vec();
-        // This may happen exceptionally
-        if encoded_payload.len() > self.max_size {
-            debug!("A payload exceeded the maximum size, splitting into multiple.");
-            let n_chunks: usize = (encoded_payload.len() / self.max_size) + 1;
-            let chunk_size = (events.len() / n_chunks) + 1;
-            events.chunks(chunk_size).for_each(|events| {
-                let chunked_payload = DatadogTracesEncoder::trace_into_payload(key, events);
-                let encoded_chunk = chunked_payload.encode_to_vec();
-                if encoded_chunk.len() > self.max_size {
-                    encoded_payloads.push(Err(EncoderError::UnableToSplit {
-                        dropped_events: events.len() as u64,
-                        error_code: "message_too_big",
+) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
+    let mut results = Vec::new();
+    let mut processed = Vec::new();
+    let mut payload = build_empty_payload(key);
+
+    for trace in trace_events {
+        let mut proto = encode_trace(&trace);
+
+        loop {
+            payload.tracer_payloads.push(proto);
+            if payload.encoded_len() >= max_size {
+                // take it back out
+                proto = payload.tracer_payloads.pop().expect("just pushed");
+                if payload.tracer_payloads.is_empty() {
+                    // this individual trace is too big
+                    results.push(Err(RequestBuilderError::FailedToBuild {
+                        message: "Dropped trace event",
+                        reason: "Trace is larger than allowed payload size".into(),
+                        dropped_events: 1,
                     }));
+
+                    break;
                 } else {
-                    encoded_payloads.push(Ok((encoded_chunk, events.to_vec())));
+                    // try with a fresh payload
+                    results.push(Ok((
+                        payload.encode_to_vec(),
+                        std::mem::take(&mut processed),
+                    )));
+                    payload = build_empty_payload(key);
                 }
-            })
-        } else {
-            encoded_payloads.push(Ok((encoded_payload, events)));
+            } else {
+                processed.push(trace);
+                break;
+            }
         }
-        encoded_payloads
     }
+    results.push(Ok((
+        payload.encode_to_vec(),
+        std::mem::take(&mut processed),
+    )));
+    results
+}

-    fn trace_into_payload(key: &PartitionKey, events: &[TraceEvent]) -> dd_proto::TracePayload {
-        dd_proto::TracePayload {
-            host_name: key.hostname.clone().unwrap_or_default(),
-            env: key.env.clone().unwrap_or_default(),
-            traces: vec![], // Field reserved for the older trace payloads
-            transactions: vec![], // Field reserved for the older trace payloads
-            tracer_payloads: events
-                .iter()
-                .map(DatadogTracesEncoder::vector_trace_into_dd_tracer_payload)
-                .collect(),
-            // We only send tags at the Trace level
-            tags: BTreeMap::new(),
-            agent_version: key.agent_version.clone().unwrap_or_default(),
-            target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
-            error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
-        }
+fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
+    dd_proto::TracePayload {
+        host_name: key.hostname.clone().unwrap_or_default(),
+        env: key.env.clone().unwrap_or_default(),
+        traces: vec![], // Field reserved for the older trace payloads
+        transactions: vec![], // Field reserved for the older trace payloads
+        tracer_payloads: vec![],
+        // We only send tags at the Trace level
+        tags: BTreeMap::new(),
+        agent_version: key.agent_version.clone().unwrap_or_default(),
+        target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
+        error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
     }
+}

-    fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload {
-        let tags = trace
-            .get(event_path!("tags"))
-            .and_then(|m| m.as_object())
-            .map(|m| {
-                m.iter()
-                    .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned()))
-                    .collect::<BTreeMap<String, String>>()
-            })
-            .unwrap_or_default();
-
-        let spans = match trace.get(event_path!("spans")) {
-            Some(Value::Array(v)) => v
-                .iter()
-                .filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span))
-                .collect(),
-            _ => vec![],
-        };
+fn encode_trace(trace: &TraceEvent) ->
dd_proto::TracerPayload { + let tags = trace + .get(event_path!("tags")) + .and_then(|m| m.as_object()) + .map(|m| { + m.iter() + .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) + .collect::>() + }) + .unwrap_or_default(); + + let spans = match trace.get(event_path!("spans")) { + Some(Value::Array(v)) => v + .iter() + .filter_map(|s| s.as_object().map(convert_span)) + .collect(), + _ => vec![], + }; + + let chunk = dd_proto::TraceChunk { + priority: trace + .get(event_path!("priority")) + .and_then(|v| v.as_integer().map(|v| v as i32)) + // This should not happen for Datadog originated traces, but in case this field is not populated + // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55), + // which is what the Datadog trace-agent is doing for OTLP originated traces, as per + // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309. + .unwrap_or(1i32), + origin: trace + .get(event_path!("origin")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + dropped_trace: trace + .get(event_path!("dropped")) + .and_then(|v| v.as_boolean()) + .unwrap_or(false), + spans, + tags: tags.clone(), + }; + + dd_proto::TracerPayload { + container_id: trace + .get(event_path!("container_id")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + language_name: trace + .get(event_path!("language_name")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + language_version: trace + .get(event_path!("language_version")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + tracer_version: trace + .get(event_path!("tracer_version")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + runtime_id: trace + .get(event_path!("runtime_id")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + chunks: vec![chunk], + tags, + env: trace + .get(event_path!("env")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + hostname: trace + .get(event_path!("hostname")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + app_version: trace + .get(event_path!("app_version")) + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + } +} - let chunk = dd_proto::TraceChunk { - priority: trace - .get(event_path!("priority")) - .and_then(|v| v.as_integer().map(|v| v as i32)) - // This should not happen for Datadog originated traces, but in case this field is not populated - // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55), - // which is what the Datadog trace-agent is doing for OTLP originated traces, as per - // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309. 
- .unwrap_or(1i32), - origin: trace - .get(event_path!("origin")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - dropped_trace: trace - .get(event_path!("dropped")) - .and_then(|v| v.as_boolean()) - .unwrap_or(false), - spans, - tags: tags.clone(), - }; +fn convert_span(span: &BTreeMap) -> dd_proto::Span { + let trace_id = match span.get("trace_id") { + Some(Value::Integer(val)) => *val, + _ => 0, + }; + let span_id = match span.get("span_id") { + Some(Value::Integer(val)) => *val, + _ => 0, + }; + let parent_id = match span.get("parent_id") { + Some(Value::Integer(val)) => *val, + _ => 0, + }; + let duration = match span.get("duration") { + Some(Value::Integer(val)) => *val, + _ => 0, + }; + let error = match span.get("error") { + Some(Value::Integer(val)) => *val, + _ => 0, + }; + let start = match span.get("start") { + Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"), + _ => 0, + }; + + let meta = span + .get("meta") + .and_then(|m| m.as_object()) + .map(|m| { + m.iter() + .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) + .collect::>() + }) + .unwrap_or_default(); + + let meta_struct = span + .get("meta_struct") + .and_then(|m| m.as_object()) + .map(|m| { + m.iter() + .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect())) + .collect::>>() + }) + .unwrap_or_default(); + + let metrics = span + .get("metrics") + .and_then(|m| m.as_object()) + .map(|m| { + m.iter() + .filter_map(|(k, v)| { + if let Value::Float(f) = v { + Some((k.clone(), f.into_inner())) + } else { + None + } + }) + .collect::>() + }) + .unwrap_or_default(); + + dd_proto::Span { + service: span + .get("service") + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + name: span + .get("name") + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + resource: span + .get("resource") + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + r#type: span + .get("type") + .map(|v| v.to_string_lossy().into_owned()) + .unwrap_or_default(), + trace_id: trace_id as u64, + span_id: span_id as u64, + parent_id: parent_id as u64, + error: error as i32, + start, + duration, + meta, + metrics, + meta_struct, + } +} - dd_proto::TracerPayload { - container_id: trace - .get(event_path!("container_id")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - language_name: trace - .get(event_path!("language_name")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - language_version: trace - .get(event_path!("language_version")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - tracer_version: trace - .get(event_path!("tracer_version")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - runtime_id: trace - .get(event_path!("runtime_id")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - chunks: vec![chunk], - tags, - env: trace - .get(event_path!("env")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - hostname: trace - .get(event_path!("hostname")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - app_version: trace - .get(event_path!("app_version")) - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), +#[cfg(test)] +mod test { + use proptest::prelude::*; + use vrl::event_path; + + use super::{encode_traces, PartitionKey}; + use crate::event::{LogEvent, TraceEvent}; + + proptest! 
{ + #[test] + fn successfully_encode_payloads_smaller_than_max_size( + // 476 is the experimentally determined size that will fill a payload after encoding and overhead + lengths in proptest::collection::vec(16usize..476, 1usize..256), + ) { + let max_size = 1024; + + let key = PartitionKey { + api_key: Some("x".repeat(128).into()), + env: Some("production".into()), + hostname: Some("foo.bar.baz.local".into()), + agent_version: Some("1.2.3.4.5".into()), + target_tps: None, + error_tps: None, + }; + + // We only care about the size of the incoming traces, so just populate a single tag field + // that will be copied into the protobuf representation. + let traces = lengths + .into_iter() + .map(|n| { + let mut log = LogEvent::default(); + log.insert(event_path!("tags", "foo"), "x".repeat(n)); + TraceEvent::from(log) + }) + .collect(); + + for result in encode_traces(&key, traces, max_size) { + prop_assert!(result.is_ok()); + let (encoded, _processed) = result.unwrap(); + + prop_assert!( + encoded.len() <= max_size, + "encoded len {} longer than max size {}", + encoded.len(), + max_size + ); + } } } - fn convert_span(span: &BTreeMap) -> dd_proto::Span { - let trace_id = match span.get("trace_id") { - Some(Value::Integer(val)) => *val, - _ => 0, - }; - let span_id = match span.get("span_id") { - Some(Value::Integer(val)) => *val, - _ => 0, - }; - let parent_id = match span.get("parent_id") { - Some(Value::Integer(val)) => *val, - _ => 0, - }; - let duration = match span.get("duration") { - Some(Value::Integer(val)) => *val, - _ => 0, - }; - let error = match span.get("error") { - Some(Value::Integer(val)) => *val, - _ => 0, - }; - let start = match span.get("start") { - Some(Value::Timestamp(val)) => { - val.timestamp_nanos_opt().expect("Timestamp out of range") - } - _ => 0, + #[test] + fn handles_too_large_events() { + let max_size = 1024; + // 476 is experimentally determined to be too big to fit into a <1024 byte proto + let lengths = [128, 476, 128]; + + let key = PartitionKey { + api_key: Some("x".repeat(128).into()), + env: Some("production".into()), + hostname: Some("foo.bar.baz.local".into()), + agent_version: Some("1.2.3.4.5".into()), + target_tps: None, + error_tps: None, }; - let meta = span - .get("meta") - .and_then(|m| m.as_object()) - .map(|m| { - m.iter() - .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) - .collect::>() - }) - .unwrap_or_default(); - - let meta_struct = span - .get("meta_struct") - .and_then(|m| m.as_object()) - .map(|m| { - m.iter() - .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect())) - .collect::>>() - }) - .unwrap_or_default(); - - let metrics = span - .get("metrics") - .and_then(|m| m.as_object()) - .map(|m| { - m.iter() - .filter_map(|(k, v)| { - if let Value::Float(f) = v { - Some((k.clone(), f.into_inner())) - } else { - None - } - }) - .collect::>() + // We only care about the size of the incoming traces, so just populate a single tag field + // that will be copied into the protobuf representation. 
+ let traces = lengths + .into_iter() + .map(|n| { + let mut log = LogEvent::default(); + log.insert(event_path!("tags", "foo"), "x".repeat(n)); + TraceEvent::from(log) }) - .unwrap_or_default(); - - dd_proto::Span { - service: span - .get("service") - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - name: span - .get("name") - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - resource: span - .get("resource") - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - r#type: span - .get("type") - .map(|v| v.to_string_lossy().into_owned()) - .unwrap_or_default(), - trace_id: trace_id as u64, - span_id: span_id as u64, - parent_id: parent_id as u64, - error: error as i32, - start, - duration, - meta, - metrics, - meta_struct, + .collect(); + + let mut results = encode_traces(&key, traces, max_size); + assert_eq!(3, results.len()); + + match &mut results[..] { + [Ok(one), Err(_two), Ok(three)] => { + for (encoded, processed) in [one, three] { + assert_eq!(1, processed.len()); + assert!( + encoded.len() <= max_size, + "encoded len {} longer than max size {}", + encoded.len(), + max_size + ); + } + } + _ => panic!( + "unexpected output {:?}", + results + .iter() + .map(|r| r.as_ref().map(|(_, p)| p.len())) + .collect::>() + ), } } }
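// Illustrative usage sketch (an assumption for clarity, not a line of the patch above):
// how a caller is expected to consume the output of the new `encode_traces`. The types
// (`PartitionKey`, `TraceEvent`, `LogEvent`) and `event_path!` are the same crate-local
// items used in the tests above; the sizes and values are arbitrary.
//
//     let key = PartitionKey {
//         api_key: None,
//         env: Some("dev".into()),
//         hostname: None,
//         agent_version: None,
//         target_tps: None,
//         error_tps: None,
//     };
//     let traces: Vec<TraceEvent> = (0..4)
//         .map(|_| {
//             let mut log = LogEvent::default();
//             log.insert(event_path!("tags", "foo"), "x".repeat(300));
//             TraceEvent::from(log)
//         })
//         .collect();
//
//     // Each Ok result is one encoded protobuf payload no larger than `max_size`, paired
//     // with the traces it contains; an Err marks a single trace that could not fit into
//     // an otherwise empty payload and was dropped on its own, without failing the batch.
//     for result in encode_traces(&key, traces, 1024) {
//         match result {
//             Ok((payload, processed)) => assert!(payload.len() <= 1024 && !processed.is_empty()),
//             Err(e) => eprintln!("dropped oversized trace: {:?}", e),
//         }
//     }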