Skip to content

Commit

Permalink
refactor for new comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dengmingtong committed Jun 11, 2023
1 parent ecb27e5 commit 10be829
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 6 deletions.
2 changes: 0 additions & 2 deletions src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pub struct KinesisSinkBaseConfig {
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
///
/// Note: this can cause duplicates in Firehose.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub request_retry_partial: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl RetryLogic for KinesisRetryLogic {
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/firehose/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl SendRecord for KinesisFirehoseClient {
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len()
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_record_batch()
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl RetryLogic for KinesisRetryLogic {
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/streams/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl SendRecord for KinesisStreamClient {
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().unwrap_or(&Blob::new(vec![])).as_ref().len()
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_records()
Expand Down

0 comments on commit 10be829

Please sign in to comment.