Skip to content

Commit

Permalink
fix(observability): add all events that are being encoded (#18289)
Browse files Browse the repository at this point in the history
Add all events that are being encoded

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored and jszwedko committed Aug 18, 2023
1 parent 2dcaf30 commit c9ccee0
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 79 deletions.
6 changes: 3 additions & 3 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl GroupedCountByteSize {
/// Returns a `HashMap` of tags => event counts for when we are tracking by tags.
/// Returns `None` if we are not tracking by tags.
#[must_use]
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
match self {
Self::Tagged { sizes } => Some(sizes),
Expand All @@ -71,8 +71,8 @@ impl GroupedCountByteSize {

/// Returns a single count for when we are not tracking by tags.
#[must_use]
#[cfg(test)]
fn size(&self) -> Option<CountByteSize> {
#[cfg(any(test, feature = "test"))]
pub fn size(&self) -> Option<CountByteSize> {
match self {
Self::Tagged { .. } => None,
Self::Untagged { size } => Some(*size),
Expand Down
175 changes: 99 additions & 76 deletions src/sinks/util/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
if let Some(last) = events.pop() {
for mut event in events {
self.0.transform(&mut event);

// Ensure the json size is calculated after any fields have been removed
// by the transformer.
byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut bytes = BytesMut::new();
encoder
.encode(event, &mut bytes)
Expand Down Expand Up @@ -150,6 +155,7 @@ mod tests {
CharacterDelimitedEncoder, JsonSerializerConfig, NewlineDelimitedEncoder,
TextSerializerConfig,
};
use vector_common::{internal_event::CountByteSize, json_size::JsonSize};
use vector_core::event::LogEvent;
use vrl::value::Value;

Expand All @@ -166,10 +172,14 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
assert_eq!(written, 2);

assert_eq!(String::from_utf8(writer).unwrap(), "[]");
assert_eq!(
CountByteSize(0, JsonSize::zero()),
json_size.size().unwrap()
);
}

#[test]
Expand All @@ -183,18 +193,21 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
vec![Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)])))],
&mut writer,
)
.unwrap();
let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)])))];

let input_json_size = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum::<JsonSize>();

let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 17);

assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}

#[test]
Expand All @@ -207,32 +220,36 @@ mod tests {
),
);

let input = vec![
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value1"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value2"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value3"),
)]))),
];

let input_json_size = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum::<JsonSize>();

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
vec![
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value1"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value2"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value3"),
)]))),
],
&mut writer,
)
.unwrap();
let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 52);

assert_eq!(
String::from_utf8(writer).unwrap(),
r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
);

assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
}

#[test]
Expand All @@ -246,10 +263,14 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
assert_eq!(written, 0);

assert_eq!(String::from_utf8(writer).unwrap(), "");
assert_eq!(
CountByteSize(0, JsonSize::zero()),
json_size.size().unwrap()
);
}

#[test]
Expand All @@ -263,18 +284,20 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
vec![Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)])))],
&mut writer,
)
.unwrap();
let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)])))];
let input_json_size = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum::<JsonSize>();

let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 15);

assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}

#[test]
Expand All @@ -288,31 +311,33 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
vec![
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value1"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value2"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value3"),
)]))),
],
&mut writer,
)
.unwrap();
let input = vec![
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value1"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value2"),
)]))),
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value3"),
)]))),
];
let input_json_size = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum::<JsonSize>();

let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 50);

assert_eq!(
String::from_utf8(writer).unwrap(),
"{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}"
);
assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
}

#[test]
Expand All @@ -323,18 +348,17 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)]))),
&mut writer,
)
.unwrap();
let input = Event::Log(LogEvent::from(BTreeMap::from([(
String::from("key"),
Value::from("value"),
)])));
let input_json_size = input.estimated_json_encoded_size_of();

let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 15);

assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}

#[test]
Expand All @@ -345,17 +369,16 @@ mod tests {
);

let mut writer = Vec::new();
let (written, _json_size) = encoding
.encode_input(
Event::Log(LogEvent::from(BTreeMap::from([(
String::from("message"),
Value::from("value"),
)]))),
&mut writer,
)
.unwrap();
let input = Event::Log(LogEvent::from(BTreeMap::from([(
String::from("message"),
Value::from("value"),
)])));
let input_json_size = input.estimated_json_encoded_size_of();

let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
assert_eq!(written, 5);

assert_eq!(String::from_utf8(writer).unwrap(), r#"value"#);
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}
}

0 comments on commit c9ccee0

Please sign in to comment.