diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index 4e2d136a054ffe..31c35e74c8fdcc 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -52,6 +52,11 @@ pub struct KinesisSinkBaseConfig { #[serde(default)] pub auth: AwsAuthentication, + /// Whether or not to retry successful requests containing partial failures. + #[serde(default)] + #[configurable(metadata(docs::advanced))] + pub request_retry_partial: bool, + #[configurable(derived)] #[serde( default, @@ -77,6 +82,7 @@ pub fn build_sink( partition_key_field: Option, batch_settings: BatcherSettings, client: C, + retry_logic: RT, ) -> crate::Result where C: SendRecord + Clone + Send + Sync + 'static, @@ -92,7 +98,7 @@ where let region = config.region.region(); let service = ServiceBuilder::new() - .settings::>(request_limits, RT::default()) + .settings::>(request_limits, retry_logic) .service(KinesisService:: { client, stream_name: config.stream_name.clone(), diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index c8080e0711da3d..aca15796d87c9a 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -8,6 +8,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::configurable_component; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig { None, batch_settings, KinesisFirehoseClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, )?; Ok((sink, healthcheck)) @@ -166,7 +170,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig { } #[derive(Clone, Default)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -180,4 +186,13 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if response.failure_count > 0 && self.retry_partial { + let msg = format!("partial error count {}", response.failure_count); + RetryAction::Retry(msg.into()) + } else { + RetryAction::Successful + } + } } diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 8a46c57f83e14e..9a4b903811ba7d 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -57,6 +57,7 @@ async fn firehose_put_records() { tls: None, auth: Default::default(), acknowledgements: Default::default(), + request_retry_partial: Default::default(), }; let config = KinesisFirehoseSinkConfig { batch, base }; diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 49d1ee821f5c34..52a656240282c7 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -1,8 +1,11 @@ +use aws_sdk_firehose::output::PutRecordBatchOutput; use aws_sdk_firehose::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; -use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; +use crate::sinks::prelude::*; + +use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord}; #[derive(Clone)] pub struct KinesisFirehoseRecord { @@ -46,7 +49,15 @@ impl SendRecord for KinesisFirehoseClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len(); + let total_size = records.iter().fold(0, |acc, record| { + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() + }); self.client .put_record_batch() .set_records(Some(records)) @@ -54,6 +65,10 @@ impl SendRecord for KinesisFirehoseClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordBatchOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_put_count().unwrap_or(0) as usize, + events_byte_size: JsonSize::new(total_size), + }) } } diff --git a/src/sinks/aws_kinesis/firehose/tests.rs b/src/sinks/aws_kinesis/firehose/tests.rs index a15fb47a4e7944..54c55d9efee1d1 100644 --- a/src/sinks/aws_kinesis/firehose/tests.rs +++ b/src/sinks/aws_kinesis/firehose/tests.rs @@ -33,6 +33,7 @@ async fn check_batch_size() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; @@ -62,6 +63,7 @@ async fn check_batch_events() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; diff --git a/src/sinks/aws_kinesis/record.rs b/src/sinks/aws_kinesis/record.rs index 03ad11c7104168..a244f028cb78de 100644 --- a/src/sinks/aws_kinesis/record.rs +++ b/src/sinks/aws_kinesis/record.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use aws_smithy_client::SdkError; use bytes::Bytes; +use super::KinesisResponse; /// An AWS Kinesis record type primarily to store the underlying aws crates' actual record `T`, and /// to abstract the encoded length calculation. pub trait Record { @@ -24,5 +25,9 @@ pub trait SendRecord { type E; /// Sends the records. - async fn send(&self, records: Vec, stream_name: String) -> Option>; + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result>; } diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index 3539fee4e2eab8..4ebc53f0d746a9 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -37,8 +37,9 @@ where } pub struct KinesisResponse { - count: usize, - events_byte_size: JsonSize, + pub(crate) count: usize, + pub(crate) failure_count: usize, + pub(crate) events_byte_size: JsonSize, } impl DriverResponse for KinesisResponse { @@ -72,7 +73,6 @@ where let events_byte_size = requests .get_metadata() .events_estimated_json_encoded_byte_size(); - let count = requests.get_metadata().event_count(); let records = requests .events @@ -84,16 +84,10 @@ where let stream_name = self.stream_name.clone(); Box::pin(async move { - // Returning a Result (a trait that implements Try) is not a stable feature, - // so instead we have to explicitly check for error and return. - // https://github.com/rust-lang/rust/issues/84277 - if let Some(e) = client.send(records, stream_name).await { - return Err(e); - } - - Ok(KinesisResponse { - count, - events_byte_size, + client.send(records, stream_name).await.map(|mut r| { + // augment the response + r.events_byte_size = events_byte_size; + r }) }) } diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 673ab7b4d212a8..515c5fa66ab0e0 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -6,6 +6,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::{component::GenerateConfig, configurable_component}; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig { self.partition_key_field.clone(), batch_settings, KinesisStreamClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, )?; Ok((sink, healthcheck)) @@ -173,7 +177,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig { } } #[derive(Default, Clone)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -193,6 +199,15 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if response.failure_count > 0 && self.retry_partial { + let msg = format!("partial error count {}", response.failure_count); + RetryAction::Retry(msg.into()) + } else { + RetryAction::Successful + } + } } #[cfg(test)] diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index a9a66804e3729e..8793aa520c0246 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -98,6 +98,7 @@ async fn kinesis_put_records_without_partition_key() { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + request_retry_partial: Default::default(), }; let config = KinesisStreamsSinkConfig { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 67eba50d9aff26..339d6997af63a0 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -1,8 +1,11 @@ +use aws_sdk_kinesis::output::PutRecordsOutput; use aws_sdk_kinesis::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; -use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; +use crate::sinks::prelude::*; + +use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord}; #[derive(Clone)] pub struct KinesisStreamRecord { @@ -62,7 +65,15 @@ impl SendRecord for KinesisStreamClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len(); + let total_size = records.iter().fold(0, |acc, record| { + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() + }); self.client .put_records() .set_records(Some(records)) @@ -70,6 +81,10 @@ impl SendRecord for KinesisStreamClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordsOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_record_count().unwrap_or(0) as usize, + events_byte_size: JsonSize::new(total_size), + }) } } diff --git a/website/cue/reference/components/sinks/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/aws_kinesis_firehose.cue index b06766ee167ccd..35847e6db84dfb 100644 --- a/website/cue/reference/components/sinks/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/aws_kinesis_firehose.cue @@ -75,6 +75,7 @@ components: sinks: aws_kinesis_firehose: components._aws & { configuration: base.components.sinks.aws_kinesis_firehose.configuration & { _aws_include: false + request_retry_partial: warnings: ["This can cause duplicate logs to be published."] } input: { diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index 2a4c55bf3e8ee9..19a70860692e54 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -480,6 +480,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: { } } } + request_retry_partial: { + description: "Whether or not to retry successful requests containing partial failures." + required: false + type: bool: default: false + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream. diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index 180d54f1b9a96d..40164b9d0e2927 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -489,6 +489,11 @@ base: components: sinks: aws_kinesis_streams: configuration: { } } } + request_retry_partial: { + description: "Whether or not to retry successful requests containing partial failures." + required: false + type: bool: default: false + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream.