diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index ff337a18f6fe8..71bd98c1567fa 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -33,9 +33,8 @@ pub struct ProtobufDeserializerConfig { impl ProtobufDeserializerConfig { /// Build the `ProtobufDeserializer` from this configuration. - pub fn build(&self) -> ProtobufDeserializer { - // TODO return a Result instead. - ProtobufDeserializer::try_from(self).unwrap() + pub fn build(&self) -> vector_common::Result { + ProtobufDeserializer::try_from(self) } /// Return the type of event build by this deserializer. diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index 0539a6a67d2cd..008de9d9d245e 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -277,16 +277,18 @@ impl From for DeserializerConfig { impl DeserializerConfig { /// Build the `Deserializer` from this configuration. - pub fn build(&self) -> Deserializer { + pub fn build(&self) -> vector_common::Result { match self { - DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()), - DeserializerConfig::Json(config) => Deserializer::Json(config.build()), - DeserializerConfig::Protobuf(config) => Deserializer::Protobuf(config.build()), + DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())), + DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())), + DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)), #[cfg(feature = "syslog")] - DeserializerConfig::Syslog(config) => Deserializer::Syslog(config.build()), - DeserializerConfig::Native => Deserializer::Native(NativeDeserializerConfig.build()), - DeserializerConfig::NativeJson(config) => Deserializer::NativeJson(config.build()), - DeserializerConfig::Gelf(config) => Deserializer::Gelf(config.build()), + DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())), + DeserializerConfig::Native => { + Ok(Deserializer::Native(NativeDeserializerConfig.build())) + } + DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())), + DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())), } } diff --git a/src/codecs/decoding/config.rs b/src/codecs/decoding/config.rs index 50a5b6ed4e7f8..b4a01245bcfe0 100644 --- a/src/codecs/decoding/config.rs +++ b/src/codecs/decoding/config.rs @@ -41,13 +41,13 @@ impl DecodingConfig { } /// Builds a `Decoder` from the provided configuration. - pub fn build(&self) -> Decoder { + pub fn build(&self) -> vector_common::Result { // Build the framer. let framer = self.framing.build(); // Build the deserializer. - let deserializer = self.decoding.build(); + let deserializer = self.decoding.build()?; - Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace) + Ok(Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace)) } } diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index ec1a8e3817d8a..7fd6105b579e6 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -62,12 +62,15 @@ impl HttpResourceConfig { codec: ResourceCodec, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, - ) { + ) -> vector_common::Result<()> { match direction { // We'll pull data from the sink. - ResourceDirection::Pull => { - spawn_output_http_client(self, codec, output_tx, task_coordinator) - } + ResourceDirection::Pull => Ok(spawn_output_http_client( + self, + codec, + output_tx, + task_coordinator, + )), // The sink will push data to us. ResourceDirection::Push => { spawn_output_http_server(self, codec, output_tx, task_coordinator) @@ -227,12 +230,12 @@ fn spawn_output_http_server( codec: ResourceCodec, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, -) { +) -> vector_common::Result<()> { // This HTTP server will wait for events to be sent by a sink, and collect them and send them on // via an output sender. We accept/collect events until we're told to shutdown. // First, we'll build and spawn our HTTP server. - let decoder = codec.into_decoder(); + let decoder = codec.into_decoder()?; let (_, http_server_shutdown_tx) = spawn_http_server(task_coordinator, &config, move |request| { @@ -276,6 +279,7 @@ fn spawn_output_http_server( debug!("HTTP server external output resource completed."); }); + Ok(()) } /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink. diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 0ed2060b60be0..3bb2b48697948 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -96,24 +96,24 @@ impl ResourceCodec { /// /// The decoder is generated as an inverse to the input codec: if an encoding configuration was /// given, we generate a decoder that satisfies that encoding configuration, and vice versa. - pub fn into_decoder(&self) -> Decoder { + pub fn into_decoder(&self) -> vector_common::Result { let (framer, deserializer) = match self { Self::Decoding(config) => return config.build(), Self::Encoding(config) => ( encoder_framing_to_decoding_framer(config.config().default_stream_framing()), - serializer_config_to_deserializer(config.config()), + serializer_config_to_deserializer(config.config())?, ), Self::EncodingWithFraming(config) => { let (maybe_framing, serializer) = config.config(); let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes); ( encoder_framing_to_decoding_framer(framing), - serializer_config_to_deserializer(serializer), + serializer_config_to_deserializer(serializer)?, ) } }; - Decoder::new(framer, deserializer) + Ok(Decoder::new(framer, deserializer)) } } @@ -178,7 +178,9 @@ fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> enco framing_config.build() } -fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Deserializer { +fn serializer_config_to_deserializer( + config: &SerializerConfig, +) -> vector_common::Result { let deserializer_config = match config { SerializerConfig::Avro { .. } => todo!(), SerializerConfig::Csv { .. } => todo!(), @@ -311,7 +313,7 @@ impl ExternalResource { self, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, - ) { + ) -> vector_common::Result<()> { match self.definition { ResourceDefinition::Http(http_config) => { http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator) diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 4af5bf22b016a..f38bdc75a3f41 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -191,7 +191,7 @@ impl Runner { } } - pub async fn run_validation(self) -> Result, String> { + pub async fn run_validation(self) -> Result, vector_common::Error> { // Initialize our test environment. initialize_test_environment(); @@ -251,7 +251,7 @@ impl Runner { &self.configuration, &input_task_coordinator, &output_task_coordinator, - ); + )?; let input_tx = runner_input.into_sender(controlled_edges.input); let output_rx = runner_output.into_receiver(controlled_edges.output); debug!("External resource (if any) and controlled edges built and spawned."); @@ -413,7 +413,7 @@ fn build_external_resource( configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, -) -> (RunnerInput, RunnerOutput, Option>) { +) -> Result<(RunnerInput, RunnerOutput, Option>), vector_common::Error> { let component_type = configuration.component_type(); let maybe_external_resource = configuration.external_resource(); let maybe_encoder = maybe_external_resource @@ -430,15 +430,15 @@ fn build_external_resource( maybe_external_resource.expect("a source must always have an external resource"); resource.spawn_as_input(rx, input_task_coordinator); - ( + Ok(( RunnerInput::External(tx), RunnerOutput::Controlled, maybe_encoder, - ) + )) } ComponentType::Transform => { // Transforms have no external resources. - (RunnerInput::Controlled, RunnerOutput::Controlled, None) + Ok((RunnerInput::Controlled, RunnerOutput::Controlled, None)) } ComponentType::Sink => { // As an external resource for a sink, we create a channel that the validation runner @@ -448,13 +448,13 @@ fn build_external_resource( let (tx, rx) = mpsc::channel(1024); let resource = maybe_external_resource.expect("a sink must always have an external resource"); - resource.spawn_as_output(tx, output_task_coordinator); + resource.spawn_as_output(tx, output_task_coordinator)?; - ( + Ok(( RunnerInput::Controlled, RunnerOutput::External(rx), maybe_encoder, - ) + )) } } } diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index 578584808f451..f6a9be1a46f09 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -127,7 +127,7 @@ fn default_offset_key() -> OptionalValuePath { impl_generate_config_from_default!(AmqpSourceConfig); impl AmqpSourceConfig { - fn decoder(&self, log_namespace: LogNamespace) -> Decoder { + fn decoder(&self, log_namespace: LogNamespace) -> vector_common::Result { DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build() } } @@ -317,7 +317,8 @@ async fn receive_event( msg: Delivery, ) -> Result<(), ()> { let payload = Cursor::new(Bytes::copy_from_slice(&msg.data)); - let mut stream = FramedRead::new(payload, config.decoder(log_namespace)); + let decoder = config.decoder(log_namespace).map_err(|_e| ())?; + let mut stream = FramedRead::new(payload, decoder); // Extract timestamp from AMQP message let timestamp = msg diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index f5f43deb19f9b..add2787d48037 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -143,7 +143,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig { async fn build(&self, cx: SourceContext) -> crate::Result { let log_namespace = cx.log_namespace(self.log_namespace); let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let acknowledgements = cx.do_acknowledgements(self.acknowledgements); diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 0890a3869467a..7d8618437848f 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -241,7 +241,8 @@ impl AwsS3Config { .await?; let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; match self.sqs { Some(ref sqs) => { diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index a4f8bbb7629c0..604cb4a6bb3b4 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -111,7 +111,8 @@ impl SourceConfig for AwsSqsConfig { let client = self.build_client(&cx).await?; let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let acknowledgements = cx.do_acknowledgements(self.acknowledgements); Ok(Box::pin( diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 1e7659a30cec0..0f69b0740f8f1 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -245,7 +245,8 @@ mod tests { config.decoding, LogNamespace::Vector, ) - .build(), + .build() + .unwrap(), "aws_sqs", b"test", Some(now), @@ -297,7 +298,8 @@ mod tests { config.decoding, LogNamespace::Legacy, ) - .build(), + .build() + .unwrap(), "aws_sqs", b"test", Some(now), diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 04614f1ac25c9..becaf033a9bb1 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -156,7 +156,8 @@ impl SourceConfig for DatadogAgentConfig { .clone(); let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let tls = MaybeTlsSettings::from_config(&self.tls, true)?; let source = DatadogAgentSource::new( diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 35044dd80a0bd..1b391c553eb29 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -292,7 +292,8 @@ impl SourceConfig for DemoLogsConfig { self.format.validate()?; let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; Ok(Box::pin(demo_logs_source( self.interval, self.count, @@ -361,7 +362,8 @@ mod tests { default_decoding(), LogNamespace::Legacy, ) - .build(); + .build() + .unwrap(); demo_logs_source( config.interval, config.count, diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index cbf2ac1d4b331..7e254ddd05a8b 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -234,7 +234,7 @@ impl SourceConfig for ExecConfig { .clone() .unwrap_or_else(|| self.decoding.default_stream_framing()); let decoder = - DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build(); + DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build()?; match &self.mode { Mode::Scheduled => { diff --git a/src/sources/file_descriptors/mod.rs b/src/sources/file_descriptors/mod.rs index 3f7286b3eb38f..2d2ec2979c47e 100644 --- a/src/sources/file_descriptors/mod.rs +++ b/src/sources/file_descriptors/mod.rs @@ -62,7 +62,7 @@ pub trait FileDescriptorConfig: NamedComponent { let framing = self .framing() .unwrap_or_else(|| decoding.default_stream_framing()); - let decoder = DecodingConfig::new(framing, decoding, log_namespace).build(); + let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?; let (sender, receiver) = mpsc::channel(1024); diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index b92062553f4cd..29128200d9e6b 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -314,7 +314,7 @@ impl SourceConfig for PubsubConfig { self.decoding.clone(), log_namespace, ) - .build(), + .build()?, acknowledgements: cx.do_acknowledgements(self.acknowledgements), shutdown: cx.shutdown, out: cx.out, diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index fe55fad2037d1..4ffb1cdc5169d 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -163,7 +163,8 @@ impl SourceConfig for LogplexConfig { let log_namespace = cx.log_namespace(self.log_namespace); let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let source = LogplexSource { query_parameters: self.query_parameters.clone(), diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 4fa3773faeaba..a6c80782d5f44 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -194,7 +194,7 @@ impl SourceConfig for HttpClientConfig { let log_namespace = cx.log_namespace(self.log_namespace); // build the decoder - let decoder = self.get_decoding_config(Some(log_namespace)).build(); + let decoder = self.get_decoding_config(Some(log_namespace)).build()?; let content_type = self.decoding.content_type(&self.framing).to_string(); diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index be3b349df94c0..89da17c13e333 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -314,7 +314,7 @@ fn remove_duplicates(mut list: Vec, list_name: &str) -> Vec { #[typetag::serde(name = "http_server")] impl SourceConfig for SimpleHttpConfig { async fn build(&self, cx: SourceContext) -> crate::Result { - let decoder = self.get_decoding_config()?.build(); + let decoder = self.get_decoding_config()?.build()?; let log_namespace = cx.log_namespace(self.log_namespace); let source = SimpleHttpSource { diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 302060debefaf..1ccdb41bd3248 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -296,7 +296,8 @@ impl SourceConfig for KafkaSourceConfig { let consumer = create_consumer(self)?; let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let acknowledgements = cx.do_acknowledgements(self.acknowledgements); Ok(Box::pin(kafka_source( @@ -1158,7 +1159,8 @@ mod integration_test { config.decoding.clone(), log_namespace, ) - .build(); + .build() + .unwrap(); tokio::spawn(kafka_source( config, diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 507e95cbd822d..2d893a9bee642 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -122,7 +122,8 @@ impl SourceConfig for NatsSourceConfig { let log_namespace = cx.log_namespace(self.log_namespace); let (connection, subscription) = create_subscription(self).await?; let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; Ok(Box::pin(nats_source( self.clone(), @@ -373,7 +374,8 @@ mod integration_tests { conf.decoding.clone(), LogNamespace::Legacy, ) - .build(); + .build() + .unwrap(); tokio::spawn(nats_source( conf.clone(), nc, diff --git a/src/sources/redis/mod.rs b/src/sources/redis/mod.rs index f35b311f9e074..1569449ddf29b 100644 --- a/src/sources/redis/mod.rs +++ b/src/sources/redis/mod.rs @@ -169,7 +169,8 @@ impl SourceConfig for RedisSourceConfig { let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?; let connection_info = ConnectionInfo::from(client.get_connection_info()); let decoder = - DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) + .build()?; let bytes_received = register!(BytesReceived::from(Protocol::from( connection_info.protocol diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 408e500331c7f..a27712cc2073d 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -124,7 +124,7 @@ impl SourceConfig for SocketConfig { decoding, log_namespace, ) - .build(); + .build()?; let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace); let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone()); @@ -156,7 +156,7 @@ impl SourceConfig for SocketConfig { config.decoding().clone(), log_namespace, ) - .build(); + .build()?; Ok(udp::udp( config, decoder, @@ -176,7 +176,7 @@ impl SourceConfig for SocketConfig { config.decoding.clone(), log_namespace, ) - .build(); + .build()?; unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace) } @@ -193,7 +193,7 @@ impl SourceConfig for SocketConfig { decoding, log_namespace, ) - .build(); + .build()?; unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace) }