Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis sinks): implement full retry of partial failures in firehose/streams #17535

Merged
merged 11 commits into from
Jun 16, 2023
8 changes: 7 additions & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub struct KinesisSinkBaseConfig {
#[serde(default)]
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub request_retry_partial: bool,

#[configurable(derived)]
#[serde(
default,
Expand All @@ -77,6 +82,7 @@ pub fn build_sink<C, R, RR, E, RT>(
partition_key_field: Option<String>,
batch_settings: BatcherSettings,
client: C,
retry_logic: RT,
) -> crate::Result<VectorSink>
where
C: SendRecord + Clone + Send + Sync + 'static,
Expand All @@ -92,7 +98,7 @@ where

let region = config.region.region();
let service = ServiceBuilder::new()
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, RT::default())
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
.service(KinesisService::<C, R, E> {
client,
stream_name: config.stream_name.clone(),
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::configurable_component;

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
None,
batch_settings,
KinesisFirehoseClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)?;

Ok((sink, healthcheck))
Expand All @@ -166,7 +170,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig {
}

#[derive(Clone, Default)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -180,4 +186,13 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
RetryAction::Successful
}
}
}
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn firehose_put_records() {
tls: None,
auth: Default::default(),
acknowledgements: Default::default(),
request_retry_partial: Default::default(),
};

let config = KinesisFirehoseSinkConfig { batch, base };
Expand Down
21 changes: 18 additions & 3 deletions src/sinks/aws_kinesis/firehose/record.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use aws_sdk_firehose::output::PutRecordBatchOutput;
use aws_sdk_firehose::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;

use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
use crate::sinks::prelude::*;

use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};

#[derive(Clone)]
pub struct KinesisFirehoseRecord {
Expand Down Expand Up @@ -46,14 +49,26 @@ impl SendRecord for KinesisFirehoseClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_record_batch()
.set_records(Some(records))
.delivery_stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordBatchOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_put_count().unwrap_or(0) as usize,
events_byte_size: JsonSize::new(total_size),
})
}
}
2 changes: 2 additions & 0 deletions src/sinks/aws_kinesis/firehose/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn check_batch_size() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down Expand Up @@ -62,6 +63,7 @@ async fn check_batch_events() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down
7 changes: 6 additions & 1 deletion src/sinks/aws_kinesis/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use aws_smithy_client::SdkError;
use bytes::Bytes;

use super::KinesisResponse;
/// An AWS Kinesis record type primarily to store the underlying aws crates' actual record `T`, and
/// to abstract the encoded length calculation.
pub trait Record {
Expand All @@ -24,5 +25,9 @@ pub trait SendRecord {
type E;

/// Sends the records.
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>>;
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>>;
}
20 changes: 7 additions & 13 deletions src/sinks/aws_kinesis/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ where
}

pub struct KinesisResponse {
count: usize,
events_byte_size: JsonSize,
pub(crate) count: usize,
pub(crate) failure_count: usize,
pub(crate) events_byte_size: JsonSize,
}

impl DriverResponse for KinesisResponse {
Expand Down Expand Up @@ -72,7 +73,6 @@ where
let events_byte_size = requests
.get_metadata()
.events_estimated_json_encoded_byte_size();
let count = requests.get_metadata().event_count();

let records = requests
.events
Expand All @@ -84,16 +84,10 @@ where
let stream_name = self.stream_name.clone();

Box::pin(async move {
// Returning a Result (a trait that implements Try) is not a stable feature,
// so instead we have to explicitly check for error and return.
// https://github.com/rust-lang/rust/issues/84277
if let Some(e) = client.send(records, stream_name).await {
return Err(e);
}

Ok(KinesisResponse {
count,
events_byte_size,
client.send(records, stream_name).await.map(|mut r| {
// augment the response
r.events_byte_size = events_byte_size;
r
})
})
}
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::{component::GenerateConfig, configurable_component};

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig {
self.partition_key_field.clone(),
batch_settings,
KinesisStreamClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)?;

Ok((sink, healthcheck))
Expand All @@ -173,7 +177,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig {
}
}
#[derive(Default, Clone)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -193,6 +199,15 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
RetryAction::Successful
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async fn kinesis_put_records_without_partition_key() {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
request_retry_partial: Default::default(),
};

let config = KinesisStreamsSinkConfig {
Expand Down
21 changes: 18 additions & 3 deletions src/sinks/aws_kinesis/streams/record.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use aws_sdk_kinesis::output::PutRecordsOutput;
use aws_sdk_kinesis::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;

use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
use crate::sinks::prelude::*;

use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};

#[derive(Clone)]
pub struct KinesisStreamRecord {
Expand Down Expand Up @@ -62,14 +65,26 @@ impl SendRecord for KinesisStreamClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_records()
.set_records(Some(records))
.stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordsOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_record_count().unwrap_or(0) as usize,
events_byte_size: JsonSize::new(total_size),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ components: sinks: aws_kinesis_firehose: components._aws & {

configuration: base.components.sinks.aws_kinesis_firehose.configuration & {
_aws_include: false
request_retry_partial: warnings: ["This can cause duplicate logs to be published."]
}

input: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
}
}
}
request_retry_partial: {
description: "Whether or not to retry successful requests containing partial failures."
required: false
type: bool: default: false
}
stream_name: {
description: """
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ base: components: sinks: aws_kinesis_streams: configuration: {
}
}
}
request_retry_partial: {
description: "Whether or not to retry successful requests containing partial failures."
required: false
type: bool: default: false
}
stream_name: {
description: """
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
Expand Down