From 9e52b9cde2b790f09fca26cd43c62f5f5d3d3c8d Mon Sep 17 00:00:00 2001 From: Jason Goodwin Date: Sat, 11 Mar 2023 12:42:30 -0800 Subject: [PATCH 01/11] feat(kinesis sinks): implement full retry of partial failures in firehose/streams --- src/sinks/aws_kinesis/config.rs | 10 +++++++++- src/sinks/aws_kinesis/firehose/config.rs | 17 ++++++++++++++++- src/sinks/aws_kinesis/firehose/record.rs | 17 +++++++++++++++-- src/sinks/aws_kinesis/firehose/tests.rs | 2 ++ src/sinks/aws_kinesis/record.rs | 7 ++++++- src/sinks/aws_kinesis/service.rs | 20 +++++++------------- src/sinks/aws_kinesis/streams/config.rs | 17 ++++++++++++++++- src/sinks/aws_kinesis/streams/record.rs | 23 +++++++++++++++++++++-- 8 files changed, 92 insertions(+), 21 deletions(-) diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index 4e2d136a054ff..b79ecfa4f4142 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -52,6 +52,13 @@ pub struct KinesisSinkBaseConfig { #[serde(default)] pub auth: AwsAuthentication, + /// Whether or not to retry successful requests containing partial failures. + /// + /// Note: this can cause duplicates in Firehose. + #[serde(default)] + #[configurable(metadata(docs::advanced))] + pub request_retry_partial: bool, + #[configurable(derived)] #[serde( default, @@ -77,6 +84,7 @@ pub fn build_sink( partition_key_field: Option, batch_settings: BatcherSettings, client: C, + retry_logic: RT, ) -> crate::Result where C: SendRecord + Clone + Send + Sync + 'static, @@ -92,7 +100,7 @@ where let region = config.region.region(); let service = ServiceBuilder::new() - .settings::>(request_limits, RT::default()) + .settings::>(request_limits, retry_logic) .service(KinesisService:: { client, stream_name: config.stream_name.clone(), diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index c8080e0711da3..9621b148e35a6 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -8,6 +8,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::configurable_component; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig { None, batch_settings, KinesisFirehoseClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, )?; Ok((sink, healthcheck)) @@ -166,7 +170,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig { } #[derive(Clone, Default)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -180,4 +186,13 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if self.retry_partial && response.failure_count > 0 { + let msg = format!("partial error count {}", response.failure_count); + return RetryAction::Retry(msg.into()); + } else { + RetryAction::DontRetry("ok".into()) + } + } } diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 49d1ee821f5c3..80f8250f0a7ea 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -1,6 +1,9 @@ +use crate::sinks::aws_kinesis::KinesisResponse; +use aws_sdk_firehose::output::PutRecordBatchOutput; use aws_sdk_firehose::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; +use crate::{sinks::prelude::*}; use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; @@ -46,7 +49,13 @@ impl SendRecord for KinesisFirehoseClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len().clone(); + self.client .put_record_batch() .set_records(Some(records)) @@ -54,6 +63,10 @@ impl SendRecord for KinesisFirehoseClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordBatchOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_put_count().unwrap_or(0) as usize, + events_byte_size: JsonSize::zero(), + }) } } diff --git a/src/sinks/aws_kinesis/firehose/tests.rs b/src/sinks/aws_kinesis/firehose/tests.rs index a15fb47a4e794..54c55d9efee1d 100644 --- a/src/sinks/aws_kinesis/firehose/tests.rs +++ b/src/sinks/aws_kinesis/firehose/tests.rs @@ -33,6 +33,7 @@ async fn check_batch_size() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; @@ -62,6 +63,7 @@ async fn check_batch_events() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; diff --git a/src/sinks/aws_kinesis/record.rs b/src/sinks/aws_kinesis/record.rs index 03ad11c710416..7b4edb8df7790 100644 --- a/src/sinks/aws_kinesis/record.rs +++ b/src/sinks/aws_kinesis/record.rs @@ -1,3 +1,4 @@ +use crate::sinks::aws_kinesis::KinesisResponse; use async_trait::async_trait; use aws_smithy_client::SdkError; use bytes::Bytes; @@ -24,5 +25,9 @@ pub trait SendRecord { type E; /// Sends the records. - async fn send(&self, records: Vec, stream_name: String) -> Option>; + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result>; } diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index 3539fee4e2eab..4ebc53f0d746a 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -37,8 +37,9 @@ where } pub struct KinesisResponse { - count: usize, - events_byte_size: JsonSize, + pub(crate) count: usize, + pub(crate) failure_count: usize, + pub(crate) events_byte_size: JsonSize, } impl DriverResponse for KinesisResponse { @@ -72,7 +73,6 @@ where let events_byte_size = requests .get_metadata() .events_estimated_json_encoded_byte_size(); - let count = requests.get_metadata().event_count(); let records = requests .events @@ -84,16 +84,10 @@ where let stream_name = self.stream_name.clone(); Box::pin(async move { - // Returning a Result (a trait that implements Try) is not a stable feature, - // so instead we have to explicitly check for error and return. - // https://github.com/rust-lang/rust/issues/84277 - if let Some(e) = client.send(records, stream_name).await { - return Err(e); - } - - Ok(KinesisResponse { - count, - events_byte_size, + client.send(records, stream_name).await.map(|mut r| { + // augment the response + r.events_byte_size = events_byte_size; + r }) }) } diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 673ab7b4d212a..8aadefe7cb3b7 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -6,6 +6,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::{component::GenerateConfig, configurable_component}; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig { self.partition_key_field.clone(), batch_settings, KinesisStreamClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, )?; Ok((sink, healthcheck)) @@ -173,7 +177,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig { } } #[derive(Default, Clone)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -193,6 +199,15 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if self.retry_partial && response.failure_count > 0 { + let msg = format!("partial error count {}", response.failure_count); + return RetryAction::Retry(msg.into()); + } else { + RetryAction::DontRetry("ok".into()) + } + } } #[cfg(test)] diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 67eba50d9aff2..de6eeb13999d3 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -1,6 +1,9 @@ +use crate::sinks::aws_kinesis::KinesisResponse; +use aws_sdk_kinesis::output::PutRecordsOutput; use aws_sdk_kinesis::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; +use crate::{sinks::prelude::*}; use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; @@ -62,7 +65,19 @@ impl SendRecord for KinesisStreamClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len().clone(); + // let total_byte_size: usize = records.iter().map(|record| record.len()).sum(); + // let total_byte_size: usize = records.iter().map(|record| record.data().len()).sum(); + // let total_byte_size: usize = records.iter().map(|record| { + // record.data().unwrap_or_else(|| &Bytes::new()).len() + // }).sum(); + + self.client .put_records() .set_records(Some(records)) @@ -70,6 +85,10 @@ impl SendRecord for KinesisStreamClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordsOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_record_count().unwrap_or(0) as usize, + events_byte_size: JsonSize::zero(), + }) } } From acd512fac1465f96df72db24302454cd29415671 Mon Sep 17 00:00:00 2001 From: Jason Goodwin Date: Sat, 11 Mar 2023 16:39:08 -0800 Subject: [PATCH 02/11] fix(kinesis sinks): use old behavior if ok --- src/sinks/aws_kinesis/firehose/config.rs | 2 +- src/sinks/aws_kinesis/streams/config.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 9621b148e35a6..77a889629fd42 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -192,7 +192,7 @@ impl RetryLogic for KinesisRetryLogic { let msg = format!("partial error count {}", response.failure_count); return RetryAction::Retry(msg.into()); } else { - RetryAction::DontRetry("ok".into()) + RetryAction::Successful } } } diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 8aadefe7cb3b7..a5975a5c740a5 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -205,7 +205,7 @@ impl RetryLogic for KinesisRetryLogic { let msg = format!("partial error count {}", response.failure_count); return RetryAction::Retry(msg.into()); } else { - RetryAction::DontRetry("ok".into()) + RetryAction::Successful } } } From 325e5efde6ea53808f20e083258eb05e4a911874 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Tue, 30 May 2023 16:10:11 +0800 Subject: [PATCH 03/11] implement full retry of partial failures in firehose/streams --- src/sinks/aws_kinesis/firehose/config.rs | 2 +- src/sinks/aws_kinesis/firehose/integration_tests.rs | 1 + src/sinks/aws_kinesis/firehose/record.rs | 2 +- src/sinks/aws_kinesis/streams/config.rs | 2 +- src/sinks/aws_kinesis/streams/integration_tests.rs | 1 + src/sinks/aws_kinesis/streams/record.rs | 12 ++---------- 6 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 77a889629fd42..ca938424e8a0c 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -190,7 +190,7 @@ impl RetryLogic for KinesisRetryLogic { fn should_retry_response(&self, response: &Self::Response) -> RetryAction { if self.retry_partial && response.failure_count > 0 { let msg = format!("partial error count {}", response.failure_count); - return RetryAction::Retry(msg.into()); + RetryAction::Retry(msg.into()) } else { RetryAction::Successful } diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 8a46c57f83e14..9a4b903811ba7 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -57,6 +57,7 @@ async fn firehose_put_records() { tls: None, auth: Default::default(), acknowledgements: Default::default(), + request_retry_partial: Default::default(), }; let config = KinesisFirehoseSinkConfig { batch, base }; diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 80f8250f0a7ea..522d9435878c6 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -54,7 +54,7 @@ impl SendRecord for KinesisFirehoseClient { records: Vec, stream_name: String, ) -> Result> { - let rec_count = records.len().clone(); + let rec_count = records.len(); self.client .put_record_batch() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index a5975a5c740a5..5be4edfa3eadd 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -203,7 +203,7 @@ impl RetryLogic for KinesisRetryLogic { fn should_retry_response(&self, response: &Self::Response) -> RetryAction { if self.retry_partial && response.failure_count > 0 { let msg = format!("partial error count {}", response.failure_count); - return RetryAction::Retry(msg.into()); + RetryAction::Retry(msg.into()) } else { RetryAction::Successful } diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index a9a66804e3729..8793aa520c024 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -98,6 +98,7 @@ async fn kinesis_put_records_without_partition_key() { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + request_retry_partial: Default::default(), }; let config = KinesisStreamsSinkConfig { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index de6eeb13999d3..05848c081dfa1 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -1,11 +1,10 @@ -use crate::sinks::aws_kinesis::KinesisResponse; use aws_sdk_kinesis::output::PutRecordsOutput; use aws_sdk_kinesis::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; use crate::{sinks::prelude::*}; -use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; +use super::{KinesisClient, KinesisError, KinesisResponse, KinesisRecord, Record, SendRecord}; #[derive(Clone)] pub struct KinesisStreamRecord { @@ -70,14 +69,7 @@ impl SendRecord for KinesisStreamClient { records: Vec, stream_name: String, ) -> Result> { - let rec_count = records.len().clone(); - // let total_byte_size: usize = records.iter().map(|record| record.len()).sum(); - // let total_byte_size: usize = records.iter().map(|record| record.data().len()).sum(); - // let total_byte_size: usize = records.iter().map(|record| { - // record.data().unwrap_or_else(|| &Bytes::new()).len() - // }).sum(); - - + let rec_count = records.len(); self.client .put_records() .set_records(Some(records)) From cd2cc92a0453f2472cca9c106e2ff16105416651 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Wed, 7 Jun 2023 22:22:13 +0800 Subject: [PATCH 04/11] set events_byte_size as accurate data size --- src/sinks/aws_kinesis/firehose/record.rs | 6 ++++-- src/sinks/aws_kinesis/streams/record.rs | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 522d9435878c6..2595f53c2497c 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -55,7 +55,9 @@ impl SendRecord for KinesisFirehoseClient { stream_name: String, ) -> Result> { let rec_count = records.len(); - + let total_size = records.iter().fold(0, |acc, record| { + acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + }); self.client .put_record_batch() .set_records(Some(records)) @@ -66,7 +68,7 @@ impl SendRecord for KinesisFirehoseClient { .map(|output: PutRecordBatchOutput| KinesisResponse { count: rec_count, failure_count: output.failed_put_count().unwrap_or(0) as usize, - events_byte_size: JsonSize::zero(), + events_byte_size: JsonSize::new(total_size), }) } } diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 05848c081dfa1..e056bcfb098a3 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -70,6 +70,9 @@ impl SendRecord for KinesisStreamClient { stream_name: String, ) -> Result> { let rec_count = records.len(); + let total_size = records.iter().fold(0, |acc, record| { + acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + }); self.client .put_records() .set_records(Some(records)) @@ -80,7 +83,7 @@ impl SendRecord for KinesisStreamClient { .map(|output: PutRecordsOutput| KinesisResponse { count: rec_count, failure_count: output.failed_record_count().unwrap_or(0) as usize, - events_byte_size: JsonSize::zero(), + events_byte_size: JsonSize::new(total_size), }) } } From 68e87bdf4f1fc089764a340ad9db30267d1ecde6 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Fri, 9 Jun 2023 10:34:46 +0800 Subject: [PATCH 05/11] fix some organization issue --- src/sinks/aws_kinesis/firehose/record.rs | 6 +++--- src/sinks/aws_kinesis/record.rs | 2 +- src/sinks/aws_kinesis/streams/record.rs | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 2595f53c2497c..628bf079948c5 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -1,11 +1,11 @@ -use crate::sinks::aws_kinesis::KinesisResponse; use aws_sdk_firehose::output::PutRecordBatchOutput; use aws_sdk_firehose::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; -use crate::{sinks::prelude::*}; -use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; +use crate::sinks::prelude::*; + +use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord}; #[derive(Clone)] pub struct KinesisFirehoseRecord { diff --git a/src/sinks/aws_kinesis/record.rs b/src/sinks/aws_kinesis/record.rs index 7b4edb8df7790..a244f028cb78d 100644 --- a/src/sinks/aws_kinesis/record.rs +++ b/src/sinks/aws_kinesis/record.rs @@ -1,8 +1,8 @@ -use crate::sinks::aws_kinesis::KinesisResponse; use async_trait::async_trait; use aws_smithy_client::SdkError; use bytes::Bytes; +use super::KinesisResponse; /// An AWS Kinesis record type primarily to store the underlying aws crates' actual record `T`, and /// to abstract the encoded length calculation. pub trait Record { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index e056bcfb098a3..0ebea60489664 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -2,7 +2,8 @@ use aws_sdk_kinesis::output::PutRecordsOutput; use aws_sdk_kinesis::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; -use crate::{sinks::prelude::*}; + +use crate::sinks::prelude::*; use super::{KinesisClient, KinesisError, KinesisResponse, KinesisRecord, Record, SendRecord}; From a80c778305ce1858d769c0d04319d97b80180c87 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Sat, 10 Jun 2023 11:22:01 +0800 Subject: [PATCH 06/11] refactor for new comments --- src/sinks/aws_kinesis/config.rs | 2 -- src/sinks/aws_kinesis/firehose/config.rs | 2 +- src/sinks/aws_kinesis/firehose/record.rs | 2 +- src/sinks/aws_kinesis/streams/config.rs | 2 +- src/sinks/aws_kinesis/streams/record.rs | 2 +- 5 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index b79ecfa4f4142..31c35e74c8fdc 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -53,8 +53,6 @@ pub struct KinesisSinkBaseConfig { pub auth: AwsAuthentication, /// Whether or not to retry successful requests containing partial failures. - /// - /// Note: this can cause duplicates in Firehose. #[serde(default)] #[configurable(metadata(docs::advanced))] pub request_retry_partial: bool, diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index ca938424e8a0c..58903d40c6352 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -188,7 +188,7 @@ impl RetryLogic for KinesisRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if self.retry_partial && response.failure_count > 0 { + if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) } else { diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 628bf079948c5..52a656240282c 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -56,7 +56,7 @@ impl SendRecord for KinesisFirehoseClient { ) -> Result> { let rec_count = records.len(); let total_size = records.iter().fold(0, |acc, record| { - acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() }); self.client .put_record_batch() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 5be4edfa3eadd..515c5fa66ab0e 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -201,7 +201,7 @@ impl RetryLogic for KinesisRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if self.retry_partial && response.failure_count > 0 { + if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) } else { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 0ebea60489664..47f22bfd1d7ef 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -72,7 +72,7 @@ impl SendRecord for KinesisStreamClient { ) -> Result> { let rec_count = records.len(); let total_size = records.iter().fold(0, |acc, record| { - acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len() + acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default() }); self.client .put_records() From 3ade963fa60d2723bddc2065695412bc3aba7615 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Sun, 11 Jun 2023 16:47:47 +0800 Subject: [PATCH 07/11] add warning in the cue docs --- .../cue/reference/components/sources/aws_kinesis_firehose.cue | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/website/cue/reference/components/sources/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/aws_kinesis_firehose.cue index 22adc0ecb28e4..58974baed1f9d 100644 --- a/website/cue/reference/components/sources/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/aws_kinesis_firehose.cue @@ -58,7 +58,9 @@ components: sources: aws_kinesis_firehose: { platform_name: null } - configuration: base.components.sources.aws_kinesis_firehose.configuration + configuration: base.components.sources.aws_kinesis_firehose.configuration & { + request_retry_partial: warnings: ["This can cause duplicates in Firehose."] + } output: logs: { line: { From 2c2ed76d4018decdcc966047d0b5b5361c43cc49 Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Mon, 12 Jun 2023 23:42:09 +0800 Subject: [PATCH 08/11] add warning in the cue docs --- .../cue/reference/components/sinks/aws_kinesis_firehose.cue | 1 + .../cue/reference/components/sinks/aws_kinesis_streams.cue | 1 + .../reference/components/sinks/base/aws_kinesis_firehose.cue | 5 +++++ .../reference/components/sinks/base/aws_kinesis_streams.cue | 5 +++++ .../reference/components/sources/aws_kinesis_firehose.cue | 4 +--- 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/website/cue/reference/components/sinks/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/aws_kinesis_firehose.cue index b06766ee167cc..35847e6db84df 100644 --- a/website/cue/reference/components/sinks/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/aws_kinesis_firehose.cue @@ -75,6 +75,7 @@ components: sinks: aws_kinesis_firehose: components._aws & { configuration: base.components.sinks.aws_kinesis_firehose.configuration & { _aws_include: false + request_retry_partial: warnings: ["This can cause duplicate logs to be published."] } input: { diff --git a/website/cue/reference/components/sinks/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/aws_kinesis_streams.cue index 75bfa684f8863..4800530655d93 100644 --- a/website/cue/reference/components/sinks/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/aws_kinesis_streams.cue @@ -75,6 +75,7 @@ components: sinks: aws_kinesis_streams: components._aws & { configuration: base.components.sinks.aws_kinesis_streams.configuration & { _aws_include: false + request_retry_partial: warnings: ["This can cause duplicate logs to be published."] } input: { diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index 2a4c55bf3e8ee..c1c8b0a67a793 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -480,6 +480,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: { } } } + request_retry_partial: { + description: "Whether or not to retry successful requests containing partial failures." + required: false + type: bool: default: false + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream. diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index 180d54f1b9a96..dad522559a609 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -489,6 +489,11 @@ base: components: sinks: aws_kinesis_streams: configuration: { } } } + request_retry_partial: { + description: "Whether or not to retry successful requests containing partial failures." + required: false + type: bool: default: false + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream. diff --git a/website/cue/reference/components/sources/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/aws_kinesis_firehose.cue index 58974baed1f9d..22adc0ecb28e4 100644 --- a/website/cue/reference/components/sources/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/aws_kinesis_firehose.cue @@ -58,9 +58,7 @@ components: sources: aws_kinesis_firehose: { platform_name: null } - configuration: base.components.sources.aws_kinesis_firehose.configuration & { - request_retry_partial: warnings: ["This can cause duplicates in Firehose."] - } + configuration: base.components.sources.aws_kinesis_firehose.configuration output: logs: { line: { From 43784734d62432120464f4c7f969625a264bb403 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Mon, 12 Jun 2023 12:44:06 -0400 Subject: [PATCH 09/11] +cargo fmt Signed-off-by: Spencer Gilbert --- src/sinks/aws_kinesis/firehose/config.rs | 2 +- src/sinks/aws_kinesis/streams/record.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 58903d40c6352..aca15796d87c9 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -188,7 +188,7 @@ impl RetryLogic for KinesisRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if response.failure_count > 0 && self.retry_partial { + if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) } else { diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 47f22bfd1d7ef..339d6997af63a 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -5,7 +5,7 @@ use tracing::Instrument; use crate::sinks::prelude::*; -use super::{KinesisClient, KinesisError, KinesisResponse, KinesisRecord, Record, SendRecord}; +use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord}; #[derive(Clone)] pub struct KinesisStreamRecord { From 3b58d14867ac88d20d3f844241e2662529da7566 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Mon, 12 Jun 2023 13:27:00 -0400 Subject: [PATCH 10/11] +cue fmt --- .../reference/components/sinks/base/aws_kinesis_firehose.cue | 2 +- .../cue/reference/components/sinks/base/aws_kinesis_streams.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index c1c8b0a67a793..19a70860692e5 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -484,7 +484,7 @@ base: components: sinks: aws_kinesis_firehose: configuration: { description: "Whether or not to retry successful requests containing partial failures." required: false type: bool: default: false - } + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream. diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index dad522559a609..40164b9d0e292 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -493,7 +493,7 @@ base: components: sinks: aws_kinesis_streams: configuration: { description: "Whether or not to retry successful requests containing partial failures." required: false type: bool: default: false - } + } stream_name: { description: """ The [stream name][stream_name] of the target Kinesis Firehose delivery stream. From 275441c1153173352fbffb266f82fb1de018bcdf Mon Sep 17 00:00:00 2001 From: dengmingtong Date: Thu, 15 Jun 2023 09:20:22 +0800 Subject: [PATCH 11/11] remove warning for kinesis stream --- website/cue/reference/components/sinks/aws_kinesis_streams.cue | 1 - 1 file changed, 1 deletion(-) diff --git a/website/cue/reference/components/sinks/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/aws_kinesis_streams.cue index 4800530655d93..75bfa684f8863 100644 --- a/website/cue/reference/components/sinks/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/aws_kinesis_streams.cue @@ -75,7 +75,6 @@ components: sinks: aws_kinesis_streams: components._aws & { configuration: base.components.sinks.aws_kinesis_streams.configuration & { _aws_include: false - request_retry_partial: warnings: ["This can cause duplicate logs to be published."] } input: {