Skip to content

Commit

Permalink
feat: Migrate LogSchema::message_key to new lookup code (vectordotdev…
Browse files Browse the repository at this point in the history
…#18024)

## Motivation
This part of vectordotdev#13033.

## Summary
* `LogSchema::message_key` is now an `OptionalValuePath`.
* To avoid hacky `String` to `&'static str` conversions, I changed the
`Requirement::meaning` key type to `String`.
  • Loading branch information
pront authored Jul 21, 2023
1 parent 689a79e commit 0f14c0d
Show file tree
Hide file tree
Showing 55 changed files with 639 additions and 410 deletions.
2 changes: 1 addition & 1 deletion benches/codecs/character_delimited_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| CharacterDelimitedDecoder::new_with_max_length(b'a', ml))
.unwrap_or(CharacterDelimitedDecoder::new(b'a')),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer);
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
2 changes: 1 addition & 1 deletion benches/codecs/newline_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| NewlineDelimitedDecoder::new_with_max_length(ml))
.unwrap_or(NewlineDelimitedDecoder::new()),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer);
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
35 changes: 11 additions & 24 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::Bytes;
use lookup::lookup_v2::parse_value_path;
use lookup::OwnedTargetPath;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
Expand All @@ -9,6 +8,7 @@ use vector_core::{
event::{Event, LogEvent},
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use super::Deserializer;
Expand All @@ -25,7 +25,7 @@ impl BytesDeserializerConfig {

/// Build the `BytesDeserializer` from this configuration.
pub fn build(&self) -> BytesDeserializer {
BytesDeserializer::new()
BytesDeserializer
}

/// Return the type of event build by this deserializer.
Expand All @@ -37,7 +37,7 @@ impl BytesDeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
),
Expand All @@ -54,32 +54,16 @@ impl BytesDeserializerConfig {
/// This deserializer can be considered as the no-op action for input where no
/// further decoding has been specified.
#[derive(Debug, Clone)]
pub struct BytesDeserializer {
// Only used with the "Legacy" namespace. The "Vector" namespace decodes the data at the root of the event.
log_schema_message_key: &'static str,
}

impl Default for BytesDeserializer {
fn default() -> Self {
Self::new()
}
}
pub struct BytesDeserializer;

impl BytesDeserializer {
/// Creates a new `BytesDeserializer`.
pub fn new() -> Self {
Self {
log_schema_message_key: log_schema().message_key(),
}
}

/// Deserializes the given bytes, which will always produce a single `LogEvent`.
pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
match log_namespace {
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
LogNamespace::Legacy => {
let mut log = LogEvent::default();
log.insert(self.log_schema_message_key, bytes);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
log
}
}
Expand Down Expand Up @@ -107,15 +91,18 @@ mod tests {
#[test]
fn deserialize_bytes_legacy_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer;

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
let mut events = events.into_iter();

{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log[log_schema().message_key()], "foo".into());
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"foo".into()
);
}

assert_eq!(events.next(), None);
Expand All @@ -124,7 +111,7 @@ mod tests {
#[test]
fn deserialize_bytes_vector_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer;

let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
assert_eq!(events.len(), 1);
Expand Down
4 changes: 2 additions & 2 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ mod tests {
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
);
assert_eq!(
log.get(log_schema().message_key()),
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
Some(&Value::Bytes(Bytes::from_static(
b"A short message that helps you identify what is going on"
)))
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(log.contains(log_schema().message_key()));
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
}

// filter out id
Expand Down
14 changes: 8 additions & 6 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::Bytes;
use chrono::{DateTime, Datelike, Utc};
use derivative::Derivative;
use lookup::lookup_v2::parse_value_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
Expand Down Expand Up @@ -71,7 +70,7 @@ impl SyslogDeserializerConfig {
// The `message` field is always defined. If parsing fails, the entire body becomes the
// message.
.with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
);
Expand Down Expand Up @@ -429,7 +428,7 @@ fn insert_fields_from_syslog(
) {
match log_namespace {
LogNamespace::Legacy => {
log.insert(event_path!(log_schema().message_key()), parsed.msg);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
}
LogNamespace::Vector => {
log.insert(event_path!("message"), parsed.msg);
Expand Down Expand Up @@ -500,7 +499,10 @@ mod tests {

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].as_log()[log_schema().message_key()], "MSG".into());
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
"MSG".into()
);
assert!(
events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
);
Expand All @@ -522,8 +524,8 @@ mod tests {

fn init() {
let mut schema = LogSchema::default();
schema.set_message_key("legacy_message".to_string());
schema.set_message_key("legacy_timestamp".to_string());
schema.set_message_key(Some(owned_value_path!("legacy_message")));
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
init_log_schema(schema, false);
}
}
12 changes: 12 additions & 0 deletions lib/codecs/src/encoding/format/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use vector_core::config::log_schema;
use vector_core::schema;
use vrl::value::Kind;

/// Inspect the global log schema and create a schema requirement.
pub fn get_serializer_schema_requirement() -> schema::Requirement {
if let Some(message_key) = log_schema().message_key() {
schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any())
} else {
schema::Requirement::empty()
}
}
17 changes: 10 additions & 7 deletions lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -138,13 +139,15 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
err_missing_field(HOST)?;
}

let message_key = log_schema().message_key();
if !log.contains(SHORT_MESSAGE) {
// rename the log_schema().message_key() to SHORT_MESSAGE
if log.contains(message_key) {
log.rename_key(message_key, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
}
}
Ok(log)
Expand Down Expand Up @@ -329,7 +332,7 @@ mod tests {
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
log_schema().message_key() => "Some message",
log_schema().message_key().unwrap().to_string() => "Some message",
};

let jsn = do_serialize(true, event_fields).unwrap();
Expand Down
1 change: 1 addition & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#![deny(missing_docs)]

mod avro;
mod common;
mod csv;
mod gelf;
mod json;
Expand Down
20 changes: 4 additions & 16 deletions lib/codecs/src/encoding/format/raw_message.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use crate::encoding::format::common::get_serializer_schema_requirement;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_core::{
config::{log_schema, DataType},
event::Event,
schema,
};
use vrl::value::Kind;
use vector_core::{config::DataType, event::Event, schema};

/// Config used to build a `RawMessageSerializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
Expand All @@ -30,7 +26,7 @@ impl RawMessageSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
get_serializer_schema_requirement()
}
}

Expand All @@ -49,18 +45,10 @@ impl Encoder<Event> for RawMessageSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

let log = event.as_log();

if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
buffer.put(bytes);
}

Ok(())
}
}
Expand Down
18 changes: 4 additions & 14 deletions lib/codecs/src/encoding/format/text.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use crate::encoding::format::common::get_serializer_schema_requirement;
use bytes::{BufMut, BytesMut};
use tokio_util::codec::Encoder;
use vector_core::{
config::{log_schema, DataType},
event::Event,
schema,
};
use vrl::value::Kind;
use vector_core::{config::DataType, event::Event, schema};

use crate::MetricTagValues;

Expand Down Expand Up @@ -42,7 +38,7 @@ impl TextSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
get_serializer_schema_requirement()
}
}

Expand All @@ -67,15 +63,9 @@ impl Encoder<Event> for TextSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

match event {
Event::Log(log) => {
if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
buffer.put(bytes);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent},
};
use vrl::path::PathPrefix;
use vrl::value::Value;

use super::proto::{
Expand Down Expand Up @@ -94,7 +95,7 @@ impl ResourceLog {
LogNamespace::Legacy => {
let mut log = LogEvent::default();
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
log.insert(log_schema().message_key(), v);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
}
log
}
Expand Down
Loading

0 comments on commit 0f14c0d

Please sign in to comment.