Skip to content

Commit

Permalink
feat: Migrate LogSchema::metadata key to new lookup code (vectordotde…
Browse files Browse the repository at this point in the history
…v#18058)

* Migrate LogSchema::metadata_key to new lookup code

* add generic maybe_insert function

* updates after rebasing on latest master

* fix tests
  • Loading branch information
pront authored Jul 24, 2023
1 parent db9e47f commit 8663602
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 33 deletions.
16 changes: 8 additions & 8 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct LogSchema {
/// Generally, this field will be set by Vector to hold event-specific metadata, such as
/// annotations by the `remap` transform when an error or abort is encountered.
#[serde(default = "LogSchema::default_metadata_key")]
metadata_key: String,
metadata_key: OptionalValuePath,
}

impl Default for LogSchema {
Expand Down Expand Up @@ -105,8 +105,8 @@ impl LogSchema {
OptionalValuePath::new(SOURCE_TYPE)
}

fn default_metadata_key() -> String {
String::from(METADATA)
fn default_metadata_key() -> OptionalValuePath {
OptionalValuePath::new(METADATA)
}

pub fn message_key(&self) -> Option<&OwnedValuePath> {
Expand Down Expand Up @@ -134,8 +134,8 @@ impl LogSchema {
self.source_type_key.path.as_ref()
}

pub fn metadata_key(&self) -> &str {
&self.metadata_key
pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
self.metadata_key.path.as_ref()
}

pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
Expand All @@ -154,8 +154,8 @@ impl LogSchema {
self.source_type_key = OptionalValuePath { path };
}

pub fn set_metadata_key(&mut self, v: String) {
self.metadata_key = v;
pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
self.metadata_key = OptionalValuePath { path };
}

/// Merge two `LogSchema` instances together.
Expand Down Expand Up @@ -202,7 +202,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
} else {
self.set_metadata_key(other.metadata_key().to_string());
self.set_metadata_key(other.metadata_key().cloned());
}
}

Expand Down
17 changes: 17 additions & 0 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_common::{
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::{PathPrefix, ValuePath};

use super::{
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
Expand Down Expand Up @@ -84,13 +85,29 @@ impl TraceEvent {
self.0.contains(key.as_ref())
}

// TODO This should eventually use TargetPath for the `key` parameter.
// https://github.com/vectordotdev/vector/issues/18059
pub fn insert(
&mut self,
key: impl AsRef<str>,
value: impl Into<Value> + Debug,
) -> Option<Value> {
self.0.insert(key.as_ref(), value.into())
}

// TODO Audit code and use this if possible.
// https://github.com/vectordotdev/vector/issues/18059
pub fn maybe_insert<'a, F: FnOnce() -> Value>(
&mut self,
prefix: PathPrefix,
path: Option<impl ValuePath<'a>>,
value_callback: F,
) -> Option<Value> {
if let Some(path) = path {
return self.0.insert((prefix, path), value_callback());
}
None
}
}

impl From<LogEvent> for TraceEvent {
Expand Down
56 changes: 31 additions & 25 deletions src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use std::{
};

use codecs::MetricTagValues;
use lookup::lookup_v2::{parse_value_path, ValuePath};
use lookup::{metadata_path, owned_value_path, path, PathPrefix};
use lookup::{metadata_path, owned_value_path, PathPrefix};
use snafu::{ResultExt, Snafu};
use vector_common::TimeZone;
use vector_config::configurable_component;
Expand All @@ -20,6 +19,8 @@ use vrl::compiler::runtime::{Runtime, Terminate};
use vrl::compiler::state::ExternalEnv;
use vrl::compiler::{CompileConfig, ExpressionError, Function, Program, TypeState, VrlRuntime};
use vrl::diagnostic::{DiagnosticMessage, Formatter, Note};
use vrl::path;
use vrl::path::ValuePath;
use vrl::value::{Kind, Value};

use crate::config::OutputId;
Expand Down Expand Up @@ -288,7 +289,7 @@ impl TransformConfig for RemapConfig {
let dropped_definition = Definition::combine_log_namespaces(
input_definition.log_namespaces(),
input_definition.clone().with_event_field(
&parse_value_path(log_schema().metadata_key()).expect("valid metadata key"),
log_schema().metadata_key().expect("valid metadata key"),
Kind::object(BTreeMap::from([
("reason".into(), Kind::bytes()),
("message".into(), Kind::bytes()),
Expand Down Expand Up @@ -451,13 +452,12 @@ where
match event {
Event::Log(ref mut log) => match log.namespace() {
LogNamespace::Legacy => {
log.insert(
(
PathPrefix::Event,
log_schema().metadata_key().concat(path!("dropped")),
),
self.dropped_data(reason, error),
);
if let Some(metadata_key) = log_schema().metadata_key() {
log.insert(
(PathPrefix::Event, metadata_key.concat(path!("dropped"))),
self.dropped_data(reason, error),
);
}
}
LogNamespace::Vector => {
log.insert(
Expand All @@ -467,23 +467,29 @@ where
}
},
Event::Metric(ref mut metric) => {
let m = log_schema().metadata_key();
metric.replace_tag(format!("{}.dropped.reason", m), reason.into());
metric.replace_tag(
format!("{}.dropped.component_id", m),
self.component_key
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(String::new),
);
metric.replace_tag(format!("{}.dropped.component_type", m), "remap".into());
metric.replace_tag(format!("{}.dropped.component_kind", m), "transform".into());
if let Some(metadata_key) = log_schema().metadata_key() {
metric.replace_tag(format!("{}.dropped.reason", metadata_key), reason.into());
metric.replace_tag(
format!("{}.dropped.component_id", metadata_key),
self.component_key
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(String::new),
);
metric.replace_tag(
format!("{}.dropped.component_type", metadata_key),
"remap".into(),
);
metric.replace_tag(
format!("{}.dropped.component_kind", metadata_key),
"transform".into(),
);
}
}
Event::Trace(ref mut trace) => {
trace.insert(
log_schema().metadata_key(),
self.dropped_data(reason, error),
);
trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || {
self.dropped_data(reason, error).into()
});
}
}
}
Expand Down

0 comments on commit 8663602

Please sign in to comment.