diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index e3fb55de82726..8b8b94662376c 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -1,6 +1,7 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; use aws_smithy_types::retry::RetryConfig; use futures::FutureExt; +use serde::{de, Deserialize, Deserializer}; use tower::ServiceBuilder; use vector_lib::codecs::JsonSerializerConfig; use vector_lib::configurable::configurable_component; @@ -48,6 +49,44 @@ impl ClientBuilder for CloudwatchLogsClientBuilder { } } +#[configurable_component] +#[derive(Clone, Debug, Default)] +/// Retention policy configuration for AWS CloudWatch Log Group +pub struct Retention { + /// Whether or not to set a retention policy when creating a new Log Group. + #[serde(default)] + pub enabled: bool, + + /// If retention is enabled, the number of days to retain logs for. + #[serde( + default, + deserialize_with = "retention_days", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub days: u32, +} + +fn retention_days<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let days: u32 = Deserialize::deserialize(deserializer)?; + const ALLOWED_VALUES: &[u32] = &[ + 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, + 2922, 3288, 3653, + ]; + if ALLOWED_VALUES.contains(&days) { + Ok(days) + } else { + let msg = format!("one of allowed values: {:?}", ALLOWED_VALUES).to_owned(); + let expected: &str = &msg[..]; + Err(de::Error::invalid_value( + de::Unexpected::Signed(days.into()), + &expected, + )) + } +} + /// Configuration for the `aws_cloudwatch_logs` sink. #[configurable_component(sink( "aws_cloudwatch_logs", @@ -96,6 +135,10 @@ pub struct CloudwatchLogsSinkConfig { #[serde(default = "crate::serde::default_true")] pub create_missing_stream: bool, + #[configurable(derived)] + #[serde(default)] + pub retention: Retention, + #[configurable(derived)] pub encoding: EncodingConfig, @@ -227,6 +270,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { region: Default::default(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 3091cab25daea..695fddf6639e1 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -43,6 +43,7 @@ async fn cloudwatch_insert_log_event() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), @@ -93,6 +94,7 @@ async fn cloudwatch_insert_log_events_sorted() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), @@ -168,6 +170,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), @@ -244,6 +247,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), @@ -299,6 +303,7 @@ async fn cloudwatch_insert_log_event_batched() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch, request: Default::default(), @@ -349,6 +354,7 @@ async fn cloudwatch_insert_log_event_partitioned() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), @@ -441,6 +447,7 @@ async fn cloudwatch_healthcheck() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + retention: Default::default(), compression: Default::default(), batch: Default::default(), request: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/request.rs b/src/sinks/aws_cloudwatch_logs/request.rs index bd0942e89789a..635d87d2784ab 100644 --- a/src/sinks/aws_cloudwatch_logs/request.rs +++ b/src/sinks/aws_cloudwatch_logs/request.rs @@ -7,9 +7,11 @@ use std::{ use aws_sdk_cloudwatchlogs::error::{ CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind, DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError, + PutRetentionPolicyError, }; use aws_sdk_cloudwatchlogs::operation::PutLogEvents; +use crate::sinks::aws_cloudwatch_logs::config::Retention; use aws_sdk_cloudwatchlogs::model::InputLogEvent; use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput}; use aws_sdk_cloudwatchlogs::types::SdkError; @@ -27,6 +29,7 @@ pub struct CloudwatchFuture { state: State, create_missing_group: bool, create_missing_stream: bool, + retention_enabled: bool, events: Vec>, token_tx: Option>>, } @@ -41,6 +44,7 @@ struct Client { stream_name: String, group_name: String, headers: IndexMap, + retention_days: u32, } type ClientResult = BoxFuture<'static, Result>>; @@ -50,6 +54,7 @@ enum State { CreateStream(ClientResult<(), CreateLogStreamError>), DescribeStream(ClientResult), Put(ClientResult), + PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>), } impl CloudwatchFuture { @@ -63,16 +68,19 @@ impl CloudwatchFuture { group_name: String, create_missing_group: bool, create_missing_stream: bool, + retention: Retention, mut events: Vec>, token: Option, token_tx: oneshot::Sender>, ) -> Self { + let retention_days = retention.days; let client = Client { client, smithy_client, stream_name, group_name, headers, + retention_days, }; let state = if let Some(token) = token { @@ -81,6 +89,8 @@ impl CloudwatchFuture { State::DescribeStream(client.describe_stream()) }; + let retention_enabled = retention.enabled; + Self { client, events, @@ -88,6 +98,7 @@ impl CloudwatchFuture { token_tx: Some(token_tx), create_missing_group, create_missing_stream, + retention_enabled, } } } @@ -115,15 +126,17 @@ impl Future for CloudwatchFuture { } } } - return Poll::Ready(Err(CloudwatchError::Describe(err))); + return Poll::Ready(Err(CloudwatchError::DescribeLogStreams(err))); } }; + let stream_name = &self.client.stream_name; + if let Some(stream) = response .log_streams .ok_or(CloudwatchError::NoStreamsFound)? .into_iter() - .next() + .find(|log_stream| log_stream.log_stream_name == Some(stream_name.clone())) { debug!(message = "Stream found.", stream = ?stream.log_stream_name); @@ -163,6 +176,11 @@ impl Future for CloudwatchFuture { info!(message = "Group created.", name = %self.client.group_name); + if self.retention_enabled { + self.state = State::PutRetentionPolicy(self.client.put_retention_policy()); + continue; + } + // self does not abide by `create_missing_stream` since a group // never has any streams and thus we need to create one if a group // is created no matter what. @@ -212,6 +230,19 @@ impl Future for CloudwatchFuture { return Poll::Ready(Ok(())); } } + + State::PutRetentionPolicy(fut) => { + match ready!(fut.poll_unpin(cx)) { + Ok(_) => {} + Err(error) => { + return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error))) + } + } + + info!(message = "Retention policy updated for stream.", name = %self.client.stream_name); + + self.state = State::CreateStream(self.client.create_log_stream()); + } } } } @@ -268,7 +299,6 @@ impl Client { Box::pin(async move { client .describe_log_streams() - .limit(1) .log_group_name(group_name) .log_stream_name_prefix(stream_name) .send() @@ -303,4 +333,19 @@ impl Client { Ok(()) }) } + + pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> { + let client = self.client.clone(); + let group_name = self.group_name.clone(); + let retention_days = self.retention_days; + Box::pin(async move { + client + .put_retention_policy() + .log_group_name(group_name) + .retention_in_days(retention_days.try_into().unwrap()) + .send() + .await?; + Ok(()) + }) + } } diff --git a/src/sinks/aws_cloudwatch_logs/retry.rs b/src/sinks/aws_cloudwatch_logs/retry.rs index c089f532f5dd0..11c9a81baca1c 100644 --- a/src/sinks/aws_cloudwatch_logs/retry.rs +++ b/src/sinks/aws_cloudwatch_logs/retry.rs @@ -45,7 +45,7 @@ impl RetryLogic for CloudwatchRetryLogic { } is_retriable_error(err) } - CloudwatchError::Describe(err) => { + CloudwatchError::DescribeLogStreams(err) => { if let SdkError::ServiceError(inner) = err { let err = inner.err(); if let DescribeLogStreamsErrorKind::ServiceUnavailableException(_) = err.kind { diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index d3d6fcb4dc44b..b3703cf9166a8 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -6,6 +6,7 @@ use std::{ use aws_sdk_cloudwatchlogs::error::{ CreateLogGroupError, CreateLogStreamError, DescribeLogStreamsError, PutLogEventsError, + PutRetentionPolicyError, }; use aws_sdk_cloudwatchlogs::model::InputLogEvent; use aws_sdk_cloudwatchlogs::types::SdkError; @@ -30,7 +31,7 @@ use vector_lib::{ use crate::sinks::{ aws_cloudwatch_logs::{ - config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic, + config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic, sink::BatchCloudwatchRequest, CloudwatchKey, }, util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings}, @@ -58,9 +59,10 @@ pub type SmithyClient = std::sync::Arc< #[derive(Debug)] pub enum CloudwatchError { Put(SdkError), - Describe(SdkError), + DescribeLogStreams(SdkError), CreateStream(SdkError), CreateGroup(SdkError), + PutRetentionPolicy(SdkError), NoStreamsFound, } @@ -68,7 +70,9 @@ impl fmt::Display for CloudwatchError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {}", error), - CloudwatchError::Describe(error) => write!(f, "CloudwatchError::Describe: {}", error), + CloudwatchError::DescribeLogStreams(error) => { + write!(f, "CloudwatchError::DescribeLogStreams: {}", error) + } CloudwatchError::CreateStream(error) => { write!(f, "CloudwatchError::CreateStream: {}", error) } @@ -76,6 +80,9 @@ impl fmt::Display for CloudwatchError { write!(f, "CloudwatchError::CreateGroup: {}", error) } CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"), + CloudwatchError::PutRetentionPolicy(error) => { + write!(f, "CloudwatchError::PutRetentionPolicy: {}", error) + } } } } @@ -90,7 +97,7 @@ impl From> for CloudwatchError { impl From> for CloudwatchError { fn from(error: SdkError) -> Self { - CloudwatchError::Describe(error) + CloudwatchError::DescribeLogStreams(error) } } @@ -216,6 +223,8 @@ impl CloudwatchLogsSvc { let create_missing_group = config.create_missing_group; let create_missing_stream = config.create_missing_stream; + let retention = config.retention.clone(); + CloudwatchLogsSvc { headers: config.request.headers, client, @@ -224,6 +233,7 @@ impl CloudwatchLogsSvc { group_name, create_missing_group, create_missing_stream, + retention, token: None, token_rx: None, } @@ -305,6 +315,7 @@ impl Service> for CloudwatchLogsSvc { self.group_name.clone(), self.create_missing_group, self.create_missing_stream, + self.retention.clone(), event_batches, self.token.take(), tx, @@ -323,6 +334,7 @@ pub struct CloudwatchLogsSvc { group_name: String, create_missing_group: bool, create_missing_stream: bool, + retention: Retention, token: Option, token_rx: Option>>, } diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index 162b8dbe4853f..d048ad0d0c98b 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -648,6 +648,22 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { } } } + retention: { + description: "Retention policy configuration for AWS CloudWatch Log Group" + required: false + type: object: options: { + days: { + description: "If retention is enabled, the number of days to retain logs for." + required: false + type: uint: default: 0 + } + enabled: { + description: "Whether or not to set a retention policy when creating a new Log Group." + required: false + type: bool: default: false + } + } + } stream_name: { description: """ The [stream name][stream_name] of the target CloudWatch Logs stream.