From e476e120503d8682c8aef511b7af9b8851f2d03c Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 8 Aug 2023 12:46:05 -0400 Subject: [PATCH] feat: Refactor 'event.get()' to use path types (#18160) * feat: Refactor 'get()' to use path types * vdev fmt --- lib/vector-core/src/event/discriminant.rs | 8 ++++- lib/vector-core/src/event/log_event.rs | 31 +++++++++++++---- lib/vector-core/src/event/trace.rs | 15 ++++++-- src/api/schema/events/trace.rs | 3 +- src/conditions/datadog_search.rs | 34 +++++++++++++------ .../datadog/traces/apm_stats/aggregation.rs | 9 ++--- src/sinks/datadog/traces/request_builder.rs | 27 ++++++++------- src/sinks/datadog/traces/sink.rs | 11 +++--- src/sinks/elasticsearch/config.rs | 4 ++- src/sinks/new_relic/model.rs | 5 +-- src/sources/datadog_agent/traces.rs | 14 ++++---- src/template.rs | 12 ++++--- src/transforms/sample.rs | 10 ++++-- 13 files changed, 124 insertions(+), 59 deletions(-) diff --git a/lib/vector-core/src/event/discriminant.rs b/lib/vector-core/src/event/discriminant.rs index 7c1eb40863e1c..fcbd5d0fa818f 100644 --- a/lib/vector-core/src/event/discriminant.rs +++ b/lib/vector-core/src/event/discriminant.rs @@ -27,7 +27,13 @@ impl Discriminant { pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef]) -> Self { let values: Vec> = discriminant_fields .iter() - .map(|discriminant_field| event.get(discriminant_field.as_ref()).cloned()) + .map(|discriminant_field| { + event + .parse_path_and_get_value(discriminant_field.as_ref()) + .ok() + .flatten() + .cloned() + }) .collect(); Self { values } } diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 4eee59b608bc5..b2aeab55fab4e 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -20,7 +20,7 @@ use vector_common::{ request_metadata::GetEventCountTags, EventDataEq, }; -use vrl::path::OwnedTargetPath; +use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError}; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, @@ -295,6 +295,16 @@ impl LogEvent { self.metadata.add_finalizer(finalizer); } + /// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value. + /// # Errors + /// Will return an error if path parsing failed. + pub fn parse_path_and_get_value( + &self, + path: impl AsRef, + ) -> Result, PathParseError> { + parse_target_path(path.as_ref()).map(|path| self.get(&path)) + } + #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> { match key.prefix() { @@ -439,11 +449,14 @@ impl LogEvent { let Some(incoming_val) = incoming.remove(field.as_ref()) else { continue }; - match self.get_mut(field.as_ref()) { - None => { - self.insert(field.as_ref(), incoming_val); + + if let Ok(path) = parse_target_path(field.as_ref()) { + match self.get_mut(&path) { + None => { + self.insert(&path, incoming_val); + } + Some(current_val) => current_val.merge(incoming_val), } - Some(current_val) => current_val.merge(incoming_val), } } self.metadata.merge(incoming.metadata); @@ -642,7 +655,9 @@ where type Output = Value; fn index(&self, key: T) -> &Value { - self.get(key.as_ref()) + self.parse_path_and_get_value(key.as_ref()) + .ok() + .flatten() .unwrap_or_else(|| panic!("Key is not found: {:?}", key.as_ref())) } } @@ -654,7 +669,9 @@ where { fn extend>(&mut self, iter: I) { for (k, v) in iter { - self.insert(k.as_ref(), v.into()); + if let Ok(path) = parse_target_path(k.as_ref()) { + self.insert(&path, v.into()); + } } } } diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index d5fc7856c9066..1762790bf8676 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -7,6 +7,7 @@ use vector_common::{ internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; +use vrl::path::PathParseError; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, @@ -71,13 +72,23 @@ impl TraceEvent { self.0.as_map().expect("inner value must be a map") } + /// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value. + /// # Errors + /// Will return an error if path parsing failed. + pub fn parse_path_and_get_value( + &self, + path: impl AsRef, + ) -> Result, PathParseError> { + self.0.parse_path_and_get_value(path) + } + #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> { self.0.get(key) } - pub fn get_mut(&mut self, key: impl AsRef) -> Option<&mut Value> { - self.0.get_mut(key.as_ref()) + pub fn get_mut<'a>(&mut self, key: impl TargetPath<'a>) -> Option<&mut Value> { + self.0.get_mut(key) } pub fn contains(&self, key: impl AsRef) -> bool { diff --git a/src/api/schema/events/trace.rs b/src/api/schema/events/trace.rs index 9291e4db2888a..9bd52d3ce053e 100644 --- a/src/api/schema/events/trace.rs +++ b/src/api/schema/events/trace.rs @@ -1,5 +1,6 @@ use async_graphql::Object; use vector_common::encode_logfmt; +use vrl::event_path; use super::EventEncodingType; use crate::{event, topology::TapOutput}; @@ -48,7 +49,7 @@ impl Trace { /// Get JSON field data on the trace event, by field name async fn json(&self, field: String) -> Option { - self.event.get(field.as_str()).map(|field| { + self.event.get(event_path!(field.as_str())).map(|field| { serde_json::to_string(field) .expect("JSON serialization of log event field failed. Please report.") }) diff --git a/src/conditions/datadog_search.rs b/src/conditions/datadog_search.rs index 2e2e0e88f1344..90e9641567038 100644 --- a/src/conditions/datadog_search.rs +++ b/src/conditions/datadog_search.rs @@ -75,7 +75,12 @@ impl Filter for EventFilter { any_string_match("tags", move |value| value == field) } Field::Default(f) | Field::Facet(f) | Field::Reserved(f) => { - Run::boxed(move |log: &LogEvent| log.get(f.as_str()).is_some()) + Run::boxed(move |log: &LogEvent| { + log.parse_path_and_get_value(f.as_str()) + .ok() + .flatten() + .is_some() + }) } } } @@ -165,8 +170,11 @@ impl Filter for EventFilter { match field { // Facets are compared numerically if the value is numeric, or as strings otherwise. Field::Facet(f) => { - Run::boxed( - move |log: &LogEvent| match (log.get(f.as_str()), &comparison_value) { + Run::boxed(move |log: &LogEvent| { + match ( + log.parse_path_and_get_value(f.as_str()).ok().flatten(), + &comparison_value, + ) { // Integers. (Some(Value::Integer(lhs)), ComparisonValue::Integer(rhs)) => { match comparator { @@ -227,8 +235,8 @@ impl Filter for EventFilter { } } _ => false, - }, - ) + } + }) } // Tag values need extracting by "key:value" to be compared. Field::Tag(tag) => any_string_match("tags", move |value| match value.split_once(':') { @@ -266,9 +274,11 @@ where { let field = field.into(); - Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) { - Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)), - _ => false, + Run::boxed(move |log: &LogEvent| { + match log.parse_path_and_get_value(field.as_str()).ok().flatten() { + Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)), + _ => false, + } }) } @@ -281,9 +291,11 @@ where { let field = field.into(); - Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) { - Some(Value::Array(values)) => func(values), - _ => false, + Run::boxed(move |log: &LogEvent| { + match log.parse_path_and_get_value(field.as_str()).ok().flatten() { + Some(Value::Array(values)) => func(values), + _ => false, + } }) } diff --git a/src/sinks/datadog/traces/apm_stats/aggregation.rs b/src/sinks/datadog/traces/apm_stats/aggregation.rs index 9ae1ae6e50cc5..2c7d3a580e6ad 100644 --- a/src/sinks/datadog/traces/apm_stats/aggregation.rs +++ b/src/sinks/datadog/traces/apm_stats/aggregation.rs @@ -1,6 +1,7 @@ use std::{collections::BTreeMap, sync::Arc}; use chrono::Utc; +use vrl::event_path; use super::{ bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey, @@ -179,7 +180,7 @@ impl Aggregator { pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) { // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184 - let spans = match trace.get("spans") { + let spans = match trace.get(event_path!("spans")) { Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(), _ => vec![], }; @@ -189,16 +190,16 @@ impl Aggregator { env: partition_key.env.clone().unwrap_or_default(), hostname: partition_key.hostname.clone().unwrap_or_default(), version: trace - .get("app_version") + .get(event_path!("app_version")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), container_id: trace - .get("container_id") + .get(event_path!("container_id")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), }; let synthetics = trace - .get("origin") + .get(event_path!("origin")) .map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS)) .unwrap_or(false); diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 68081627cc76a..64fa9f0d9c7ed 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -10,6 +10,7 @@ use prost::Message; use snafu::Snafu; use vector_common::request_metadata::RequestMetadata; use vector_core::event::{EventFinalizers, Finalizable}; +use vrl::event_path; use super::{ apm_stats::{compute_apm_stats, Aggregator}, @@ -283,7 +284,7 @@ impl DatadogTracesEncoder { fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload { let tags = trace - .get("tags") + .get(event_path!("tags")) .and_then(|m| m.as_object()) .map(|m| { m.iter() @@ -292,7 +293,7 @@ impl DatadogTracesEncoder { }) .unwrap_or_default(); - let spans = match trace.get("spans") { + let spans = match trace.get(event_path!("spans")) { Some(Value::Array(v)) => v .iter() .filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span)) @@ -302,7 +303,7 @@ impl DatadogTracesEncoder { let chunk = dd_proto::TraceChunk { priority: trace - .get("priority") + .get(event_path!("priority")) .and_then(|v| v.as_integer().map(|v| v as i32)) // This should not happen for Datadog originated traces, but in case this field is not populated // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55), @@ -310,11 +311,11 @@ impl DatadogTracesEncoder { // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309. .unwrap_or(1i32), origin: trace - .get("origin") + .get(event_path!("origin")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), dropped_trace: trace - .get("dropped") + .get(event_path!("dropped")) .and_then(|v| v.as_boolean()) .unwrap_or(false), spans, @@ -323,37 +324,37 @@ impl DatadogTracesEncoder { dd_proto::TracerPayload { container_id: trace - .get("container_id") + .get(event_path!("container_id")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), language_name: trace - .get("language_name") + .get(event_path!("language_name")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), language_version: trace - .get("language_version") + .get(event_path!("language_version")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), tracer_version: trace - .get("tracer_version") + .get(event_path!("tracer_version")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), runtime_id: trace - .get("runtime_id") + .get(event_path!("runtime_id")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), chunks: vec![chunk], tags, env: trace - .get("env") + .get(event_path!("env")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), hostname: trace - .get("hostname") + .get(event_path!("hostname")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), app_version: trace - .get("app_version") + .get(event_path!("app_version")) .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), } diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index fa1e2cc3f2280..4cf502a7eebae 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -7,6 +7,7 @@ use futures_util::{ }; use tokio::sync::oneshot::{channel, Sender}; use tower::Service; +use vrl::event_path; use vrl::path::PathPrefix; use vector_core::{ @@ -54,19 +55,21 @@ impl Partitioner for EventPartitioner { } Event::Trace(t) => PartitionKey { api_key: item.metadata().datadog_api_key(), - env: t.get("env").map(|s| s.to_string_lossy().into_owned()), + env: t + .get(event_path!("env")) + .map(|s| s.to_string_lossy().into_owned()), hostname: log_schema().host_key().and_then(|key| { t.get((PathPrefix::Event, key)) .map(|s| s.to_string_lossy().into_owned()) }), agent_version: t - .get("agent_version") + .get(event_path!("agent_version")) .map(|s| s.to_string_lossy().into_owned()), target_tps: t - .get("target_tps") + .get(event_path!("target_tps")) .and_then(|tps| tps.as_integer().map(Into::into)), error_tps: t - .get("error_tps") + .get(event_path!("error_tps")) .and_then(|tps| tps.as_integer().map(Into::into)), }, } diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 9f50d79e44a2d..d84153fb2a899 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -443,7 +443,9 @@ impl DataStreamConfig { let (dtype, dataset, namespace) = if !self.auto_routing { (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?) } else { - let data_stream = log.get("data_stream").and_then(|ds| ds.as_object()); + let data_stream = log + .get(event_path!("data_stream")) + .and_then(|ds| ds.as_object()); let dtype = data_stream .and_then(|ds| ds.get("type")) .map(|value| value.to_string_lossy().into_owned()) diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index c49337a13e3e4..27cf7292dcdbf 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime}; use chrono::{DateTime, Utc}; use ordered_float::NotNan; use serde::{Deserialize, Serialize}; +use vrl::event_path; use super::NewRelicSinkError; use crate::event::{Event, MetricValue, Value}; @@ -117,7 +118,7 @@ impl TryFrom> for EventsApiModel { event_model.insert(k, v.clone()); } - if let Some(message) = log.get("message") { + if let Some(message) = log.get(event_path!("message")) { let message = message.to_string_lossy().replace("\\\"", "\""); // If message contains a JSON string, parse it and insert all fields into self if let serde_json::Result::Ok(json_map) = @@ -189,7 +190,7 @@ impl TryFrom> for LogsApiModel { for (k, v) in log.convert_to_fields() { log_model.insert(k, v.clone()); } - if log.get("message").is_none() { + if log.get(event_path!("message")).is_none() { log_model.insert( "message".to_owned(), Value::from("log from vector".to_owned()), diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index ec5a56636169f..bd9099c13a2ab 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -146,16 +146,16 @@ fn handle_dd_trace_payload_v1( &source.log_schema_source_type_key, Bytes::from("datadog_agent"), ); - trace_event.insert("payload_version", "v2".to_string()); + trace_event.insert(event_path!("payload_version"), "v2".to_string()); trace_event.insert(&source.log_schema_host_key, hostname.clone()); - trace_event.insert("env", env.clone()); - trace_event.insert("agent_version", agent_version.clone()); - trace_event.insert("target_tps", target_tps); - trace_event.insert("error_tps", error_tps); - if let Some(Value::Object(span_tags)) = trace_event.get_mut("tags") { + trace_event.insert(event_path!("env"), env.clone()); + trace_event.insert(event_path!("agent_version"), agent_version.clone()); + trace_event.insert(event_path!("target_tps"), target_tps); + trace_event.insert(event_path!("error_tps"), error_tps); + if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) { span_tags.extend(tags.clone()); } else { - trace_event.insert("tags", Value::from(tags.clone())); + trace_event.insert(event_path!("tags"), Value::from(tags.clone())); } Event::Trace(trace_event) }) diff --git a/src/template.rs b/src/template.rs index b4230f351b12e..0312347b7b113 100644 --- a/src/template.rs +++ b/src/template.rs @@ -167,13 +167,17 @@ impl Template { Part::Reference(key) => { out.push_str( &match event { - EventRef::Log(log) => log.get(&**key).map(Value::to_string_lossy), + EventRef::Log(log) => log + .parse_path_and_get_value(key) + .ok() + .and_then(|v| v.map(Value::to_string_lossy)), EventRef::Metric(metric) => { render_metric_field(key, metric).map(Cow::Borrowed) } - EventRef::Trace(trace) => { - trace.get(key.as_str()).map(Value::to_string_lossy) - } + EventRef::Trace(trace) => trace + .parse_path_and_get_value(key) + .ok() + .and_then(|v| v.map(Value::to_string_lossy)), } .unwrap_or_else(|| { missing_keys.push(key.to_owned()); diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index 53ca4847f5723..a27b57f2cca9c 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -130,8 +130,14 @@ impl FunctionTransform for Sample { .key_field .as_ref() .and_then(|key_field| match &event { - Event::Log(event) => event.get(key_field.as_str()), - Event::Trace(event) => event.get(key_field.as_str()), + Event::Log(event) => event + .parse_path_and_get_value(key_field.as_str()) + .ok() + .flatten(), + Event::Trace(event) => event + .parse_path_and_get_value(key_field.as_str()) + .ok() + .flatten(), Event::Metric(_) => panic!("component can never receive metric events"), }) .map(|v| v.to_string_lossy());