Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remap behavior for root types when using the Vector namespace #17807

Merged
merged 2 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 173 additions & 10 deletions lib/vector-core/src/event/vrl_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix};
use snafu::Snafu;
use vrl::compiler::value::VrlValueConvert;
use vrl::compiler::{ProgramInfo, SecretTarget, Target};
use vrl::value::Value;
use vrl::prelude::Collection;
use vrl::value::{Kind, Value};

use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent};
use crate::config::log_schema;
use crate::config::{log_schema, LogNamespace};
use crate::event::metric::TagValue;
use crate::schema::Definition;

const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .timestamp, .kind, .tags";

Expand Down Expand Up @@ -114,11 +116,24 @@ impl VrlTarget {
}
}

/// Modifies a schema in the same way that the `into_events` function modifies the event
pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
let log_namespaces = input.log_namespaces().clone();

// both namespaces merge arrays, but only `Legacy` moves field definitions into a "message" field.
let merged_arrays = merge_array_definitions(input);
Definition::combine_log_namespaces(
&log_namespaces,
move_field_definitions_into_message(merged_arrays.clone()),
merged_arrays,
)
}

/// Turn the target back into events.
///
/// This returns an iterator of events as one event can be turned into multiple by assigning an
/// array to `.` in VRL.
pub fn into_events(self) -> TargetEvents {
pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
match self {
VrlTarget::LogEvent(value, metadata) => match value {
value @ Value::Object(_) => {
Expand All @@ -131,11 +146,16 @@ impl VrlTarget {
_marker: PhantomData,
}),

v => {
let mut log = LogEvent::new_with_metadata(metadata);
log.insert(log_schema().message_key(), v);
TargetEvents::One(log.into())
}
v => match log_namespace {
LogNamespace::Vector => {
TargetEvents::One(LogEvent::from_parts(v, metadata).into())
}
LogNamespace::Legacy => {
let mut log = LogEvent::new_with_metadata(metadata);
log.insert(log_schema().message_key(), v);
TargetEvents::One(log.into())
}
},
},
VrlTarget::Trace(value, metadata) => match value {
value @ Value::Object(_) => {
Expand Down Expand Up @@ -174,6 +194,53 @@ impl VrlTarget {
}
}

/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
/// or an object, that data is moved into the `message` field.
fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
let mut message = definition.event_kind().clone();
message.remove_object();
message.remove_array();

if !message.is_never() {
// We need to add the given message type to a field called `message`
// in the event.
let message = Kind::object(Collection::from(BTreeMap::from([(
log_schema().message_key().into(),
message,
)])));

definition.event_kind_mut().remove_bytes();
definition.event_kind_mut().remove_integer();
definition.event_kind_mut().remove_float();
definition.event_kind_mut().remove_boolean();
definition.event_kind_mut().remove_timestamp();
definition.event_kind_mut().remove_regex();
definition.event_kind_mut().remove_null();

*definition.event_kind_mut() = definition.event_kind().union(message);
}

definition
}

/// If the transform returns an array, the elements of this array will be separated
/// out into it's individual elements and passed downstream.
///
/// The potential types that the transform can output are any of the arrays
/// elements or any non-array elements that are within the definition. All these
/// definitions need to be merged together.
fn merge_array_definitions(mut definition: Definition) -> Definition {
if let Some(array) = definition.event_kind().as_array() {
let array_kinds = array.reduced_kind();

let kind = definition.event_kind_mut();
kind.remove_array();
*kind = kind.union(array_kinds);
}

definition
}

fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
if multi_value_tags {
let tag_values = value
Expand Down Expand Up @@ -589,11 +656,107 @@ mod test {
use lookup::owned_value_path;
use similar_asserts::assert_eq;
use vrl::btreemap;
use vrl::value::kind::Index;

use super::super::MetricValue;
use super::*;
use crate::metric_tags;

#[test]
fn test_field_definitions_in_message() {
let definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
assert_eq!(
Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
[LogNamespace::Legacy]
),
move_field_definitions_into_message(definition)
);

// Test when a message field already exists.
let definition = Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
[LogNamespace::Legacy],
);
assert_eq!(
Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([(
"message".into(),
Kind::bytes().or_integer()
)])),
[LogNamespace::Legacy]
),
move_field_definitions_into_message(definition)
);
}

#[test]
fn test_merged_array_definitions_simple() {
// Test merging the array definitions where the schema definition
// is simple, containing only one possible type in the array.
let object: BTreeMap<vrl::value::kind::Field, Kind> = [
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]
.into();

let kind = Kind::array(Collection::from_unknown(Kind::object(object)));

let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);

let kind = Kind::object(BTreeMap::from([
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]));

let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
let merged = merge_array_definitions(definition);

assert_eq!(wanted, merged);
}

#[test]
fn test_merged_array_definitions_complex() {
// Test merging the array definitions where the schema definition
// is fairly complex containing multiple different possible types.
let object: BTreeMap<vrl::value::kind::Field, Kind> = [
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]
.into();

let array: BTreeMap<Index, Kind> = [
(Index::from(0), Kind::integer()),
(Index::from(1), Kind::boolean()),
(
Index::from(2),
Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
),
]
.into();

let mut kind = Kind::bytes();
kind.add_object(object);
kind.add_array(array);

let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);

let mut kind = Kind::bytes();
kind.add_integer();
kind.add_boolean();
kind.add_object(BTreeMap::from([
("carrot".into(), Kind::bytes().or_undefined()),
("potato".into(), Kind::integer().or_undefined()),
("peas".into(), Kind::bytes().or_undefined()),
]));

let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
let merged = merge_array_definitions(definition);

assert_eq!(wanted, merged);
}

#[test]
fn log_get() {
let cases = vec![
Expand Down Expand Up @@ -755,7 +918,7 @@ mod test {
Ok(Some(value))
);
assert_eq!(
match target.into_events() {
match target.into_events(LogNamespace::Legacy) {
TargetEvents::One(event) => vec![event],
TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
Expand Down Expand Up @@ -901,7 +1064,7 @@ mod test {
Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();

assert_eq!(
match target.into_events() {
match target.into_events(LogNamespace::Legacy) {
TargetEvents::One(event) => vec![event],
TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
Expand Down
19 changes: 19 additions & 0 deletions lib/vector-core/src/schema/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,25 @@ impl Definition {
self
}

/// If the schema definition depends on the `LogNamespace`, this combines the individual
/// definitions for each `LogNamespace`.
pub fn combine_log_namespaces(
log_namespaces: &BTreeSet<LogNamespace>,
legacy: Self,
vector: Self,
) -> Self {
let mut combined =
Definition::new_with_default_metadata(Kind::never(), log_namespaces.clone());

if log_namespaces.contains(&LogNamespace::Legacy) {
combined = combined.merge(legacy);
}
if log_namespaces.contains(&LogNamespace::Vector) {
combined = combined.merge(vector);
}
combined
}

/// Returns an `OwnedTargetPath` into an event, based on the provided `meaning`, if the meaning exists.
pub fn meaning_path(&self, meaning: &str) -> Option<&OwnedTargetPath> {
match self.meaning.get(meaning) {
Expand Down
7 changes: 6 additions & 1 deletion src/conditions/vrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vrl::compiler::{CompilationResult, CompileConfig, Program, TypeState, VrlRun
use vrl::diagnostic::Formatter;
use vrl::value::Value;

use crate::config::LogNamespace;
use crate::event::TargetEvents;
use crate::{
conditions::{Condition, Conditional, ConditionalConfig},
Expand Down Expand Up @@ -84,12 +85,16 @@ pub struct Vrl {

impl Vrl {
fn run(&self, event: Event) -> (Event, RuntimeResult) {
let log_namespace = event
.maybe_as_log()
.map(|log| log.namespace())
.unwrap_or(LogNamespace::Legacy);
let mut target = VrlTarget::new(event, self.program.info(), false);
// TODO: use timezone from remap config
let timezone = TimeZone::default();

let result = Runtime::default().resolve(&mut target, &self.program, &timezone);
let original_event = match target.into_events() {
let original_event = match target.into_events(log_namespace) {
TargetEvents::One(event) => event,
_ => panic!("Event was modified in a condition. This is an internal compiler error."),
};
Expand Down
Loading