diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs index 14da4d2266438..e5b7dbe96c315 100644 --- a/lib/codecs/src/decoding/format/gelf.rs +++ b/lib/codecs/src/decoding/format/gelf.rs @@ -26,13 +26,14 @@ use crate::{gelf_fields::*, VALID_FIELD_REGEX}; /// of vector will still work with the new relaxed decoding. /// Config used to build a `GelfDeserializer`. -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone, Default)] pub struct GelfDeserializerConfig { + /// GELF-specific decoding options. #[serde( default, skip_serializing_if = "vector_core::serde::skip_serializing_if_default" )] - /// GELF-specific decoding options. pub gelf: GelfDeserializerOptions, } diff --git a/lib/codecs/src/decoding/format/json.rs b/lib/codecs/src/decoding/format/json.rs index 49980122be493..67e7bc624bbdf 100644 --- a/lib/codecs/src/decoding/format/json.rs +++ b/lib/codecs/src/decoding/format/json.rs @@ -17,14 +17,13 @@ use super::{default_lossy, Deserializer}; /// Config used to build a `JsonDeserializer`. #[configurable_component] -#[derive(Debug, Clone, PartialEq, Eq, Derivative)] -#[derivative(Default)] +#[derive(Debug, Clone, Default)] pub struct JsonDeserializerConfig { + /// JSON-specific decoding options. #[serde( default, skip_serializing_if = "vector_core::serde::skip_serializing_if_default" )] - /// JSON-specific decoding options. pub json: JsonDeserializerOptions, } diff --git a/lib/codecs/src/decoding/format/native_json.rs b/lib/codecs/src/decoding/format/native_json.rs index 7a8a1914015de..43e09ec86f5f6 100644 --- a/lib/codecs/src/decoding/format/native_json.rs +++ b/lib/codecs/src/decoding/format/native_json.rs @@ -1,6 +1,5 @@ use bytes::Bytes; use derivative::Derivative; -use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use vector_config::configurable_component; use vector_core::{config::DataType, event::Event, schema}; @@ -11,9 +10,14 @@ use super::{default_lossy, Deserializer}; use vector_core::config::LogNamespace; /// Config used to build a `NativeJsonDeserializer`. -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone, Default)] pub struct NativeJsonDeserializerConfig { /// Vector's native JSON-specific decoding options. + #[serde( + default, + skip_serializing_if = "vector_core::serde::skip_serializing_if_default" + )] pub native_json: NativeJsonDeserializerOptions, } diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index f9e173ba7d497..336d7c1aa232f 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -3,7 +3,6 @@ use chrono::{DateTime, Datelike, Utc}; use derivative::Derivative; use lookup::lookup_v2::parse_value_path; use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix}; -use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; use std::collections::BTreeMap; @@ -20,10 +19,17 @@ use vrl::value::{kind::Collection, Kind}; use super::{default_lossy, Deserializer}; /// Config used to build a `SyslogDeserializer`. -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone, Default)] pub struct SyslogDeserializerConfig { + #[serde(skip)] source: Option<&'static str>, + /// Syslog-specific decoding options. + #[serde( + default, + skip_serializing_if = "vector_core::serde::skip_serializing_if_default" + )] pub syslog: SyslogDeserializerOptions, } diff --git a/lib/codecs/src/decoding/framing/character_delimited.rs b/lib/codecs/src/decoding/framing/character_delimited.rs index ef7e49c63f759..2c4d0ef918125 100644 --- a/lib/codecs/src/decoding/framing/character_delimited.rs +++ b/lib/codecs/src/decoding/framing/character_delimited.rs @@ -1,6 +1,5 @@ use bytes::{Buf, Bytes, BytesMut}; use memchr::memchr; -use serde::{Deserialize, Serialize}; use tokio_util::codec::Decoder; use tracing::{trace, warn}; use vector_config::configurable_component; @@ -8,7 +7,8 @@ use vector_config::configurable_component; use super::BoxedFramingError; /// Config used to build a `CharacterDelimitedDecoder`. -#[derive(Debug, Clone, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone)] pub struct CharacterDelimitedDecoderConfig { /// Options for the character delimited decoder. pub character_delimited: CharacterDelimitedDecoderOptions, diff --git a/lib/codecs/src/decoding/framing/newline_delimited.rs b/lib/codecs/src/decoding/framing/newline_delimited.rs index cc96d08d7ab40..6a1f2c81caca1 100644 --- a/lib/codecs/src/decoding/framing/newline_delimited.rs +++ b/lib/codecs/src/decoding/framing/newline_delimited.rs @@ -1,19 +1,19 @@ use bytes::{Bytes, BytesMut}; use derivative::Derivative; -use serde::{Deserialize, Serialize}; use tokio_util::codec::Decoder; use vector_config::configurable_component; use super::{BoxedFramingError, CharacterDelimitedDecoder}; /// Config used to build a `NewlineDelimitedDecoder`. -#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)] +#[configurable_component] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct NewlineDelimitedDecoderConfig { + /// Options for the newline delimited decoder. #[serde( default, skip_serializing_if = "vector_core::serde::skip_serializing_if_default" )] - /// Options for the newline delimited decoder. pub newline_delimited: NewlineDelimitedDecoderOptions, } diff --git a/lib/codecs/src/decoding/framing/octet_counting.rs b/lib/codecs/src/decoding/framing/octet_counting.rs index 0e9f2d15a4b4d..281a8b64dba03 100644 --- a/lib/codecs/src/decoding/framing/octet_counting.rs +++ b/lib/codecs/src/decoding/framing/octet_counting.rs @@ -2,7 +2,6 @@ use std::io; use bytes::{Buf, Bytes, BytesMut}; use derivative::Derivative; -use serde::{Deserialize, Serialize}; use tokio_util::codec::{LinesCodec, LinesCodecError}; use tracing::trace; use vector_config::configurable_component; @@ -10,7 +9,8 @@ use vector_config::configurable_component; use super::BoxedFramingError; /// Config used to build a `OctetCountingDecoder`. -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone, Default)] pub struct OctetCountingDecoderConfig { #[serde( default, diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index acf26f33a944f..96b3d04dee82c 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -73,9 +73,6 @@ impl StreamDecodingError for Error { /// Framing handles how events are separated when encoded in a raw byte form, where each event is /// a frame that must be prefixed, or delimited, in a way that marks where an event begins and /// ends within the byte stream. -// Unfortunately, copying options of the nested enum variants is necessary -// since `serde` doesn't allow `flatten`ing these: -// https://github.com/serde-rs/serde/issues/1402. #[configurable_component] #[derive(Clone, Debug)] #[serde(tag = "method", rename_all = "snake_case")] @@ -85,35 +82,18 @@ pub enum FramingConfig { Bytes, /// Byte frames which are delimited by a chosen character. - CharacterDelimited { - /// Options for the character delimited decoder. - character_delimited: CharacterDelimitedDecoderOptions, - }, + CharacterDelimited(CharacterDelimitedDecoderConfig), /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. LengthDelimited, /// Byte frames which are delimited by a newline character. - NewlineDelimited { - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - /// Options for the newline delimited decoder. - newline_delimited: NewlineDelimitedDecoderOptions, - }, + NewlineDelimited(NewlineDelimitedDecoderConfig), /// Byte frames according to the [octet counting][octet_counting] format. /// /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1 - OctetCounting { - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - /// Options for the octet counting decoder. - octet_counting: OctetCountingDecoderOptions, - }, + OctetCounting(OctetCountingDecoderConfig), } impl From for FramingConfig { @@ -124,9 +104,7 @@ impl From for FramingConfig { impl From for FramingConfig { fn from(config: CharacterDelimitedDecoderConfig) -> Self { - Self::CharacterDelimited { - character_delimited: config.character_delimited, - } + Self::CharacterDelimited(config) } } @@ -138,17 +116,13 @@ impl From for FramingConfig { impl From for FramingConfig { fn from(config: NewlineDelimitedDecoderConfig) -> Self { - Self::NewlineDelimited { - newline_delimited: config.newline_delimited, - } + Self::NewlineDelimited(config) } } impl From for FramingConfig { fn from(config: OctetCountingDecoderConfig) -> Self { - Self::OctetCounting { - octet_counting: config.octet_counting, - } + Self::OctetCounting(config) } } @@ -157,29 +131,12 @@ impl FramingConfig { pub fn build(&self) -> Framer { match self { FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()), - FramingConfig::CharacterDelimited { - character_delimited, - } => Framer::CharacterDelimited( - CharacterDelimitedDecoderConfig { - character_delimited: character_delimited.clone(), - } - .build(), - ), + FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()), FramingConfig::LengthDelimited => { Framer::LengthDelimited(LengthDelimitedDecoderConfig.build()) } - FramingConfig::NewlineDelimited { newline_delimited } => Framer::NewlineDelimited( - NewlineDelimitedDecoderConfig { - newline_delimited: newline_delimited.clone(), - } - .build(), - ), - FramingConfig::OctetCounting { octet_counting } => Framer::OctetCounting( - OctetCountingDecoderConfig { - octet_counting: octet_counting.clone(), - } - .build(), - ), + FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()), + FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()), } } } @@ -229,9 +186,6 @@ impl tokio_util::codec::Decoder for Framer { } /// Deserializer configuration. -// Unfortunately, copying options of the nested enum variants is necessary -// since `serde` doesn't allow `flatten`ing these: -// https://github.com/serde-rs/serde/issues/1402. #[configurable_component] #[derive(Clone, Debug)] #[serde(tag = "codec", rename_all = "snake_case")] @@ -244,14 +198,7 @@ pub enum DeserializerConfig { /// Decodes the raw bytes as [JSON][json]. /// /// [json]: https://www.json.org/ - Json { - /// JSON-specific decoding options. - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - json: JsonDeserializerOptions, - }, + Json(JsonDeserializerConfig), #[cfg(feature = "syslog")] /// Decodes the raw bytes as a Syslog message. @@ -261,14 +208,7 @@ pub enum DeserializerConfig { /// /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt - Syslog { - /// Syslog-specific decoding options. - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - syslog: SyslogDeserializerOptions, - }, + Syslog(SyslogDeserializerConfig), /// Decodes the raw bytes as Vector’s [native Protocol Buffers format][vector_native_protobuf]. /// @@ -284,26 +224,12 @@ pub enum DeserializerConfig { /// /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs - NativeJson { - /// Vector's native JSON-specific decoding options. - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - native_json: NativeJsonDeserializerOptions, - }, + NativeJson(NativeJsonDeserializerConfig), /// Decodes the raw bytes as a [GELF][gelf] message. /// /// [gelf]: https://docs.graylog.org/docs/gelf - Gelf { - /// GELF-specific decoding options. - #[serde( - default, - skip_serializing_if = "vector_core::serde::skip_serializing_if_default" - )] - gelf: GelfDeserializerOptions, - }, + Gelf(GelfDeserializerConfig), } impl From for DeserializerConfig { @@ -314,22 +240,32 @@ impl From for DeserializerConfig { impl From for DeserializerConfig { fn from(config: JsonDeserializerConfig) -> Self { - Self::Json { json: config.json } + Self::Json(config) } } #[cfg(feature = "syslog")] impl From for DeserializerConfig { fn from(config: SyslogDeserializerConfig) -> Self { - Self::Syslog { - syslog: config.syslog, - } + Self::Syslog(config) } } impl From for DeserializerConfig { fn from(config: GelfDeserializerConfig) -> Self { - Self::Gelf { gelf: config.gelf } + Self::Gelf(config) + } +} + +impl From for DeserializerConfig { + fn from(_: NativeDeserializerConfig) -> Self { + Self::Native + } +} + +impl From for DeserializerConfig { + fn from(config: NativeJsonDeserializerConfig) -> Self { + Self::NativeJson(config) } } @@ -338,20 +274,12 @@ impl DeserializerConfig { pub fn build(&self) -> Deserializer { match self { DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()), - DeserializerConfig::Json { json } => { - Deserializer::Json(JsonDeserializerConfig::new(json.clone()).build()) - } + DeserializerConfig::Json(config) => Deserializer::Json(config.build()), #[cfg(feature = "syslog")] - DeserializerConfig::Syslog { syslog } => { - Deserializer::Syslog(SyslogDeserializerConfig::new(syslog.clone()).build()) - } + DeserializerConfig::Syslog(config) => Deserializer::Syslog(config.build()), DeserializerConfig::Native => Deserializer::Native(NativeDeserializerConfig.build()), - DeserializerConfig::NativeJson { native_json } => Deserializer::NativeJson( - NativeJsonDeserializerConfig::new(native_json.clone()).build(), - ), - DeserializerConfig::Gelf { gelf } => { - Deserializer::Gelf(GelfDeserializerConfig::new(gelf.clone()).build()) - } + DeserializerConfig::NativeJson(config) => Deserializer::NativeJson(config.build()), + DeserializerConfig::Gelf(config) => Deserializer::Gelf(config.build()), } } @@ -360,15 +288,13 @@ impl DeserializerConfig { match self { DeserializerConfig::Native => FramingConfig::LengthDelimited, DeserializerConfig::Bytes - | DeserializerConfig::Json { .. } - | DeserializerConfig::Gelf { .. } - | DeserializerConfig::NativeJson { .. } => FramingConfig::NewlineDelimited { - newline_delimited: Default::default(), - }, + | DeserializerConfig::Json(_) + | DeserializerConfig::Gelf(_) + | DeserializerConfig::NativeJson(_) => { + FramingConfig::NewlineDelimited(Default::default()) + } #[cfg(feature = "syslog")] - DeserializerConfig::Syslog { .. } => FramingConfig::NewlineDelimited { - newline_delimited: Default::default(), - }, + DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()), } } @@ -376,20 +302,12 @@ impl DeserializerConfig { pub fn output_type(&self) -> DataType { match self { DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(), - DeserializerConfig::Json { json } => { - JsonDeserializerConfig::new(json.clone()).output_type() - } + DeserializerConfig::Json(config) => config.output_type(), #[cfg(feature = "syslog")] - DeserializerConfig::Syslog { syslog } => { - SyslogDeserializerConfig::new(syslog.clone()).output_type() - } + DeserializerConfig::Syslog(config) => config.output_type(), DeserializerConfig::Native => NativeDeserializerConfig.output_type(), - DeserializerConfig::NativeJson { native_json } => { - NativeJsonDeserializerConfig::new(native_json.clone()).output_type() - } - DeserializerConfig::Gelf { gelf } => { - GelfDeserializerConfig::new(gelf.clone()).output_type() - } + DeserializerConfig::NativeJson(config) => config.output_type(), + DeserializerConfig::Gelf(config) => config.output_type(), } } @@ -397,21 +315,12 @@ impl DeserializerConfig { pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition { match self { DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace), - DeserializerConfig::Json { json } => { - JsonDeserializerConfig::new(json.clone()).schema_definition(log_namespace) - } + DeserializerConfig::Json(config) => config.schema_definition(log_namespace), #[cfg(feature = "syslog")] - DeserializerConfig::Syslog { syslog } => { - SyslogDeserializerConfig::new(syslog.clone()).schema_definition(log_namespace) - } + DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace), DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace), - DeserializerConfig::NativeJson { native_json } => { - NativeJsonDeserializerConfig::new(native_json.clone()) - .schema_definition(log_namespace) - } - DeserializerConfig::Gelf { gelf } => { - GelfDeserializerConfig::new(gelf.clone()).schema_definition(log_namespace) - } + DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace), + DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace), } } @@ -419,31 +328,31 @@ impl DeserializerConfig { pub const fn content_type(&self, framer: &FramingConfig) -> &'static str { match (&self, framer) { ( - DeserializerConfig::Json { .. } | DeserializerConfig::NativeJson { .. }, - FramingConfig::NewlineDelimited { .. }, + DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_), + FramingConfig::NewlineDelimited(_), ) => "application/x-ndjson", ( - DeserializerConfig::Gelf { .. } - | DeserializerConfig::Json { .. } - | DeserializerConfig::NativeJson { .. }, - FramingConfig::CharacterDelimited { + DeserializerConfig::Gelf(_) + | DeserializerConfig::Json(_) + | DeserializerConfig::NativeJson(_), + FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig { character_delimited: CharacterDelimitedDecoderOptions { delimiter: b',', max_length: Some(usize::MAX), }, - }, + }), ) => "application/json", (DeserializerConfig::Native, _) => "application/octet-stream", ( - DeserializerConfig::Json { .. } - | DeserializerConfig::NativeJson { .. } + DeserializerConfig::Json(_) + | DeserializerConfig::NativeJson(_) | DeserializerConfig::Bytes - | DeserializerConfig::Gelf { .. }, + | DeserializerConfig::Gelf(_), _, ) => "text/plain", #[cfg(feature = "syslog")] - (DeserializerConfig::Syslog { .. }, _) => "text/plain", + (DeserializerConfig::Syslog(_), _) => "text/plain", } } } diff --git a/lib/codecs/src/encoding/framing/character_delimited.rs b/lib/codecs/src/encoding/framing/character_delimited.rs index f93c279d61cf8..2e536976618b8 100644 --- a/lib/codecs/src/encoding/framing/character_delimited.rs +++ b/lib/codecs/src/encoding/framing/character_delimited.rs @@ -1,12 +1,12 @@ use bytes::{BufMut, BytesMut}; -use serde::{Deserialize, Serialize}; use tokio_util::codec::Encoder; use vector_config::configurable_component; use super::BoxedFramingError; /// Config used to build a `CharacterDelimitedEncoder`. -#[derive(Debug, Clone, Deserialize, Serialize)] +#[configurable_component] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct CharacterDelimitedEncoderConfig { /// Options for the character delimited encoder. pub character_delimited: CharacterDelimitedEncoderOptions, diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index d798e8d7bf9d9..f9516411720d1 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -61,10 +61,7 @@ pub enum FramingConfig { Bytes, /// Event data is delimited by a single ASCII (7-bit) character. - CharacterDelimited { - /// Options for the character delimited encoder. - character_delimited: CharacterDelimitedEncoderOptions, - }, + CharacterDelimited(CharacterDelimitedEncoderConfig), /// Event data is prefixed with its length in bytes. /// @@ -83,9 +80,7 @@ impl From for FramingConfig { impl From for FramingConfig { fn from(config: CharacterDelimitedEncoderConfig) -> Self { - Self::CharacterDelimited { - character_delimited: config.character_delimited, - } + Self::CharacterDelimited(config) } } @@ -106,14 +101,7 @@ impl FramingConfig { pub fn build(&self) -> Framer { match self { FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()), - FramingConfig::CharacterDelimited { - character_delimited, - } => Framer::CharacterDelimited( - CharacterDelimitedEncoderConfig { - character_delimited: character_delimited.clone(), - } - .build(), - ), + FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()), FramingConfig::LengthDelimited => { Framer::LengthDelimited(LengthDelimitedEncoderConfig.build()) } @@ -201,10 +189,7 @@ pub enum SerializerConfig { /// /// This codec must be configured with fields to encode. /// - Csv( - /// Options for the CSV encoder. - CsvSerializerConfig, - ), + Csv(CsvSerializerConfig), /// Encodes an event as a [GELF][gelf] message. /// @@ -214,10 +199,7 @@ pub enum SerializerConfig { /// Encodes an event as [JSON][json]. /// /// [json]: https://www.json.org/ - Json( - /// Encoding options specific to the text serializer. - JsonSerializerConfig, - ), + Json(JsonSerializerConfig), /// Encodes an event as a [logfmt][logfmt] message. /// @@ -257,10 +239,7 @@ pub enum SerializerConfig { /// Be careful if you are modifying your log events (for example, by using a `remap` /// transform) and removing the message field while doing additional parsing on it, as this /// could lead to the encoding emitting empty strings for the given event. - Text( - /// Encoding options specific to the text serializer. - TextSerializerConfig, - ), + Text(TextSerializerConfig), } impl From for SerializerConfig { diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index a7e99bc998a17..2b9fc3c542ccb 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -2,7 +2,7 @@ mod event; mod http; use codecs::{ - decoding::{self, DeserializerConfig, NewlineDelimitedDecoderOptions}, + decoding::{self, DeserializerConfig}, encoding::{ self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig, TextSerializerConfig, }, @@ -160,20 +160,18 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer { let framing_config = match framing { decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes, - decoding::FramingConfig::CharacterDelimited { - character_delimited, - } => encoding::FramingConfig::CharacterDelimited { - character_delimited: encoding::CharacterDelimitedEncoderOptions { - delimiter: character_delimited.delimiter, - }, - }, - decoding::FramingConfig::LengthDelimited => encoding::FramingConfig::LengthDelimited, - decoding::FramingConfig::NewlineDelimited { .. } => { - encoding::FramingConfig::NewlineDelimited + decoding::FramingConfig::CharacterDelimited(config) => { + encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig { + character_delimited: encoding::CharacterDelimitedEncoderOptions { + delimiter: config.character_delimited.delimiter, + }, + }) } + decoding::FramingConfig::LengthDelimited => encoding::FramingConfig::LengthDelimited, + decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited, // TODO: There's no equivalent octet counting framer for encoding... although // there's no particular reason that would make it hard to write. - decoding::FramingConfig::OctetCounting { .. } => todo!(), + decoding::FramingConfig::OctetCounting(_) => todo!(), }; framing_config.build() @@ -183,17 +181,11 @@ fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Des let deserializer_config = match config { SerializerConfig::Avro { .. } => todo!(), SerializerConfig::Csv { .. } => todo!(), - SerializerConfig::Gelf => DeserializerConfig::Gelf { - gelf: Default::default(), - }, - SerializerConfig::Json(_) => DeserializerConfig::Json { - json: Default::default(), - }, + SerializerConfig::Gelf => DeserializerConfig::Gelf(Default::default()), + SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()), SerializerConfig::Logfmt => todo!(), SerializerConfig::Native => DeserializerConfig::Native, - SerializerConfig::NativeJson => DeserializerConfig::NativeJson { - native_json: Default::default(), - }, + SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()), SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, }; @@ -203,18 +195,18 @@ fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Des fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer { let framing_config = match framing { encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes, - encoding::FramingConfig::CharacterDelimited { - character_delimited, - } => decoding::FramingConfig::CharacterDelimited { - character_delimited: decoding::CharacterDelimitedDecoderOptions { - delimiter: character_delimited.delimiter, - max_length: None, - }, - }, + encoding::FramingConfig::CharacterDelimited(config) => { + decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig { + character_delimited: decoding::CharacterDelimitedDecoderOptions { + delimiter: config.character_delimited.delimiter, + max_length: None, + }, + }) + } encoding::FramingConfig::LengthDelimited => decoding::FramingConfig::LengthDelimited, - encoding::FramingConfig::NewlineDelimited => decoding::FramingConfig::NewlineDelimited { - newline_delimited: NewlineDelimitedDecoderOptions::default(), - }, + encoding::FramingConfig::NewlineDelimited => { + decoding::FramingConfig::NewlineDelimited(Default::default()) + } }; framing_config.build() diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 17ec4cb45cd82..02a6cbcf6a9b5 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -3,7 +3,7 @@ use std::{convert::TryInto, io::ErrorKind}; use async_compression::tokio::bufread; use aws_sdk_s3::types::ByteStream; use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}; -use codecs::BytesDeserializerConfig; +use codecs::{BytesDeserializerConfig, NewlineDelimitedDecoderConfig}; use futures::{stream, stream::StreamExt, TryStreamExt}; use lookup::owned_value_path; use snafu::Snafu; @@ -133,9 +133,9 @@ pub struct AwsS3Config { const fn default_framing() -> FramingConfig { // This is used for backwards compatibility. It used to be the only (hardcoded) option. - FramingConfig::NewlineDelimited { + FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig { newline_delimited: NewlineDelimitedDecoderOptions { max_length: None }, - } + }) } impl_generate_config_from_default!(AwsS3Config); diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index ed3c5d3a4d9d4..4fc3ef4321ee1 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -1595,9 +1595,7 @@ fn test_config_outputs() { ( "json / single output", TestCase { - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), multiple_outputs: false, want: HashMap::from([( None, @@ -1622,9 +1620,7 @@ fn test_config_outputs() { ( "json / multiple output", TestCase { - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), multiple_outputs: true, want: HashMap::from([ ( @@ -1666,9 +1662,7 @@ fn test_config_outputs() { ( "syslog / single output", TestCase { - decoding: DeserializerConfig::Syslog { - syslog: Default::default(), - }, + decoding: DeserializerConfig::Syslog(Default::default()), multiple_outputs: false, want: HashMap::from([( None, @@ -1743,9 +1737,7 @@ fn test_config_outputs() { ( "syslog / multiple output", TestCase { - decoding: DeserializerConfig::Syslog { - syslog: Default::default(), - }, + decoding: DeserializerConfig::Syslog(Default::default()), multiple_outputs: true, want: HashMap::from([ ( diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 6ba6ea77220f1..55036ccde446b 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -235,9 +235,7 @@ impl ValidatableComponent for HttpClientConfig { let config = Self { endpoint: uri.to_string(), interval: Duration::from_secs(1), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), ..Default::default() }; diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index 5927265abd2bf..f7b04403e45d3 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -96,9 +96,7 @@ async fn collected_logs_json() { endpoint: format!("{}/logs/json.json", dufs_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -122,9 +120,7 @@ async fn collected_metrics_native_json() { endpoint: format!("{}/metrics/native.json", dufs_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::NativeJson { - native_json: Default::default(), - }, + decoding: DeserializerConfig::NativeJson(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -153,9 +149,7 @@ async fn collected_trace_native_json() { endpoint: format!("{}/traces/native.json", dufs_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::NativeJson { - native_json: Default::default(), - }, + decoding: DeserializerConfig::NativeJson(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -179,9 +173,7 @@ async fn unauthorized_no_auth() { endpoint: format!("{}/logs/json.json", dufs_auth_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -199,9 +191,7 @@ async fn unauthorized_wrong_auth() { endpoint: format!("{}/logs/json.json", dufs_auth_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -222,9 +212,7 @@ async fn authorized() { endpoint: format!("{}/logs/json.json", dufs_auth_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -245,9 +233,7 @@ async fn tls_invalid_ca() { endpoint: format!("{}/logs/json.json", dufs_https_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -268,9 +254,7 @@ async fn tls_valid() { endpoint: format!("{}/logs/json.json", dufs_https_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, @@ -292,9 +276,7 @@ async fn shutdown() { endpoint: format!("{}/logs/json.json", dufs_address()), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, diff --git a/src/sources/http_client/tests.rs b/src/sources/http_client/tests.rs index bf1e6b007511a..ecd799a73696b 100644 --- a/src/sources/http_client/tests.rs +++ b/src/sources/http_client/tests.rs @@ -1,13 +1,11 @@ +use codecs::CharacterDelimitedDecoderConfig; use std::collections::HashMap; use tokio::time::Duration; use warp::{http::HeaderMap, Filter}; use crate::sources::util::http::HttpMethod; use crate::{serde::default_decoding, serde::default_framing_message_based}; -use codecs::decoding::{ - CharacterDelimitedDecoderOptions, DeserializerConfig, FramingConfig, - NewlineDelimitedDecoderOptions, -}; +use codecs::decoding::{CharacterDelimitedDecoderOptions, DeserializerConfig, FramingConfig}; use vector_core::event::Event; use super::HttpClientConfig; @@ -78,12 +76,8 @@ async fn json_decoding_newline_delimited() { endpoint: format!("http://{}/endpoint", in_addr), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, - framing: FramingConfig::NewlineDelimited { - newline_delimited: NewlineDelimitedDecoderOptions::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), + framing: FramingConfig::NewlineDelimited(Default::default()), headers: HashMap::new(), method: HttpMethod::Get, tls: None, @@ -110,15 +104,13 @@ async fn json_decoding_character_delimited() { endpoint: format!("http://{}/endpoint", in_addr), interval: INTERVAL, query: HashMap::new(), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, - framing: FramingConfig::CharacterDelimited { + decoding: DeserializerConfig::Json(Default::default()), + framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig { character_delimited: CharacterDelimitedDecoderOptions { delimiter: b',', max_length: Some(usize::MAX), }, - }, + }), headers: HashMap::new(), method: HttpMethod::Get, tls: None, @@ -150,9 +142,7 @@ async fn request_query_applied() { vec!["val1".to_string(), "val2".to_string()], ), ]), - decoding: DeserializerConfig::Json { - json: Default::default(), - }, + decoding: DeserializerConfig::Json(Default::default()), framing: default_framing_message_based(), headers: HashMap::new(), method: HttpMethod::Get, diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 50f9b3fd0bd32..2bd2afecee34f 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -256,9 +256,7 @@ impl_generate_config_from_default!(SimpleHttpConfig); impl ValidatableComponent for SimpleHttpConfig { fn validation_configuration() -> ValidationConfiguration { let config = Self { - decoding: Some(DeserializerConfig::Json { - json: Default::default(), - }), + decoding: Some(DeserializerConfig::Json(Default::default())), ..Default::default() }; diff --git a/src/sources/socket/unix.rs b/src/sources/socket/unix.rs index 2144a8c8e0278..4d09e128a2c94 100644 --- a/src/sources/socket/unix.rs +++ b/src/sources/socket/unix.rs @@ -127,11 +127,9 @@ pub(super) fn unix_datagram( let max_length = config .framing .and_then(|framing| match framing { - FramingConfig::CharacterDelimited { - character_delimited, - } => character_delimited.max_length, - FramingConfig::NewlineDelimited { newline_delimited } => newline_delimited.max_length, - FramingConfig::OctetCounting { octet_counting } => octet_counting.max_length, + FramingConfig::CharacterDelimited(config) => config.character_delimited.max_length, + FramingConfig::NewlineDelimited(config) => config.newline_delimited.max_length, + FramingConfig::OctetCounting(config) => config.octet_counting.max_length, _ => None, }) .unwrap_or_else(crate::serde::default_max_length);