From c0ef7220fe747856b43b42fc3a2d5843cac45507 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 29 Jun 2023 16:59:54 -0400 Subject: [PATCH] fix remap behavior for root types --- lib/vector-core/src/event/vrl_target.rs | 183 ++++++++++++++- lib/vector-core/src/schema/definition.rs | 19 ++ src/conditions/vrl.rs | 7 +- src/transforms/remap.rs | 276 ++++++----------------- 4 files changed, 271 insertions(+), 214 deletions(-) diff --git a/lib/vector-core/src/event/vrl_target.rs b/lib/vector-core/src/event/vrl_target.rs index f6277cdfe20a6..3ca714be778f0 100644 --- a/lib/vector-core/src/event/vrl_target.rs +++ b/lib/vector-core/src/event/vrl_target.rs @@ -6,11 +6,13 @@ use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix}; use snafu::Snafu; use vrl::compiler::value::VrlValueConvert; use vrl::compiler::{ProgramInfo, SecretTarget, Target}; -use vrl::value::Value; +use vrl::prelude::Collection; +use vrl::value::{Kind, Value}; use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent}; -use crate::config::log_schema; +use crate::config::{log_schema, LogNamespace}; use crate::event::metric::TagValue; +use crate::schema::Definition; const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .timestamp, .kind, .tags"; @@ -114,11 +116,24 @@ impl VrlTarget { } } + /// Modifies a schema in the same way that the `into_events` function modifies the event + pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition { + let log_namespaces = input.log_namespaces().clone(); + + // both namespaces merge arrays, but only `Legacy` moves field definitions into a "message" field. + let merged_arrays = merge_array_definitions(input); + Definition::combine_log_namespaces( + &log_namespaces, + move_field_definitions_into_message(merged_arrays.clone()), + merged_arrays, + ) + } + /// Turn the target back into events. /// /// This returns an iterator of events as one event can be turned into multiple by assigning an /// array to `.` in VRL. - pub fn into_events(self) -> TargetEvents { + pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents { match self { VrlTarget::LogEvent(value, metadata) => match value { value @ Value::Object(_) => { @@ -131,11 +146,16 @@ impl VrlTarget { _marker: PhantomData, }), - v => { - let mut log = LogEvent::new_with_metadata(metadata); - log.insert(log_schema().message_key(), v); - TargetEvents::One(log.into()) - } + v => match log_namespace { + LogNamespace::Vector => { + TargetEvents::One(LogEvent::from_parts(v, metadata).into()) + } + LogNamespace::Legacy => { + let mut log = LogEvent::new_with_metadata(metadata); + log.insert(log_schema().message_key(), v); + TargetEvents::One(log.into()) + } + }, }, VrlTarget::Trace(value, metadata) => match value { value @ Value::Object(_) => { @@ -174,6 +194,53 @@ impl VrlTarget { } } +/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]), +/// or an object, that data is moved into the `message` field. +fn move_field_definitions_into_message(mut definition: Definition) -> Definition { + let mut message = definition.event_kind().clone(); + message.remove_object(); + message.remove_array(); + + if !message.is_never() { + // We need to add the given message type to a field called `message` + // in the event. + let message = Kind::object(Collection::from(BTreeMap::from([( + log_schema().message_key().into(), + message, + )]))); + + definition.event_kind_mut().remove_bytes(); + definition.event_kind_mut().remove_integer(); + definition.event_kind_mut().remove_float(); + definition.event_kind_mut().remove_boolean(); + definition.event_kind_mut().remove_timestamp(); + definition.event_kind_mut().remove_regex(); + definition.event_kind_mut().remove_null(); + + *definition.event_kind_mut() = definition.event_kind().union(message); + } + + definition +} + +/// If the transform returns an array, the elements of this array will be separated +/// out into it's individual elements and passed downstream. +/// +/// The potential types that the transform can output are any of the arrays +/// elements or any non-array elements that are within the definition. All these +/// definitions need to be merged together. +fn merge_array_definitions(mut definition: Definition) -> Definition { + if let Some(array) = definition.event_kind().as_array() { + let array_kinds = array.reduced_kind(); + + let kind = definition.event_kind_mut(); + kind.remove_array(); + *kind = kind.union(array_kinds); + } + + definition +} + fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) { if multi_value_tags { let tag_values = value @@ -589,11 +656,107 @@ mod test { use lookup::owned_value_path; use similar_asserts::assert_eq; use vrl::btreemap; + use vrl::value::kind::Index; use super::super::MetricValue; use super::*; use crate::metric_tags; + #[test] + fn test_field_definitions_in_message() { + let definition = + Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]); + assert_eq!( + Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])), + [LogNamespace::Legacy] + ), + move_field_definitions_into_message(definition) + ); + + // Test when a message field already exists. + let definition = Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(), + [LogNamespace::Legacy], + ); + assert_eq!( + Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([( + "message".into(), + Kind::bytes().or_integer() + )])), + [LogNamespace::Legacy] + ), + move_field_definitions_into_message(definition) + ); + } + + #[test] + fn test_merged_array_definitions_simple() { + // Test merging the array definitions where the schema definition + // is simple, containing only one possible type in the array. + let object: BTreeMap = [ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ] + .into(); + + let kind = Kind::array(Collection::from_unknown(Kind::object(object))); + + let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + + let kind = Kind::object(BTreeMap::from([ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ])); + + let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + let merged = merge_array_definitions(definition); + + assert_eq!(wanted, merged); + } + + #[test] + fn test_merged_array_definitions_complex() { + // Test merging the array definitions where the schema definition + // is fairly complex containing multiple different possible types. + let object: BTreeMap = [ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ] + .into(); + + let array: BTreeMap = [ + (Index::from(0), Kind::integer()), + (Index::from(1), Kind::boolean()), + ( + Index::from(2), + Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])), + ), + ] + .into(); + + let mut kind = Kind::bytes(); + kind.add_object(object); + kind.add_array(array); + + let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + + let mut kind = Kind::bytes(); + kind.add_integer(); + kind.add_boolean(); + kind.add_object(BTreeMap::from([ + ("carrot".into(), Kind::bytes().or_undefined()), + ("potato".into(), Kind::integer().or_undefined()), + ("peas".into(), Kind::bytes().or_undefined()), + ])); + + let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + let merged = merge_array_definitions(definition); + + assert_eq!(wanted, merged); + } + #[test] fn log_get() { let cases = vec![ @@ -755,7 +918,7 @@ mod test { Ok(Some(value)) ); assert_eq!( - match target.into_events() { + match target.into_events(LogNamespace::Legacy) { TargetEvents::One(event) => vec![event], TargetEvents::Logs(events) => events.collect::>(), TargetEvents::Traces(events) => events.collect::>(), @@ -901,7 +1064,7 @@ mod test { Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap(); assert_eq!( - match target.into_events() { + match target.into_events(LogNamespace::Legacy) { TargetEvents::One(event) => vec![event], TargetEvents::Logs(events) => events.collect::>(), TargetEvents::Traces(events) => events.collect::>(), diff --git a/lib/vector-core/src/schema/definition.rs b/lib/vector-core/src/schema/definition.rs index 62a5bd3b2ff64..a3c5afc034cb4 100644 --- a/lib/vector-core/src/schema/definition.rs +++ b/lib/vector-core/src/schema/definition.rs @@ -457,6 +457,25 @@ impl Definition { self } + /// If the schema definition depends on the `LogNamespace`, this combines the individual + /// definitions for each `LogNamespace`. + pub fn combine_log_namespaces( + log_namespaces: &BTreeSet, + legacy: Self, + vector: Self, + ) -> Self { + let mut combined = + Definition::new_with_default_metadata(Kind::never(), log_namespaces.clone()); + + if log_namespaces.contains(&LogNamespace::Legacy) { + combined = combined.merge(legacy); + } + if log_namespaces.contains(&LogNamespace::Vector) { + combined = combined.merge(vector); + } + combined + } + /// Returns an `OwnedTargetPath` into an event, based on the provided `meaning`, if the meaning exists. pub fn meaning_path(&self, meaning: &str) -> Option<&OwnedTargetPath> { match self.meaning.get(meaning) { diff --git a/src/conditions/vrl.rs b/src/conditions/vrl.rs index 0c132ccc485aa..3fb115e7ba7fc 100644 --- a/src/conditions/vrl.rs +++ b/src/conditions/vrl.rs @@ -6,6 +6,7 @@ use vrl::compiler::{CompilationResult, CompileConfig, Program, TypeState, VrlRun use vrl::diagnostic::Formatter; use vrl::value::Value; +use crate::config::LogNamespace; use crate::event::TargetEvents; use crate::{ conditions::{Condition, Conditional, ConditionalConfig}, @@ -84,12 +85,16 @@ pub struct Vrl { impl Vrl { fn run(&self, event: Event) -> (Event, RuntimeResult) { + let log_namespace = event + .maybe_as_log() + .map(|log| log.namespace()) + .unwrap_or(LogNamespace::Legacy); let mut target = VrlTarget::new(event, self.program.info(), false); // TODO: use timezone from remap config let timezone = TimeZone::default(); let result = Runtime::default().resolve(&mut target, &self.program, &timezone); - let original_event = match target.into_events() { + let original_event = match target.into_events(log_namespace) { TargetEvents::One(event) => event, _ => panic!("Event was modified in a condition. This is an internal compiler error."), }; diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index a6b01dbc8844d..a21c135919b28 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -21,8 +21,6 @@ use vrl::compiler::runtime::{Runtime, Terminate}; use vrl::compiler::state::ExternalEnv; use vrl::compiler::{CompileConfig, ExpressionError, Function, Program, TypeState, VrlRuntime}; use vrl::diagnostic::{DiagnosticMessage, Formatter, Note}; -use vrl::value::kind::merge::{CollisionStrategy, Strategy}; -use vrl::value::kind::Collection; use vrl::value::{Kind, Value}; use crate::config::OutputId; @@ -288,63 +286,35 @@ impl TransformConfig for RemapConfig { // When a message is dropped and re-routed, we keep the original event, but also annotate // it with additional metadata. - let mut dropped_definition = Definition::new_with_default_metadata( - Kind::never(), - input_definition.log_namespaces().clone(), + let dropped_definition = Definition::combine_log_namespaces( + input_definition.log_namespaces(), + input_definition.clone().with_event_field( + &parse_value_path(log_schema().metadata_key()).expect("valid metadata key"), + Kind::object(BTreeMap::from([ + ("reason".into(), Kind::bytes()), + ("message".into(), Kind::bytes()), + ("component_id".into(), Kind::bytes()), + ("component_type".into(), Kind::bytes()), + ("component_kind".into(), Kind::bytes()), + ])), + Some("metadata"), + ), + input_definition + .clone() + .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None) + .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None) + .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None) + .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None) + .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None), ); - if input_definition - .log_namespaces() - .contains(&LogNamespace::Legacy) - { - dropped_definition = - dropped_definition.merge(input_definition.clone().with_event_field( - &parse_value_path(log_schema().metadata_key()).expect("valid metadata key"), - Kind::object(BTreeMap::from([ - ("reason".into(), Kind::bytes()), - ("message".into(), Kind::bytes()), - ("component_id".into(), Kind::bytes()), - ("component_type".into(), Kind::bytes()), - ("component_kind".into(), Kind::bytes()), - ])), - Some("metadata"), - )); - } - - if input_definition - .log_namespaces() - .contains(&LogNamespace::Vector) - { - dropped_definition = dropped_definition.merge( - input_definition - .clone() - .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None) - .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None) - .with_metadata_field( - &owned_value_path!("component_id"), - Kind::bytes(), - None, - ) - .with_metadata_field( - &owned_value_path!("component_type"), - Kind::bytes(), - None, - ) - .with_metadata_field( - &owned_value_path!("component_kind"), - Kind::bytes(), - None, - ), - ); - } - default_definitions.insert( output_id.clone(), - move_field_definitions_into_message(merge_array_definitions(default_definition)), + VrlTarget::modify_schema_definition_for_into_events(default_definition), ); dropped_definitions.insert( output_id.clone(), - move_field_definitions_into_message(merge_array_definitions(dropped_definition)), + VrlTarget::modify_schema_definition_for_into_events(dropped_definition), ); } @@ -575,6 +545,11 @@ where None }; + let log_namespace = event + .maybe_as_log() + .map(|log| log.namespace()) + .unwrap_or(LogNamespace::Legacy); + let mut target = VrlTarget::new( event, self.program.info(), @@ -586,7 +561,7 @@ where let result = self.run_vrl(&mut target); match result { - Ok(_) => match target.into_events() { + Ok(_) => match target.into_events(log_namespace) { TargetEvents::One(event) => { push_default(event, output, &self.default_schema_definition) } @@ -655,58 +630,6 @@ fn push_dropped( output.push_named(DROPPED, event) } -/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]), -/// or an object, that data is moved into the `message` field. -fn move_field_definitions_into_message(mut definition: schema::Definition) -> schema::Definition { - let mut message = definition.event_kind().clone(); - message.remove_object(); - message.remove_array(); - - if !message.is_never() { - // We need to add the given message type to a field called `message` - // in the event. - let message = Kind::object(Collection::from(BTreeMap::from([( - log_schema().message_key().into(), - message, - )]))); - - definition.event_kind_mut().remove_bytes(); - definition.event_kind_mut().remove_integer(); - definition.event_kind_mut().remove_float(); - definition.event_kind_mut().remove_boolean(); - definition.event_kind_mut().remove_timestamp(); - definition.event_kind_mut().remove_regex(); - definition.event_kind_mut().remove_null(); - - *definition.event_kind_mut() = definition.event_kind().union(message); - } - - definition -} - -/// If the transform returns an array, the elements of this array will be separated -/// out into it's individual elements and passed downstream. -/// -/// The potential types that the transform can output are any of the arrays -/// elements or any non-array elements that are within the definition. All these -/// definitions need to be merged together. -fn merge_array_definitions(mut definition: schema::Definition) -> schema::Definition { - if let Some(array) = definition.event_kind().as_array() { - let array_kinds = array.reduced_kind(); - - let kind = definition.event_kind_mut(); - kind.remove_array(); - kind.merge( - array_kinds, - Strategy { - collisions: CollisionStrategy::Union, - }, - ); - } - - definition -} - #[derive(Debug, Snafu)] pub enum BuildError { #[snafu(display("must provide exactly one of `source` or `file` configuration"))] @@ -725,7 +648,7 @@ mod tests { use indoc::{formatdoc, indoc}; use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags}; use vrl::btreemap; - use vrl::value::kind::{Collection, Index}; + use vrl::value::kind::Collection; use super::*; use crate::{ @@ -742,6 +665,7 @@ mod tests { transforms::OutputBuffer, }; use chrono::DateTime; + use enrichment::TableRegistry; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -861,6 +785,49 @@ mod tests { assert!(tform.runner().runtime.is_empty()); } + #[test] + fn remap_return_raw_string_vector_namespace() { + let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into()); + + let event = { + let mut metadata = EventMetadata::default() + .with_schema_definition(&Arc::new(initial_definition.clone())); + // the Vector metadata field is required for an event to correctly detect the namespace at runtime + metadata + .value_mut() + .insert(&owned_value_path!("vector"), BTreeMap::new()); + + let mut event = LogEvent::new_with_metadata(metadata); + event.insert("copy_from", "buz"); + Event::from(event) + }; + + let conf = RemapConfig { + source: Some(r#" . = "root string";"#.to_string()), + file: None, + drop_on_error: true, + drop_on_abort: false, + ..Default::default() + }; + let mut tform = remap(conf.clone()).unwrap(); + let result = transform_one(&mut tform, event).unwrap(); + assert_eq!(get_field_string(&result, "."), "root string"); + + let mut outputs = conf.outputs( + TableRegistry::default(), + &[(OutputId::dummy(), initial_definition)], + LogNamespace::Vector, + ); + + assert_eq!(outputs.len(), 1); + let output = outputs.pop().unwrap(); + assert_eq!(output.port, None); + let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone(); + let expected_schema = + Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]); + assert_eq!(actual_schema_def, expected_schema); + } + #[test] fn check_remap_adds() { let event = { @@ -1651,103 +1618,6 @@ mod tests { .await } - #[test] - fn test_field_definitions_in_message() { - let definition = - schema::Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]); - assert_eq!( - schema::Definition::new_with_default_metadata( - Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])), - [LogNamespace::Legacy] - ), - move_field_definitions_into_message(definition) - ); - - // Test when a message field already exists. - let definition = schema::Definition::new_with_default_metadata( - Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(), - [LogNamespace::Legacy], - ); - assert_eq!( - schema::Definition::new_with_default_metadata( - Kind::object(BTreeMap::from([( - "message".into(), - Kind::bytes().or_integer() - )])), - [LogNamespace::Legacy] - ), - move_field_definitions_into_message(definition) - ) - } - - #[test] - fn test_merged_array_definitions_simple() { - // Test merging the array definitions where the schema definition - // is simple, containing only one possible type in the array. - let object: BTreeMap = [ - ("carrot".into(), Kind::bytes()), - ("potato".into(), Kind::integer()), - ] - .into(); - - let kind = Kind::array(Collection::from_unknown(Kind::object(object))); - - let definition = - schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); - - let kind = Kind::object(BTreeMap::from([ - ("carrot".into(), Kind::bytes()), - ("potato".into(), Kind::integer()), - ])); - - let wanted = schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); - let merged = merge_array_definitions(definition); - - assert_eq!(wanted, merged); - } - - #[test] - fn test_merged_array_definitions_complex() { - // Test merging the array definitions where the schema definition - // is fairly complex containing multiple different possible types. - let object: BTreeMap = [ - ("carrot".into(), Kind::bytes()), - ("potato".into(), Kind::integer()), - ] - .into(); - - let array: BTreeMap = [ - (Index::from(0), Kind::integer()), - (Index::from(1), Kind::boolean()), - ( - Index::from(2), - Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])), - ), - ] - .into(); - - let mut kind = Kind::bytes(); - kind.add_object(object); - kind.add_array(array); - - let definition = - schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); - - let mut kind = Kind::bytes(); - kind.add_integer(); - kind.add_boolean(); - kind.add_object(BTreeMap::from([ - ("carrot".into(), Kind::bytes().or_undefined()), - ("potato".into(), Kind::integer().or_undefined()), - ("peas".into(), Kind::bytes().or_undefined()), - ])); - - let wanted = schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); - let merged = merge_array_definitions(definition); - - assert_eq!(wanted, merged); - } - #[test] fn test_combined_transforms_simple() { // Make sure that when getting the definitions from one transform and