From f88f992a044041b8737bd0af48f1a93326b677f2 Mon Sep 17 00:00:00 2001
From: Luke Steensen <luke.steensen@gmail.com>
Date: Thu, 19 Oct 2023 17:06:06 -0500
Subject: [PATCH 1/4] add simple failing test

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
---
 src/sinks/datadog/traces/request_builder.rs | 53 +++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index 01ec1742a381e..b0716e34c4309 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -453,3 +453,56 @@ impl DatadogTracesEncoder {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use vrl::event_path;
+
+    use super::{DatadogTracesEncoder, PartitionKey};
+    use crate::event::{LogEvent, TraceEvent};
+
+    #[test]
+    fn successfully_encode_payloads_smaller_than_max_size() {
+        let max_size = 1024;
+
+        let encoder = DatadogTracesEncoder { max_size };
+        let key = PartitionKey {
+            api_key: Some("x".repeat(128).into()),
+            env: Some("production".into()),
+            hostname: Some("foo.bar.baz.local".into()),
+            agent_version: Some("1.2.3.4.5".into()),
+            target_tps: None,
+            error_tps: None,
+        };
+
+        // We only care about the size of the incoming traces, so just populate a single tag field
+        // that will be copied into the protobuf representation.
+        let traces = vec![
+            "x".repeat(256),
+            "x".repeat(256),
+            "x".repeat(256),
+            "x".repeat(256),
+            "x".repeat(256),
+            "x".repeat(256),
+        ]
+        .into_iter()
+        .map(|s| {
+            let mut log = LogEvent::default();
+            log.insert(event_path!("tags", "foo"), s);
+            TraceEvent::from(log)
+        })
+        .collect();
+
+        for result in encoder.encode_trace(&key, traces) {
+            assert!(result.is_ok());
+            let (encoded, _processed) = result.unwrap();
+
+            assert!(
+                encoded.len() <= max_size,
+                "encoded len {} longer than max size {}",
+                encoded.len(),
+                max_size
+            );
+        }
+    }
+}
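
Why this fails against the existing encoder (removed in the next patch): the
old `encode_trace` splits an oversized payload by event count, deriving
`n_chunks` from the byte ratio and rounding both divisions up, so a chunk can
still exceed `max_size` even when the events are uniformly sized. Below is a
minimal, self-contained model of that heuristic; the 160-byte payload overhead
and ~540-byte per-trace size are assumptions, not measured values (each trace
carries its 256-byte tag twice, once at the chunk level and once at the
tracer-payload level, plus protobuf framing).

    fn main() {
        let max_size = 1024usize;
        let overhead = 160usize;        // assumed: hostname, env, agent_version, ...
        let traces = vec![540usize; 6]; // assumed encoded size per trace

        let total = overhead + traces.iter().sum::<usize>(); // 3400
        assert!(total > max_size);

        // The old heuristic: derive a chunk count from the byte ratio, then
        // split by event *count*. Both divisions round, so no chunk is
        // guaranteed to fit.
        let n_chunks = total / max_size + 1;          // 4
        let chunk_size = traces.len() / n_chunks + 1; // 2

        for chunk in traces.chunks(chunk_size) {
            let encoded = overhead + chunk.iter().sum::<usize>();
            // 160 + 2 * 540 = 1240 > 1024: every chunk would be rejected
            // with an UnableToSplit error, which is what this test catches.
            println!("{} events -> {} bytes (max {})", chunk.len(), encoded, max_size);
        }
    }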

From 9f77baefa6d45d9539fb6dc3361120dde1fe56eb Mon Sep 17 00:00:00 2001
From: Luke Steensen <luke.steensen@gmail.com>
Date: Thu, 19 Oct 2023 17:32:14 -0500
Subject: [PATCH 2/4] refactor to make the test pass

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
---
 src/sinks/datadog/traces/request_builder.rs | 480 +++++++++-----------
 1 file changed, 224 insertions(+), 256 deletions(-)

diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index b0716e34c4309..4df3e7b25d1ea 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -64,7 +64,7 @@ pub struct DatadogTracesRequestBuilder {
     api_key: Arc<str>,
     endpoint_configuration: DatadogTracesEndpointConfiguration,
     compression: Compression,
-    trace_encoder: DatadogTracesEncoder,
+    max_size: usize,
     /// Contains the Aggregated stats across a time window.
     stats_aggregator: Arc<Mutex<Aggregator>>,
 }
@@ -81,7 +81,7 @@ impl DatadogTracesRequestBuilder {
             api_key,
             endpoint_configuration,
             compression,
-            trace_encoder: DatadogTracesEncoder { max_size },
+            max_size,
             stats_aggregator,
         })
     }
@@ -106,8 +106,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
         input: (PartitionKey, Vec<Event>),
     ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
         let (key, events) = input;
-        let mut results = Vec::new();
-        let n = events.len();
         let trace_events = events
             .into_iter()
             .filter_map(|e| e.try_into_trace())
@@ -117,11 +115,10 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
         // separately from the sink framework, by the thread `flush_apm_stats_thread()`
         compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);
 
-        self.trace_encoder
-            .encode_trace(&key, trace_events)
+        encode_traces(&key, trace_events, self.max_size)
             .into_iter()
-            .for_each(|r| match r {
-                Ok((payload, mut processed)) => {
+            .map(|result| {
+                result.and_then(|(payload, mut processed)| {
                     let uncompressed_size = payload.len();
                     let metadata = DDTracesMetadata {
                         api_key: key
@@ -146,22 +143,17 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
                                 .expect("payload should never be zero length");
                             let request_metadata = builder.with_request_size(bytes_len);
 
-                            results.push(Ok(((metadata, request_metadata), bytes)))
+                            Ok(((metadata, request_metadata), bytes))
                         }
-                        Err(e) => results.push(Err(RequestBuilderError::FailedToBuild {
+                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                             message: "Payload compression failed.",
                             reason: e.to_string(),
-                            dropped_events: n as u64,
-                        })),
+                            dropped_events: processed.len() as u64,
+                        }),
                     }
-                }
-                Err(err) => results.push(Err(RequestBuilderError::FailedToBuild {
-                    message: err.parts().0,
-                    reason: err.parts().1.into(),
-                    dropped_events: err.parts().2,
-                })),
-            });
-        results
+                })
+            })
+            .collect()
     }
 
     fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
@@ -208,249 +200,226 @@ pub fn build_request(
     }
 }
 
-pub struct DatadogTracesEncoder {
+fn encode_traces(
+    key: &PartitionKey,
+    trace_events: Vec<TraceEvent>,
     max_size: usize,
-}
-
-#[derive(Debug, Snafu)]
-pub enum EncoderError {
-    #[snafu(display("Unable to split payload into small enough chunks"))]
-    UnableToSplit {
-        dropped_events: u64,
-        error_code: &'static str,
-    },
-}
-
-impl EncoderError {
-    pub const fn parts(&self) -> (&'static str, &'static str, u64) {
-        match self {
-            Self::UnableToSplit {
-                dropped_events: n,
-                error_code,
-            } => ("unable to split into small chunks", error_code, *n),
+) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
+    let mut results = Vec::new();
+    let mut processed = Vec::new();
+    let mut payload = build_empty_payload(key);
+    for trace in trace_events {
+        let proto = encode_trace(&trace);
+
+        if proto.encoded_len() >= max_size {
+            results.push(Err(RequestBuilderError::FailedToBuild {
+                message: "Dropped trace event",
+                reason: "Trace is larger than allowed payload size".into(),
+                dropped_events: 1,
+            }));
+
+            continue;
         }
-    }
-}
 
-impl DatadogTracesEncoder {
-    fn encode_trace(
-        &self,
-        key: &PartitionKey,
-        events: Vec<TraceEvent>,
-    ) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), EncoderError>> {
-        let mut encoded_payloads = Vec::new();
-        let payload = DatadogTracesEncoder::trace_into_payload(key, &events);
-        let encoded_payload = payload.encode_to_vec();
-        // This may happen exceptionally
-        if encoded_payload.len() > self.max_size {
-            debug!("A payload exceeded the maximum size, splitting into multiple.");
-            let n_chunks: usize = (encoded_payload.len() / self.max_size) + 1;
-            let chunk_size = (events.len() / n_chunks) + 1;
-            events.chunks(chunk_size).for_each(|events| {
-                let chunked_payload = DatadogTracesEncoder::trace_into_payload(key, events);
-                let encoded_chunk = chunked_payload.encode_to_vec();
-                if encoded_chunk.len() > self.max_size {
-                    encoded_payloads.push(Err(EncoderError::UnableToSplit {
-                        dropped_events: events.len() as u64,
-                        error_code: "message_too_big",
-                    }));
-                } else {
-                    encoded_payloads.push(Ok((encoded_chunk, events.to_vec())));
-                }
-            })
-        } else {
-            encoded_payloads.push(Ok((encoded_payload, events)));
+        if payload.encoded_len() + proto.encoded_len() >= max_size {
+            results.push(Ok((
+                payload.encode_to_vec(),
+                std::mem::take(&mut processed),
+            )));
+            payload = build_empty_payload(key);
         }
-        encoded_payloads
-    }
 
-    fn trace_into_payload(key: &PartitionKey, events: &[TraceEvent]) -> dd_proto::TracePayload {
-        dd_proto::TracePayload {
-            host_name: key.hostname.clone().unwrap_or_default(),
-            env: key.env.clone().unwrap_or_default(),
-            traces: vec![],       // Field reserved for the older trace payloads
-            transactions: vec![], // Field reserved for the older trace payloads
-            tracer_payloads: events
-                .iter()
-                .map(DatadogTracesEncoder::vector_trace_into_dd_tracer_payload)
-                .collect(),
-            // We only send tags at the Trace level
-            tags: BTreeMap::new(),
-            agent_version: key.agent_version.clone().unwrap_or_default(),
-            target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
-            error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
-        }
+        payload.tracer_payloads.push(proto);
+        processed.push(trace);
     }
+    results.push(Ok((
+        payload.encode_to_vec(),
+        std::mem::take(&mut processed),
+    )));
+    results
+}
 
-    fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload {
-        let tags = trace
-            .get(event_path!("tags"))
-            .and_then(|m| m.as_object())
-            .map(|m| {
-                m.iter()
-                    .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned()))
-                    .collect::<BTreeMap<String, String>>()
-            })
-            .unwrap_or_default();
-
-        let spans = match trace.get(event_path!("spans")) {
-            Some(Value::Array(v)) => v
-                .iter()
-                .filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span))
-                .collect(),
-            _ => vec![],
-        };
-
-        let chunk = dd_proto::TraceChunk {
-            priority: trace
-                .get(event_path!("priority"))
-                .and_then(|v| v.as_integer().map(|v| v as i32))
-                // This should not happen for Datadog originated traces, but in case this field is not populated
-                // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
-                // which is what the Datadog trace-agent is doing for OTLP originated traces, as per
-                // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
-                .unwrap_or(1i32),
-            origin: trace
-                .get(event_path!("origin"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            dropped_trace: trace
-                .get(event_path!("dropped"))
-                .and_then(|v| v.as_boolean())
-                .unwrap_or(false),
-            spans,
-            tags: tags.clone(),
-        };
-
-        dd_proto::TracerPayload {
-            container_id: trace
-                .get(event_path!("container_id"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            language_name: trace
-                .get(event_path!("language_name"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            language_version: trace
-                .get(event_path!("language_version"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            tracer_version: trace
-                .get(event_path!("tracer_version"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            runtime_id: trace
-                .get(event_path!("runtime_id"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            chunks: vec![chunk],
-            tags,
-            env: trace
-                .get(event_path!("env"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            hostname: trace
-                .get(event_path!("hostname"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            app_version: trace
-                .get(event_path!("app_version"))
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-        }
+fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
+    dd_proto::TracePayload {
+        host_name: key.hostname.clone().unwrap_or_default(),
+        env: key.env.clone().unwrap_or_default(),
+        traces: vec![],       // Field reserved for the older trace payloads
+        transactions: vec![], // Field reserved for the older trace payloads
+        tracer_payloads: vec![],
+        // We only send tags at the Trace level
+        tags: BTreeMap::new(),
+        agent_version: key.agent_version.clone().unwrap_or_default(),
+        target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
+        error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
     }
+}
 
-    fn convert_span(span: &BTreeMap<String, Value>) -> dd_proto::Span {
-        let trace_id = match span.get("trace_id") {
-            Some(Value::Integer(val)) => *val,
-            _ => 0,
-        };
-        let span_id = match span.get("span_id") {
-            Some(Value::Integer(val)) => *val,
-            _ => 0,
-        };
-        let parent_id = match span.get("parent_id") {
-            Some(Value::Integer(val)) => *val,
-            _ => 0,
-        };
-        let duration = match span.get("duration") {
-            Some(Value::Integer(val)) => *val,
-            _ => 0,
-        };
-        let error = match span.get("error") {
-            Some(Value::Integer(val)) => *val,
-            _ => 0,
-        };
-        let start = match span.get("start") {
-            Some(Value::Timestamp(val)) => {
-                val.timestamp_nanos_opt().expect("Timestamp out of range")
-            }
-            _ => 0,
-        };
+fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload {
+    let tags = trace
+        .get(event_path!("tags"))
+        .and_then(|m| m.as_object())
+        .map(|m| {
+            m.iter()
+                .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned()))
+                .collect::<BTreeMap<String, String>>()
+        })
+        .unwrap_or_default();
+
+    let spans = match trace.get(event_path!("spans")) {
+        Some(Value::Array(v)) => v
+            .iter()
+            .filter_map(|s| s.as_object().map(convert_span))
+            .collect(),
+        _ => vec![],
+    };
+
+    let chunk = dd_proto::TraceChunk {
+        priority: trace
+            .get(event_path!("priority"))
+            .and_then(|v| v.as_integer().map(|v| v as i32))
+            // This should not happen for Datadog-originated traces, but in case this field is not populated
+            // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
+            // which is what the Datadog trace-agent does for OTLP-originated traces, as per
+            // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
+            .unwrap_or(1i32),
+        origin: trace
+            .get(event_path!("origin"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        dropped_trace: trace
+            .get(event_path!("dropped"))
+            .and_then(|v| v.as_boolean())
+            .unwrap_or(false),
+        spans,
+        tags: tags.clone(),
+    };
+
+    dd_proto::TracerPayload {
+        container_id: trace
+            .get(event_path!("container_id"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        language_name: trace
+            .get(event_path!("language_name"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        language_version: trace
+            .get(event_path!("language_version"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        tracer_version: trace
+            .get(event_path!("tracer_version"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        runtime_id: trace
+            .get(event_path!("runtime_id"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        chunks: vec![chunk],
+        tags,
+        env: trace
+            .get(event_path!("env"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        hostname: trace
+            .get(event_path!("hostname"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        app_version: trace
+            .get(event_path!("app_version"))
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+    }
+}
 
-        let meta = span
-            .get("meta")
-            .and_then(|m| m.as_object())
-            .map(|m| {
-                m.iter()
-                    .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned()))
-                    .collect::<BTreeMap<String, String>>()
-            })
-            .unwrap_or_default();
-
-        let meta_struct = span
-            .get("meta_struct")
-            .and_then(|m| m.as_object())
-            .map(|m| {
-                m.iter()
-                    .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect()))
-                    .collect::<BTreeMap<String, Vec<u8>>>()
-            })
-            .unwrap_or_default();
-
-        let metrics = span
-            .get("metrics")
-            .and_then(|m| m.as_object())
-            .map(|m| {
-                m.iter()
-                    .filter_map(|(k, v)| {
-                        if let Value::Float(f) = v {
-                            Some((k.clone(), f.into_inner()))
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<BTreeMap<String, f64>>()
-            })
-            .unwrap_or_default();
-
-        dd_proto::Span {
-            service: span
-                .get("service")
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            name: span
-                .get("name")
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            resource: span
-                .get("resource")
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            r#type: span
-                .get("type")
-                .map(|v| v.to_string_lossy().into_owned())
-                .unwrap_or_default(),
-            trace_id: trace_id as u64,
-            span_id: span_id as u64,
-            parent_id: parent_id as u64,
-            error: error as i32,
-            start,
-            duration,
-            meta,
-            metrics,
-            meta_struct,
-        }
+fn convert_span(span: &BTreeMap<String, Value>) -> dd_proto::Span {
+    let trace_id = match span.get("trace_id") {
+        Some(Value::Integer(val)) => *val,
+        _ => 0,
+    };
+    let span_id = match span.get("span_id") {
+        Some(Value::Integer(val)) => *val,
+        _ => 0,
+    };
+    let parent_id = match span.get("parent_id") {
+        Some(Value::Integer(val)) => *val,
+        _ => 0,
+    };
+    let duration = match span.get("duration") {
+        Some(Value::Integer(val)) => *val,
+        _ => 0,
+    };
+    let error = match span.get("error") {
+        Some(Value::Integer(val)) => *val,
+        _ => 0,
+    };
+    let start = match span.get("start") {
+        Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"),
+        _ => 0,
+    };
+
+    let meta = span
+        .get("meta")
+        .and_then(|m| m.as_object())
+        .map(|m| {
+            m.iter()
+                .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned()))
+                .collect::<BTreeMap<String, String>>()
+        })
+        .unwrap_or_default();
+
+    let meta_struct = span
+        .get("meta_struct")
+        .and_then(|m| m.as_object())
+        .map(|m| {
+            m.iter()
+                .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect()))
+                .collect::<BTreeMap<String, Vec<u8>>>()
+        })
+        .unwrap_or_default();
+
+    let metrics = span
+        .get("metrics")
+        .and_then(|m| m.as_object())
+        .map(|m| {
+            m.iter()
+                .filter_map(|(k, v)| {
+                    if let Value::Float(f) = v {
+                        Some((k.clone(), f.into_inner()))
+                    } else {
+                        None
+                    }
+                })
+                .collect::<BTreeMap<String, f64>>()
+        })
+        .unwrap_or_default();
+
+    dd_proto::Span {
+        service: span
+            .get("service")
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        name: span
+            .get("name")
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        resource: span
+            .get("resource")
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        r#type: span
+            .get("type")
+            .map(|v| v.to_string_lossy().into_owned())
+            .unwrap_or_default(),
+        trace_id: trace_id as u64,
+        span_id: span_id as u64,
+        parent_id: parent_id as u64,
+        error: error as i32,
+        start,
+        duration,
+        meta,
+        metrics,
+        meta_struct,
     }
 }
 
@@ -458,14 +427,13 @@ impl DatadogTracesEncoder {
 mod test {
     use vrl::event_path;
 
-    use super::{DatadogTracesEncoder, PartitionKey};
+    use super::{encode_traces, PartitionKey};
     use crate::event::{LogEvent, TraceEvent};
 
     #[test]
     fn successfully_encode_payloads_smaller_than_max_size() {
         let max_size = 1024;
 
-        let encoder = DatadogTracesEncoder { max_size };
         let key = PartitionKey {
             api_key: Some("x".repeat(128).into()),
             env: Some("production".into()),
@@ -493,7 +461,7 @@ mod test {
         })
         .collect();
 
-        for result in encoder.encode_trace(&key, traces) {
+        for result in encode_traces(&key, traces, max_size) {
             assert!(result.is_ok());
             let (encoded, _processed) = result.unwrap();
 

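Two size checks in this version are still approximations. The standalone
pre-check `proto.encoded_len() >= max_size` ignores the payload's base fields
(hostname, env, agent version), and `payload.encoded_len() + proto.encoded_len()`
ignores the framing protobuf adds when a message is nested into a repeated
field: one tag byte plus a varint length prefix per element. So a trace that
passes both checks can still tip the final flush past `max_size`; the next
patch switches to push-then-measure, re-encoding the real payload after every
insertion. A minimal sketch of that framing overhead, assuming a `prost`
dependency with the default derive feature:

    use prost::Message;

    #[derive(Clone, PartialEq, prost::Message)]
    struct Inner {
        #[prost(string, tag = "1")]
        tag_value: String,
    }

    #[derive(Clone, PartialEq, prost::Message)]
    struct Outer {
        #[prost(message, repeated, tag = "1")]
        items: Vec<Inner>,
    }

    fn main() {
        let inner = Inner { tag_value: "x".repeat(256) };
        let alone = inner.encoded_len(); // 259: 1 tag byte + 2-byte len + 256

        let outer = Outer { items: vec![inner] };
        let nested = outer.encoded_len(); // 262: extra tag byte + 2-byte len prefix

        assert_eq!(nested - alone, 3);
        println!("alone: {alone}, nested: {nested}");
    }
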
From 3cd53bd871ab6242f7c41dd7ffe90dd2f5e05f97 Mon Sep 17 00:00:00 2001
From: Luke Steensen <luke.steensen@gmail.com>
Date: Fri, 20 Oct 2023 18:13:12 -0500
Subject: [PATCH 3/4] add proptest and rework to fix issues

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
---
 src/sinks/datadog/traces/request_builder.rs | 130 ++++++++++----------
 1 file changed, 68 insertions(+), 62 deletions(-)

diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index 4df3e7b25d1ea..97e361ce7734c 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -208,29 +208,36 @@ fn encode_traces(
     let mut results = Vec::new();
     let mut processed = Vec::new();
     let mut payload = build_empty_payload(key);
-    for trace in trace_events {
-        let proto = encode_trace(&trace);
-
-        if proto.encoded_len() >= max_size {
-            results.push(Err(RequestBuilderError::FailedToBuild {
-                message: "Dropped trace event",
-                reason: "Trace is larger than allowed payload size".into(),
-                dropped_events: 1,
-            }));
-
-            continue;
-        }
 
-        if payload.encoded_len() + proto.encoded_len() >= max_size {
-            results.push(Ok((
-                payload.encode_to_vec(),
-                std::mem::take(&mut processed),
-            )));
-            payload = build_empty_payload(key);
+    for trace in trace_events {
+        let mut proto = encode_trace(&trace);
+
+        loop {
+            payload.tracer_payloads.push(proto);
+            if payload.encoded_len() >= max_size {
+                if payload.tracer_payloads.len() == 1 {
+                    // this individual trace is too big
+                    results.push(Err(RequestBuilderError::FailedToBuild {
+                        message: "Dropped trace event",
+                        reason: "Trace is larger than allowed payload size".into(),
+                        dropped_events: 1,
+                    }));
+
+                    break;
+                } else {
+                    // try with a fresh payload
+                    proto = payload.tracer_payloads.pop().expect("just pushed");
+                    results.push(Ok((
+                        payload.encode_to_vec(),
+                        std::mem::take(&mut processed),
+                    )));
+                    payload = build_empty_payload(key);
+                }
+            } else {
+                processed.push(trace);
+                break;
+            }
         }
-
-        payload.tracer_payloads.push(proto);
-        processed.push(trace);
     }
     results.push(Ok((
         payload.encode_to_vec(),
@@ -425,52 +432,51 @@ fn convert_span(span: &BTreeMap<String, Value>) -> dd_proto::Span {
 
 #[cfg(test)]
 mod test {
+    use proptest::prelude::*;
     use vrl::event_path;
 
     use super::{encode_traces, PartitionKey};
     use crate::event::{LogEvent, TraceEvent};
 
-    #[test]
-    fn successfully_encode_payloads_smaller_than_max_size() {
-        let max_size = 1024;
-
-        let key = PartitionKey {
-            api_key: Some("x".repeat(128).into()),
-            env: Some("production".into()),
-            hostname: Some("foo.bar.baz.local".into()),
-            agent_version: Some("1.2.3.4.5".into()),
-            target_tps: None,
-            error_tps: None,
-        };
-
-        // We only care about the size of the incoming traces, so just populate a single tag field
-        // that will be copied into the protobuf representation.
-        let traces = vec![
-            "x".repeat(256),
-            "x".repeat(256),
-            "x".repeat(256),
-            "x".repeat(256),
-            "x".repeat(256),
-            "x".repeat(256),
-        ]
-        .into_iter()
-        .map(|s| {
-            let mut log = LogEvent::default();
-            log.insert(event_path!("tags", "foo"), s);
-            TraceEvent::from(log)
-        })
-        .collect();
-
-        for result in encode_traces(&key, traces, max_size) {
-            assert!(result.is_ok());
-            let (encoded, _processed) = result.unwrap();
-
-            assert!(
-                encoded.len() <= max_size,
-                "encoded len {} longer than max size {}",
-                encoded.len(),
-                max_size
-            );
+    proptest! {
+        #[test]
+        fn successfully_encode_payloads_smaller_than_max_size(
+            // 476 is the experimentally determined tag length at which a single trace fills a payload, once encoding overhead is included
+            lengths in proptest::collection::vec(16usize..476, 1usize..256),
+        ) {
+            let max_size = 1024;
+
+            let key = PartitionKey {
+                api_key: Some("x".repeat(128).into()),
+                env: Some("production".into()),
+                hostname: Some("foo.bar.baz.local".into()),
+                agent_version: Some("1.2.3.4.5".into()),
+                target_tps: None,
+                error_tps: None,
+            };
+
+            // We only care about the size of the incoming traces, so just populate a single tag field
+            // that will be copied into the protobuf representation.
+            let traces = lengths
+                .into_iter()
+                .map(|n| {
+                    let mut log = LogEvent::default();
+                    log.insert(event_path!("tags", "foo"), "x".repeat(n));
+                    TraceEvent::from(log)
+                })
+                .collect();
+
+            for result in encode_traces(&key, traces, max_size) {
+                prop_assert!(result.is_ok());
+                let (encoded, _processed) = result.unwrap();
+
+                prop_assert!(
+                    encoded.len() <= max_size,
+                    "encoded len {} longer than max size {}",
+                    encoded.len(),
+                    max_size
+                );
+            }
         }
     }
 }

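One detail of this loop is fixed in the next patch: in the single-element
branch the oversized trace is reported as dropped but never popped from
`payload.tracer_payloads`, so it rides along into every later flush. A
plain-size model of the patch-3 control flow, with an assumed 160-byte base
overhead standing in for the hostname/env/agent fields, shows an `Ok` result
above the limit:

    fn encode(base: usize, items: &[usize]) -> usize {
        base + items.iter().sum::<usize>()
    }

    fn main() {
        let max_size = 1024usize;
        let base = 160usize;
        let traces = [900usize, 128]; // 900 alone exceeds the limit

        let mut results: Vec<Result<usize, &'static str>> = Vec::new();
        let mut payload: Vec<usize> = Vec::new();

        for trace in traces {
            let mut item = trace;
            loop {
                payload.push(item);
                if encode(base, &payload) >= max_size {
                    if payload.len() == 1 {
                        // patch 3 reports the error but leaves `item` inside
                        results.push(Err("too big"));
                        break;
                    } else {
                        // the pop only happens on this branch
                        item = payload.pop().expect("just pushed");
                        results.push(Ok(encode(base, &payload)));
                        payload.clear();
                    }
                } else {
                    break;
                }
            }
        }
        results.push(Ok(encode(base, &payload)));

        // [Err("too big"), Ok(1060), Ok(288)]: the second entry is an Ok
        // payload of 1060 bytes, above max_size, because the 900-byte trace
        // was never removed.
        println!("{results:?}");
    }
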
From ccab9c6ae13a66221cdf685e1bb7a969e93e01ab Mon Sep 17 00:00:00 2001
From: Luke Steensen <luke.steensen@gmail.com>
Date: Tue, 24 Oct 2023 18:07:23 -0500
Subject: [PATCH 4/4] add test for too-large events, fix minor bug

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
---
 src/sinks/datadog/traces/request_builder.rs | 56 ++++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)

diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index 97e361ce7734c..633630100c622 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -215,7 +215,9 @@ fn encode_traces(
         loop {
             payload.tracer_payloads.push(proto);
             if payload.encoded_len() >= max_size {
-                if payload.tracer_payloads.len() == 1 {
+                // take it back out
+                proto = payload.tracer_payloads.pop().expect("just pushed");
+                if payload.tracer_payloads.is_empty() {
                     // this individual trace is too big
                     results.push(Err(RequestBuilderError::FailedToBuild {
                         message: "Dropped trace event",
@@ -226,7 +228,6 @@ fn encode_traces(
                     break;
                 } else {
                     // try with a fresh payload
-                    proto = payload.tracer_payloads.pop().expect("just pushed");
                     results.push(Ok((
                         payload.encode_to_vec(),
                         std::mem::take(&mut processed),
@@ -479,4 +480,55 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn handles_too_large_events() {
+        let max_size = 1024;
+        // 476 is experimentally determined to be too big to fit into a <1024 byte proto
+        let lengths = [128, 476, 128];
+
+        let key = PartitionKey {
+            api_key: Some("x".repeat(128).into()),
+            env: Some("production".into()),
+            hostname: Some("foo.bar.baz.local".into()),
+            agent_version: Some("1.2.3.4.5".into()),
+            target_tps: None,
+            error_tps: None,
+        };
+
+        // We only care about the size of the incoming traces, so just populate a single tag field
+        // that will be copied into the protobuf representation.
+        let traces = lengths
+            .into_iter()
+            .map(|n| {
+                let mut log = LogEvent::default();
+                log.insert(event_path!("tags", "foo"), "x".repeat(n));
+                TraceEvent::from(log)
+            })
+            .collect();
+
+        let mut results = encode_traces(&key, traces, max_size);
+        assert_eq!(3, results.len());
+
+        match &mut results[..] {
+            [Ok(one), Err(_two), Ok(three)] => {
+                for (encoded, processed) in [one, three] {
+                    assert_eq!(1, processed.len());
+                    assert!(
+                        encoded.len() <= max_size,
+                        "encoded len {} longer than max size {}",
+                        encoded.len(),
+                        max_size
+                    );
+                }
+            }
+            _ => panic!(
+                "unexpected output {:?}",
+                results
+                    .iter()
+                    .map(|r| r.as_ref().map(|(_, p)| p.len()))
+                    .collect::<Vec<_>>()
+            ),
+        }
+    }
 }