-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(kinesis sinks): implement full retry of partial failures in firehose/streams #16771
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ use futures::FutureExt; | |
use snafu::Snafu; | ||
use vector_config::configurable_component; | ||
|
||
use crate::sinks::util::retries::RetryAction; | ||
use crate::{ | ||
aws::{create_client, is_retriable_error, ClientBuilder}, | ||
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, | ||
|
@@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig { | |
None, | ||
batch_settings, | ||
KinesisFirehoseClient { client }, | ||
KinesisRetryLogic { | ||
retry_partial: self.base.request_retry_partial, | ||
}, | ||
) | ||
.await?; | ||
|
||
|
@@ -167,7 +171,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig { | |
} | ||
|
||
#[derive(Clone, Default)] | ||
struct KinesisRetryLogic; | ||
struct KinesisRetryLogic { | ||
retry_partial: bool, | ||
} | ||
|
||
impl RetryLogic for KinesisRetryLogic { | ||
type Error = SdkError<KinesisError>; | ||
|
@@ -181,4 +187,13 @@ impl RetryLogic for KinesisRetryLogic { | |
} | ||
is_retriable_error(error) | ||
} | ||
|
||
fn should_retry_response(&self, response: &Self::Response) -> RetryAction { | ||
if self.retry_partial && response.failure_count > 0 { | ||
let msg = format!("partial error count {}", response.failure_count); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be nice to include the error type and reason if we can pull that out of the response reasonably. |
||
return RetryAction::Retry(msg.into()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FIXME: this doesn't need an explicit return. |
||
} else { | ||
RetryAction::Successful | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use crate::sinks::aws_kinesis::KinesisResponse; | ||
use aws_sdk_firehose::output::PutRecordBatchOutput; | ||
use aws_sdk_firehose::types::{Blob, SdkError}; | ||
use bytes::Bytes; | ||
use tracing::Instrument; | ||
|
@@ -46,14 +48,24 @@ impl SendRecord for KinesisFirehoseClient { | |
type T = KinesisRecord; | ||
type E = KinesisError; | ||
|
||
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> { | ||
async fn send( | ||
&self, | ||
records: Vec<Self::T>, | ||
stream_name: String, | ||
) -> Result<KinesisResponse, SdkError<Self::E>> { | ||
let rec_count = records.len().clone(); | ||
|
||
self.client | ||
.put_record_batch() | ||
.set_records(Some(records)) | ||
.delivery_stream_name(stream_name) | ||
.send() | ||
.instrument(info_span!("request").or_current()) | ||
.await | ||
.err() | ||
.map(|output: PutRecordBatchOutput| KinesisResponse { | ||
count: rec_count, | ||
failure_count: output.failed_put_count().unwrap_or(0) as usize, | ||
events_byte_size: 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The events size isn't available here. I wasn't sure of the best way to modify this - will think about it. I may just return the failure count for now, and build the |
||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use crate::sinks::aws_kinesis::KinesisResponse; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This could be moved into the |
||
use aws_sdk_kinesis::output::PutRecordsOutput; | ||
use aws_sdk_kinesis::types::{Blob, SdkError}; | ||
use bytes::Bytes; | ||
use tracing::Instrument; | ||
|
@@ -62,14 +64,23 @@ impl SendRecord for KinesisStreamClient { | |
type T = KinesisRecord; | ||
type E = KinesisError; | ||
|
||
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> { | ||
async fn send( | ||
&self, | ||
records: Vec<Self::T>, | ||
stream_name: String, | ||
) -> Result<KinesisResponse, SdkError<Self::E>> { | ||
let rec_count = records.len().clone(); | ||
self.client | ||
.put_records() | ||
.set_records(Some(records)) | ||
.stream_name(stream_name) | ||
.send() | ||
.instrument(info_span!("request").or_current()) | ||
.await | ||
.err() | ||
.map(|output: PutRecordsOutput| KinesisResponse { | ||
count: rec_count, | ||
failure_count: output.failed_record_count().unwrap_or(0) as usize, | ||
events_byte_size: 0, | ||
}) | ||
Comment on lines
+80
to
+84
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It definitely feels better to me to do this in the |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want a per-sink config; this is in the "base" only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 especially if we're only supporting it for streams.