Skip to content

Commit

Permalink
fix(json codec): Fix deserializing non-object values with the `Vector…
Browse files Browse the repository at this point in the history
…` namespace (#18379)

* fix remap array return values when using the Vector namespace

* fix json codec when using vector namespace
  • Loading branch information
fuchsnj authored Aug 24, 2023
1 parent 76ffdab commit f15144b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 48 deletions.
23 changes: 19 additions & 4 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::convert::TryInto;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
Expand Down Expand Up @@ -122,9 +120,9 @@ impl Deserializer for JsonDeserializer {
let mut events = match json {
serde_json::Value::Array(values) => values
.into_iter()
.map(TryInto::try_into)
.map(|json| Event::from_json_value(json, log_namespace))
.collect::<Result<SmallVec<[Event; 1]>, _>>()?,
_ => smallvec![json.try_into()?],
_ => smallvec![Event::from_json_value(json, log_namespace)?],
};

let events = match log_namespace {
Expand Down Expand Up @@ -160,6 +158,7 @@ impl From<&JsonDeserializerConfig> for JsonDeserializer {
#[cfg(test)]
mod tests {
use vector_core::config::log_schema;
use vrl::core::Value;

use super::*;

Expand Down Expand Up @@ -190,6 +189,22 @@ mod tests {
}
}

#[test]
fn deserialize_non_object_vector_namespace() {
let input = Bytes::from(r#"null"#);
let deserializer = JsonDeserializer::default();

let namespace = LogNamespace::Vector;
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();

let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["."], Value::Null);

assert_eq!(events.next(), None);
}

#[test]
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
Expand Down
4 changes: 2 additions & 2 deletions lib/codecs/src/decoding/format/native_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ mod test {

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

let event1 = Event::try_from(json1).unwrap();
let event2 = Event::try_from(json2).unwrap();
let event1 = Event::from_json_value(json1, LogNamespace::Legacy).unwrap();
let event2 = Event::from_json_value(json2, LogNamespace::Legacy).unwrap();
let expected: SmallVec<[Event; 1]> = smallvec![event1, event2];
assert_eq!(events, expected);
}
Expand Down
52 changes: 27 additions & 25 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
fmt::Debug,
sync::Arc,
};
use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc};

use crate::config::LogNamespace;
use crate::{config::OutputId, ByteSizeOf};
pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
Expand Down Expand Up @@ -324,6 +320,31 @@ impl Event {
self.metadata_mut().set_upstream_id(upstream_id);
self
}

/// Creates an Event from a JSON value.
///
/// # Errors
/// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error.
pub fn from_json_value(
value: serde_json::Value,
log_namespace: LogNamespace,
) -> crate::Result<Self> {
match log_namespace {
LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
LogNamespace::Legacy => match value {
serde_json::Value::Object(fields) => Ok(LogEvent::from(
fields
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<BTreeMap<_, _>>(),
)
.into()),
_ => Err(crate::Error::from(
"Attempted to convert non-Object JSON into an Event.",
)),
},
}
}
}

impl EventDataEq for Event {
Expand All @@ -348,25 +369,6 @@ impl finalization::AddBatchNotifier for Event {
}
}

impl TryFrom<serde_json::Value> for Event {
type Error = crate::Error;

fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
match map {
serde_json::Value::Object(fields) => Ok(LogEvent::from(
fields
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<BTreeMap<_, _>>(),
)
.into()),
_ => Err(crate::Error::from(
"Attempted to convert non-Object JSON into an Event.",
)),
}
}
}

impl TryInto<serde_json::Value> for Event {
type Error = serde_json::Error;

Expand Down
47 changes: 34 additions & 13 deletions src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,11 @@ mod tests {

#[test]
fn remap_timezone_fallback() {
let error =
Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap();
let error = Event::from_json_value(
serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
LogNamespace::Legacy,
)
.unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
.timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -1058,8 +1061,11 @@ mod tests {

#[test]
fn remap_timezone_override() {
let error =
Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap();
let error = Event::from_json_value(
serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
LogNamespace::Legacy,
)
.unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
.timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -1093,9 +1099,16 @@ mod tests {

#[test]
fn check_remap_branching() {
let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap();
let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap();
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let happy =
Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
.unwrap();
let abort = Event::from_json_value(
serde_json::json!({"hello": "goodbye"}),
LogNamespace::Legacy,
)
.unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

let happy_metric = {
let mut metric = Metric::new(
Expand Down Expand Up @@ -1287,9 +1300,9 @@ mod tests {
#[test]
fn check_remap_branching_assert_with_message() {
let error_trigger_assert_custom_message =
Event::try_from(serde_json::json!({"hello": 42})).unwrap();
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
let error_trigger_default_assert_message =
Event::try_from(serde_json::json!({"hello": 0})).unwrap();
Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
assert_eq!(.hello, 0, "custom message here")
Expand Down Expand Up @@ -1349,7 +1362,8 @@ mod tests {

#[test]
fn check_remap_branching_abort_with_message() {
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
abort "custom message here"
Expand Down Expand Up @@ -1387,9 +1401,16 @@ mod tests {

#[test]
fn check_remap_branching_disabled() {
let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap();
let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap();
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let happy =
Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
.unwrap();
let abort = Event::from_json_value(
serde_json::json!({"hello": "goodbye"}),
LogNamespace::Legacy,
)
.unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

let conf = RemapConfig {
source: Some(formatdoc! {r#"
Expand Down
17 changes: 13 additions & 4 deletions src/transforms/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ mod test {
#[test]
fn route_pass_all_route_conditions() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(
let event = Event::from_json_value(
serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
LogNamespace::Legacy,
)
.unwrap();
let config = toml::from_str::<RouteConfig>(
Expand Down Expand Up @@ -234,7 +235,11 @@ mod test {
#[test]
fn route_pass_one_route_condition() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "hello world"})).unwrap();
let event = Event::from_json_value(
serde_json::json!({"message": "hello world"}),
LogNamespace::Legacy,
)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
route.first.type = "vrl"
Expand Down Expand Up @@ -275,7 +280,9 @@ mod test {
#[test]
fn route_pass_no_route_condition() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap();
let event =
Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
route.first.type = "vrl"
Expand Down Expand Up @@ -316,7 +323,9 @@ mod test {
#[test]
fn route_no_unmatched_output() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap();
let event =
Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
reroute_unmatched = false
Expand Down

0 comments on commit f15144b

Please sign in to comment.