From 26342d5ec703b78cf9026abe57f1afeb04b7a76a Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Thu, 17 Aug 2023 12:19:03 +0100
Subject: [PATCH] Add all events that are being encoded

Signed-off-by: Stephen Wakely
---
 lib/vector-common/src/request_metadata.rs |   6 +-
 src/sinks/util/encoding.rs                | 175 ++++++++++++----------
 2 files changed, 102 insertions(+), 79 deletions(-)

diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs
index 6069a0dda837b..12a164d23e437 100644
--- a/lib/vector-common/src/request_metadata.rs
+++ b/lib/vector-common/src/request_metadata.rs
@@ -61,7 +61,7 @@ impl GroupedCountByteSize {
     /// Returns a `HashMap` of tags => event counts for when we are tracking by tags.
     /// Returns `None` if we are not tracking by tags.
     #[must_use]
-    #[cfg(test)]
+    #[cfg(any(test, feature = "test"))]
     pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
         match self {
             Self::Tagged { sizes } => Some(sizes),
@@ -71,8 +71,8 @@ impl GroupedCountByteSize {
 
     /// Returns a single count for when we are not tracking by tags.
     #[must_use]
-    #[cfg(test)]
-    fn size(&self) -> Option<CountByteSize> {
+    #[cfg(any(test, feature = "test"))]
+    pub fn size(&self) -> Option<CountByteSize> {
         match self {
             Self::Tagged { .. } => None,
             Self::Untagged { size } => Some(*size),
diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs
index b60e7ef177410..d74b0ad3c2034 100644
--- a/src/sinks/util/encoding.rs
+++ b/src/sinks/util/encoding.rs
@@ -39,6 +39,11 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
         if let Some(last) = events.pop() {
             for mut event in events {
                 self.0.transform(&mut event);
+
+                // Ensure the json size is calculated after any fields have been removed
+                // by the transformer.
+                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+
                 let mut bytes = BytesMut::new();
                 encoder
                     .encode(event, &mut bytes)
@@ -150,6 +155,7 @@ mod tests {
         CharacterDelimitedEncoder, JsonSerializerConfig, NewlineDelimitedEncoder,
         TextSerializerConfig,
     };
+    use vector_common::{internal_event::CountByteSize, json_size::JsonSize};
     use vector_core::event::LogEvent;
     use vrl::value::Value;
 
@@ -166,10 +172,14 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
+        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
 
         assert_eq!(written, 2);
         assert_eq!(String::from_utf8(writer).unwrap(), "[]");
+        assert_eq!(
+            CountByteSize(0, JsonSize::zero()),
+            json_size.size().unwrap()
+        );
     }
 
     #[test]
@@ -183,18 +193,21 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                vec![Event::Log(LogEvent::from(BTreeMap::from([(
-                    String::from("key"),
-                    Value::from("value"),
-                )])))],
-                &mut writer,
-            )
-            .unwrap();
+        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
+            String::from("key"),
+            Value::from("value"),
+        )])))];
+
+        let input_json_size = input
+            .iter()
+            .map(|event| event.estimated_json_encoded_size_of())
+            .sum::<JsonSize>();
+
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 17);
         assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
+        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
     }
 
     #[test]
@@ -207,32 +220,36 @@ mod tests {
             ),
         );
 
+        let input = vec![
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value1"),
+            )]))),
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value2"),
+            )]))),
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value3"),
+            )]))),
+        ];
+
+        let input_json_size = input
+            .iter()
+            .map(|event| event.estimated_json_encoded_size_of())
+            .sum::<JsonSize>();
+
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                vec![
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value1"),
-                    )]))),
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value2"),
-                    )]))),
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value3"),
-                    )]))),
-                ],
-                &mut writer,
-            )
-            .unwrap();
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 52);
         assert_eq!(
             String::from_utf8(writer).unwrap(),
             r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
         );
+
+        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
     }
 
     #[test]
@@ -246,10 +263,14 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
+        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
 
         assert_eq!(written, 0);
         assert_eq!(String::from_utf8(writer).unwrap(), "");
+        assert_eq!(
+            CountByteSize(0, JsonSize::zero()),
+            json_size.size().unwrap()
+        );
     }
 
     #[test]
@@ -263,18 +284,20 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                vec![Event::Log(LogEvent::from(BTreeMap::from([(
-                    String::from("key"),
-                    Value::from("value"),
-                )])))],
-                &mut writer,
-            )
-            .unwrap();
+        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
+            String::from("key"),
+            Value::from("value"),
+        )])))];
+        let input_json_size = input
+            .iter()
+            .map(|event| event.estimated_json_encoded_size_of())
+            .sum::<JsonSize>();
+
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 15);
         assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
+        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }
 
     #[test]
@@ -288,31 +311,33 @@ mod tests {
             ),
         );
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                vec![
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value1"),
-                    )]))),
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value2"),
-                    )]))),
-                    Event::Log(LogEvent::from(BTreeMap::from([(
-                        String::from("key"),
-                        Value::from("value3"),
-                    )]))),
-                ],
-                &mut writer,
-            )
-            .unwrap();
+        let input = vec![
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value1"),
+            )]))),
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value2"),
+            )]))),
+            Event::Log(LogEvent::from(BTreeMap::from([(
+                String::from("key"),
+                Value::from("value3"),
+            )]))),
+        ];
+        let input_json_size = input
+            .iter()
+            .map(|event| event.estimated_json_encoded_size_of())
+            .sum::<JsonSize>();
+
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 50);
         assert_eq!(
             String::from_utf8(writer).unwrap(),
             "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}"
         );
+        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
     }
 
     #[test]
@@ -323,18 +348,17 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                Event::Log(LogEvent::from(BTreeMap::from([(
-                    String::from("key"),
-                    Value::from("value"),
-                )]))),
-                &mut writer,
-            )
-            .unwrap();
+        let input = Event::Log(LogEvent::from(BTreeMap::from([(
+            String::from("key"),
+            Value::from("value"),
+        )])));
+        let input_json_size = input.estimated_json_encoded_size_of();
+
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 15);
         assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
+        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
     }
 
     #[test]
@@ -345,17 +369,16 @@ mod tests {
         );
 
         let mut writer = Vec::new();
-        let (written, _json_size) = encoding
-            .encode_input(
-                Event::Log(LogEvent::from(BTreeMap::from([(
-                    String::from("message"),
-                    Value::from("value"),
-                )]))),
-                &mut writer,
-            )
-            .unwrap();
+        let input = Event::Log(LogEvent::from(BTreeMap::from([(
+            String::from("message"),
+            Value::from("value"),
+        )])));
+        let input_json_size = input.estimated_json_encoded_size_of();
+
+        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
 
         assert_eq!(written, 5);
         assert_eq!(String::from_utf8(writer).unwrap(), r#"value"#);
+        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
     }
 }
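Note on the request_metadata.rs hunks: the widened cfg gate is what makes the new assertions possible. `#[cfg(test)]` only compiles an item into vector-common's own unit-test build, so the sink tests in the main `vector` crate (a separate compilation unit) could never call `size()` or `sizes()`. Gating on `any(test, feature = "test")`, and making `size()` public, exposes the accessors to any downstream crate that enables vector-common's `test` feature; presumably the main crate turns that feature on for its dev build, which is outside this diff. A minimal, self-contained sketch of the pattern, with toy stand-ins for the real types:

    // Sketch only: toy stand-ins for vector-common's JsonSize, CountByteSize
    // and GroupedCountByteSize; the real definitions live in request_metadata.rs.
    use std::collections::HashMap;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct JsonSize(pub usize);

    /// (event count, estimated JSON byte size)
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct CountByteSize(pub usize, pub JsonSize);

    pub enum GroupedCountByteSize {
        Tagged { sizes: HashMap<String, CountByteSize> },
        Untagged { size: CountByteSize },
    }

    impl GroupedCountByteSize {
        /// Compiled into this crate's own unit tests *and* into builds of
        /// downstream crates that enable the `test` feature; invisible in
        /// normal release builds.
        #[cfg(any(test, feature = "test"))]
        pub fn size(&self) -> Option<CountByteSize> {
            match self {
                Self::Tagged { .. } => None,
                Self::Untagged { size } => Some(*size),
            }
        }
    }

This is what allows the `json_size.size().unwrap()` assertions in the encoding.rs tests below to compile at all.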
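Note on the first encoding.rs hunk, which is the substance of the change: every event in the batch now contributes to `byte_size`, and the size is sampled after `self.0.transform(&mut event)` runs, so fields the transformer strips (for example via `only_fields` / `except_fields`) are not counted toward the reported JSON size. A self-contained sketch of why the ordering matters, using toy types in place of Vector's `Event` and `Transformer`:

    use std::collections::BTreeMap;

    // Toy event: a flat map of string fields.
    type Event = BTreeMap<String, String>;

    // Rough stand-in for `estimated_json_encoded_size_of`: the length of the
    // fields rendered as a JSON object.
    fn estimated_json_size(event: &Event) -> usize {
        let fields: Vec<String> = event
            .iter()
            .map(|(k, v)| format!("\"{k}\":\"{v}\""))
            .collect();
        fields.join(",").len() + 2 // +2 for the surrounding braces
    }

    // Stand-in for the sink transformer: drops a field, as `except_fields` would.
    fn transform(event: &mut Event) {
        event.remove("secret");
    }

    fn main() {
        let mut event = Event::from([
            ("message".to_string(), "hello".to_string()),
            ("secret".to_string(), "hunter2".to_string()),
        ]);

        let before = estimated_json_size(&event);
        transform(&mut event);
        let after = estimated_json_size(&event);

        // Telemetry should describe the bytes that actually reach the writer,
        // so the size must be taken after the transform runs.
        assert!(after < before);
        println!("before transform: {before} bytes, after: {after} bytes");
    }

Measuring before the transform would overcount bytes that are never encoded, and measuring per event rather than once per batch is what lets the updated tests assert an exact `CountByteSize(n, input_json_size)` for multi-event inputs.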