Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace tuples with &OwnedTargetPath wherever possible #18097

Merged
merged 3 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions benches/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
let index = Template::try_from("index-%Y.%m.%d").unwrap();
let mut event = Event::Log(LogEvent::from("hello world"));
event.as_mut_log().insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
log_schema().timestamp_key_target_path().unwrap(),
Utc::now(),
);

Expand All @@ -31,10 +28,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
let index = Template::try_from("index").unwrap();
let mut event = Event::Log(LogEvent::from("hello world"));
event.as_mut_log().insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
log_schema().timestamp_key_target_path().unwrap(),
Utc::now(),
);

Expand Down
3 changes: 1 addition & 2 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use vector_core::{
event::{Event, LogEvent},
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use super::Deserializer;
Expand Down Expand Up @@ -63,7 +62,7 @@ impl BytesDeserializer {
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
LogNamespace::Legacy => {
let mut log = LogEvent::default();
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
log.maybe_insert(log_schema().message_key_target_path(), bytes);
log
}
}
Expand Down
15 changes: 6 additions & 9 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path, PathPrefix};
use lookup::{event_path, owned_value_path};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::collections::HashMap;
Expand Down Expand Up @@ -130,20 +130,17 @@ impl GelfDeserializer {
log.insert(FULL_MESSAGE, full_message.to_string());
}

if let Some(timestamp_key) = log_schema().timestamp_key() {
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
if let Some(timestamp) = parsed.timestamp {
let naive = NaiveDateTime::from_timestamp_opt(
f64::trunc(timestamp) as i64,
f64::fract(timestamp) as u32,
)
.expect("invalid timestamp");
log.insert(
(PathPrefix::Event, timestamp_key),
DateTime::<Utc>::from_utc(naive, Utc),
);
log.insert(timestamp_key, DateTime::<Utc>::from_utc(naive, Utc));
// per GELF spec- add timestamp if not provided
} else {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
log.insert(timestamp_key, Utc::now());
}
}

Expand Down Expand Up @@ -293,7 +290,7 @@ mod tests {
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
);
assert_eq!(
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
log.get(log_schema().message_key_target_path().unwrap()),
Some(&Value::Bytes(Bytes::from_static(
b"A short message that helps you identify what is going on"
)))
Expand Down Expand Up @@ -348,7 +345,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
assert!(log.contains(log_schema().message_key_target_path().unwrap()));
}

// filter out id
Expand Down
9 changes: 4 additions & 5 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::convert::TryInto;
use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use lookup::PathPrefix;
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
Expand Down Expand Up @@ -133,11 +132,11 @@ impl Deserializer for JsonDeserializer {
LogNamespace::Legacy => {
let timestamp = Utc::now();

if let Some(timestamp_key) = log_schema().timestamp_key() {
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
for event in &mut events {
let log = event.as_mut_log();
if !log.contains((PathPrefix::Event, timestamp_key)) {
log.insert((PathPrefix::Event, timestamp_key), timestamp);
if !log.contains(timestamp_key) {
log.insert(timestamp_key, timestamp);
}
}
}
Expand Down Expand Up @@ -218,7 +217,7 @@ mod tests {
let log = event.as_log();
assert_eq!(log["bar"], 456.into());
assert_eq!(
log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
log.get(log_schema().timestamp_key_target_path().unwrap())
.is_some(),
namespace == LogNamespace::Legacy
);
Expand Down
8 changes: 3 additions & 5 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use chrono::{DateTime, Datelike, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -428,7 +428,7 @@ fn insert_fields_from_syslog(
) {
match log_namespace {
LogNamespace::Legacy => {
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
}
LogNamespace::Vector => {
log.insert(event_path!("message"), parsed.msg);
Expand All @@ -439,9 +439,7 @@ fn insert_fields_from_syslog(
let timestamp = DateTime::<Utc>::from(timestamp);
match log_namespace {
LogNamespace::Legacy => {
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), timestamp);
}
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
}
LogNamespace::Vector => {
log.insert(event_path!("timestamp"), timestamp);
Expand Down
3 changes: 1 addition & 2 deletions lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent},
};
use vrl::path::PathPrefix;
use vrl::value::Value;

use super::proto::{
Expand Down Expand Up @@ -95,7 +94,7 @@ impl ResourceLog {
LogNamespace::Legacy => {
let mut log = LogEvent::default();
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
log.maybe_insert(log_schema().message_key_target_path(), v);
}
log
}
Expand Down
15 changes: 5 additions & 10 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use vector_common::{
request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::{OwnedTargetPath, OwnedValuePath};
use vrl::path::OwnedTargetPath;

use super::{
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
Expand Down Expand Up @@ -160,7 +160,7 @@ impl LogEvent {
/// valid for `LogNamespace::Legacy`
pub fn from_str_legacy(msg: impl Into<String>) -> Self {
let mut log = LogEvent::default();
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), msg.into());
log.maybe_insert(log_schema().message_key_target_path(), msg.into());

if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
Expand Down Expand Up @@ -356,14 +356,9 @@ impl LogEvent {
}
}

pub fn maybe_insert(
&mut self,
prefix: PathPrefix,
path: Option<&OwnedValuePath>,
value: impl Into<Value>,
) {
pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
if let Some(path) = path {
self.insert((prefix, path), value);
self.insert(path, value);
}
}

Expand Down Expand Up @@ -572,7 +567,7 @@ mod test_utils {
impl From<Bytes> for LogEvent {
fn from(message: Bytes) -> Self {
let mut log = LogEvent::default();
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
log.maybe_insert(log_schema().message_key_target_path(), message);
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/vrl_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct TargetIter<T> {

fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
let mut log = LogEvent::new_with_metadata(metadata);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), value);
log.maybe_insert(log_schema().message_key_target_path(), value);
log
}

Expand Down
5 changes: 1 addition & 4 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,7 @@ mod integration_tests {
);

let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
event.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
ts,
);
event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);

run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;

Expand Down
8 changes: 4 additions & 4 deletions src/sinks/loki/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ async fn many_streams() {
let index = (i % 5) * 2;
let message = lines[index]
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
assert_eq!(output, &message);
Expand All @@ -338,7 +338,7 @@ async fn many_streams() {
let index = ((i % 5) * 2) + 1;
let message = lines[index]
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
assert_eq!(output, &message);
Expand Down Expand Up @@ -385,7 +385,7 @@ async fn interpolate_stream_key() {
for (i, output) in outputs.iter().enumerate() {
let message = lines[i]
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
assert_eq!(output, &message);
Expand Down Expand Up @@ -638,7 +638,7 @@ async fn test_out_of_order_events(
assert_eq!(
&expected[i]
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy(),
output,
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub(crate) async fn test() {
for event in events {
let message = event
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
if !expected_messages.remove(message.as_ref()) {
Expand Down
10 changes: 4 additions & 6 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,12 @@ async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: St

#[cfg(test)]
mod tests {
use crate::codecs::DecodingConfig;
use chrono::SecondsFormat;
use lookup::path;
use vrl::path::PathPrefix;

use super::*;
use crate::codecs::DecodingConfig;
use crate::config::{log_schema, SourceConfig};
use crate::sources::aws_sqs::AwsSqsConfig;
use chrono::SecondsFormat;
use lookup::path;

#[tokio::test]
async fn test_decode_vector_namespace() {
Expand Down Expand Up @@ -313,7 +311,7 @@ mod tests {
events[0]
.clone()
.as_log()
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy(),
message
Expand Down
2 changes: 1 addition & 1 deletion src/sources/docker_logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ mod integration_tests {

event
.into_log()
.remove((PathPrefix::Event, log_schema().message_key().unwrap()))
.remove(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy()
.into_owned()
Expand Down
6 changes: 2 additions & 4 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::Utc;
use codecs::{BytesDeserializerConfig, StreamDecodingError};
use flate2::read::MultiGzDecoder;
use lookup::lookup_v2::parse_value_path;
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
use rmp_serde::{decode, Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
Expand Down Expand Up @@ -599,9 +599,7 @@ impl From<FluentEvent<'_>> for LogEvent {
log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
}
LogNamespace::Legacy => {
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), timestamp);
}
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytes::Bytes;
use chrono::{TimeZone, Utc};
use codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
use lookup::{metadata_path, owned_value_path, path, PathPrefix};
use lookup::{metadata_path, owned_value_path, path};
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
Expand Down Expand Up @@ -741,9 +741,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
}
LogNamespace::Legacy => {
if let Some(ts) = timestamp {
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), ts);
}
log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
}
}
}
Expand Down Expand Up @@ -784,7 +782,7 @@ fn create_log_event_from_record(
let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);

if let Some(message) = log.remove(MESSAGE) {
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
log.maybe_insert(log_schema().message_key_target_path(), message);
}

log
Expand Down
Loading