Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(json codec): Fix deserializing non-object values with the Vector namespace #18379

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::convert::TryInto;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
Expand Down Expand Up @@ -122,9 +120,9 @@ impl Deserializer for JsonDeserializer {
let mut events = match json {
serde_json::Value::Array(values) => values
.into_iter()
.map(TryInto::try_into)
.map(|json| Event::from_json_value(json, log_namespace))
.collect::<Result<SmallVec<[Event; 1]>, _>>()?,
_ => smallvec![json.try_into()?],
_ => smallvec![Event::from_json_value(json, log_namespace)?],
};

let events = match log_namespace {
Expand Down Expand Up @@ -160,6 +158,7 @@ impl From<&JsonDeserializerConfig> for JsonDeserializer {
#[cfg(test)]
mod tests {
use vector_core::config::log_schema;
use vrl::core::Value;

use super::*;

Expand Down Expand Up @@ -190,6 +189,22 @@ mod tests {
}
}

#[test]
fn deserialize_non_object_vector_namespace() {
let input = Bytes::from(r#"null"#);
let deserializer = JsonDeserializer::default();

let namespace = LogNamespace::Vector;
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();

let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["."], Value::Null);

assert_eq!(events.next(), None);
}

#[test]
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
Expand Down
4 changes: 2 additions & 2 deletions lib/codecs/src/decoding/format/native_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ mod test {

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

let event1 = Event::try_from(json1).unwrap();
let event2 = Event::try_from(json2).unwrap();
let event1 = Event::from_json_value(json1, LogNamespace::Legacy).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but this is making me wonder: is data passed through correctly from Vector sink to Vector source when using log namespacing on either or both sides?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Vector sink/source uses protobuf to send the events. The protobuf encodes the raw Value for the event and metadata, so it should keep it in the correct format regardless of the log namespacing used. (It used to store only only a map for the event, assuming it was an object, but that was changed a while ago).

let event2 = Event::from_json_value(json2, LogNamespace::Legacy).unwrap();
let expected: SmallVec<[Event; 1]> = smallvec![event1, event2];
assert_eq!(events, expected);
}
Expand Down
52 changes: 27 additions & 25 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
fmt::Debug,
sync::Arc,
};
use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc};

use crate::config::LogNamespace;
use crate::{config::OutputId, ByteSizeOf};
pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
Expand Down Expand Up @@ -324,6 +320,31 @@ impl Event {
self.metadata_mut().set_upstream_id(upstream_id);
self
}

/// Creates an Event from a JSON value.
///
/// # Errors
/// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error.
pub fn from_json_value(
value: serde_json::Value,
log_namespace: LogNamespace,
) -> crate::Result<Self> {
match log_namespace {
LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
LogNamespace::Legacy => match value {
serde_json::Value::Object(fields) => Ok(LogEvent::from(
fields
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<BTreeMap<_, _>>(),
)
.into()),
_ => Err(crate::Error::from(
"Attempted to convert non-Object JSON into an Event.",
)),
},
}
}
}

impl EventDataEq for Event {
Expand All @@ -348,25 +369,6 @@ impl finalization::AddBatchNotifier for Event {
}
}

impl TryFrom<serde_json::Value> for Event {
type Error = crate::Error;

fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
match map {
serde_json::Value::Object(fields) => Ok(LogEvent::from(
fields
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<BTreeMap<_, _>>(),
)
.into()),
_ => Err(crate::Error::from(
"Attempted to convert non-Object JSON into an Event.",
)),
}
}
}

impl TryInto<serde_json::Value> for Event {
type Error = serde_json::Error;

Expand Down
47 changes: 34 additions & 13 deletions src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,11 @@ mod tests {

#[test]
fn remap_timezone_fallback() {
let error =
Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap();
let error = Event::from_json_value(
serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
LogNamespace::Legacy,
)
.unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
.timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -1058,8 +1061,11 @@ mod tests {

#[test]
fn remap_timezone_override() {
let error =
Event::try_from(serde_json::json!({"timestamp": "2022-12-27 00:00:00"})).unwrap();
let error = Event::from_json_value(
serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
LogNamespace::Legacy,
)
.unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
.timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -1093,9 +1099,16 @@ mod tests {

#[test]
fn check_remap_branching() {
let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap();
let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap();
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let happy =
Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
.unwrap();
let abort = Event::from_json_value(
serde_json::json!({"hello": "goodbye"}),
LogNamespace::Legacy,
)
.unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

let happy_metric = {
let mut metric = Metric::new(
Expand Down Expand Up @@ -1287,9 +1300,9 @@ mod tests {
#[test]
fn check_remap_branching_assert_with_message() {
let error_trigger_assert_custom_message =
Event::try_from(serde_json::json!({"hello": 42})).unwrap();
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
let error_trigger_default_assert_message =
Event::try_from(serde_json::json!({"hello": 0})).unwrap();
Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
assert_eq!(.hello, 0, "custom message here")
Expand Down Expand Up @@ -1349,7 +1362,8 @@ mod tests {

#[test]
fn check_remap_branching_abort_with_message() {
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
let conf = RemapConfig {
source: Some(formatdoc! {r#"
abort "custom message here"
Expand Down Expand Up @@ -1387,9 +1401,16 @@ mod tests {

#[test]
fn check_remap_branching_disabled() {
let happy = Event::try_from(serde_json::json!({"hello": "world"})).unwrap();
let abort = Event::try_from(serde_json::json!({"hello": "goodbye"})).unwrap();
let error = Event::try_from(serde_json::json!({"hello": 42})).unwrap();
let happy =
Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
.unwrap();
let abort = Event::from_json_value(
serde_json::json!({"hello": "goodbye"}),
LogNamespace::Legacy,
)
.unwrap();
let error =
Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

let conf = RemapConfig {
source: Some(formatdoc! {r#"
Expand Down
17 changes: 13 additions & 4 deletions src/transforms/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ mod test {
#[test]
fn route_pass_all_route_conditions() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(
let event = Event::from_json_value(
serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
LogNamespace::Legacy,
)
.unwrap();
let config = toml::from_str::<RouteConfig>(
Expand Down Expand Up @@ -234,7 +235,11 @@ mod test {
#[test]
fn route_pass_one_route_condition() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "hello world"})).unwrap();
let event = Event::from_json_value(
serde_json::json!({"message": "hello world"}),
LogNamespace::Legacy,
)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
route.first.type = "vrl"
Expand Down Expand Up @@ -275,7 +280,9 @@ mod test {
#[test]
fn route_pass_no_route_condition() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap();
let event =
Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
route.first.type = "vrl"
Expand Down Expand Up @@ -316,7 +323,9 @@ mod test {
#[test]
fn route_no_unmatched_output() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap();
let event =
Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
.unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
reroute_unmatched = false
Expand Down