diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index b79ecfa4f4142c..31c35e74c8fdcc 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -53,8 +53,6 @@ pub struct KinesisSinkBaseConfig { pub auth: AwsAuthentication, /// Whether or not to retry successful requests containing partial failures. - /// - /// Note: this can cause duplicates in Firehose. #[serde(default)] #[configurable(metadata(docs::advanced))] pub request_retry_partial: bool, diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index ca938424e8a0cc..58903d40c63520 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -188,7 +188,7 @@ impl RetryLogic for KinesisRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if self.retry_partial && response.failure_count > 0 { + if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) } else { diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 628bf079948c58..52a656240282c7 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -56,7 +56,7 @@ impl SendRecord for KinesisFirehoseClient { ) -> Result> { let rec_count = records.len(); let total_size = records.iter().fold(0, |acc, record| { - acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() }); self.client .put_record_batch() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 5be4edfa3eadd8..515c5fa66ab0e0 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -201,7 +201,7 @@ impl RetryLogic for KinesisRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if self.retry_partial && response.failure_count > 0 { + if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) } else { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 0ebea604896644..47f22bfd1d7efc 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -72,7 +72,7 @@ impl SendRecord for KinesisStreamClient { ) -> Result> { let rec_count = records.len(); let total_size = records.iter().fold(0, |acc, record| { - acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() }); self.client .put_records()