Skip to content

Commit

Permalink
feat(aws_cloudwatch_logs sink): add configurable log retention (vecto…
Browse files Browse the repository at this point in the history
…rdotdev#18865)

* enable cloudwatch logs sink retention

* MR comments

Co-authored-by: Stephen Wakely <stephen@lisp.space>

* make fmt

* fixed warnings

* generated docs

* pr comments

* pr comments

* generated docs

---------

Co-authored-by: Stephen Wakely <stephen@lisp.space>
  • Loading branch information
2 people authored and AndrooTheChen committed Sep 23, 2024
1 parent d18f744 commit b3bcaca
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 8 deletions.
44 changes: 44 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_smithy_types::retry::RetryConfig;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use tower::ServiceBuilder;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
Expand Down Expand Up @@ -48,6 +49,44 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
}
}

#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for AWS CloudWatch Log Group
pub struct Retention {
/// Whether or not to set a retention policy when creating a new Log Group.
#[serde(default)]
pub enabled: bool,

/// If retention is enabled, the number of days to retain logs for.
#[serde(
default,
deserialize_with = "retention_days",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub days: u32,
}

fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
{
let days: u32 = Deserialize::deserialize(deserializer)?;
const ALLOWED_VALUES: &[u32] = &[
1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
2922, 3288, 3653,
];
if ALLOWED_VALUES.contains(&days) {
Ok(days)
} else {
let msg = format!("one of allowed values: {:?}", ALLOWED_VALUES).to_owned();
let expected: &str = &msg[..];
Err(de::Error::invalid_value(
de::Unexpected::Signed(days.into()),
&expected,
))
}
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -96,6 +135,10 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(default = "crate::serde::default_true")]
pub create_missing_stream: bool,

#[configurable(derived)]
#[serde(default)]
pub retention: Retention,

#[configurable(derived)]
pub encoding: EncodingConfig,

Expand Down Expand Up @@ -227,6 +270,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
region: Default::default(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -93,6 +94,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -168,6 +170,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -244,6 +247,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -299,6 +303,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch,
request: Default::default(),
Expand Down Expand Up @@ -349,6 +354,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -441,6 +447,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
51 changes: 48 additions & 3 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use std::{
use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind,
DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError,
PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::operation::PutLogEvents;

use crate::sinks::aws_cloudwatch_logs::config::Retention;
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput};
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -27,6 +29,7 @@ pub struct CloudwatchFuture {
state: State,
create_missing_group: bool,
create_missing_stream: bool,
retention_enabled: bool,
events: Vec<Vec<InputLogEvent>>,
token_tx: Option<oneshot::Sender<Option<String>>>,
}
Expand All @@ -41,6 +44,7 @@ struct Client {
stream_name: String,
group_name: String,
headers: IndexMap<String, String>,
retention_days: u32,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E>>>;
Expand All @@ -50,6 +54,7 @@ enum State {
CreateStream(ClientResult<(), CreateLogStreamError>),
DescribeStream(ClientResult<DescribeLogStreamsOutput, DescribeLogStreamsError>),
Put(ClientResult<PutLogEventsOutput, PutLogEventsError>),
PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>),
}

impl CloudwatchFuture {
Expand All @@ -63,16 +68,19 @@ impl CloudwatchFuture {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let client = Client {
client,
smithy_client,
stream_name,
group_name,
headers,
retention_days,
};

let state = if let Some(token) = token {
Expand All @@ -81,13 +89,16 @@ impl CloudwatchFuture {
State::DescribeStream(client.describe_stream())
};

let retention_enabled = retention.enabled;

Self {
client,
events,
state,
token_tx: Some(token_tx),
create_missing_group,
create_missing_stream,
retention_enabled,
}
}
}
Expand Down Expand Up @@ -115,15 +126,17 @@ impl Future for CloudwatchFuture {
}
}
}
return Poll::Ready(Err(CloudwatchError::Describe(err)));
return Poll::Ready(Err(CloudwatchError::DescribeLogStreams(err)));
}
};

let stream_name = &self.client.stream_name;

if let Some(stream) = response
.log_streams
.ok_or(CloudwatchError::NoStreamsFound)?
.into_iter()
.next()
.find(|log_stream| log_stream.log_stream_name == Some(stream_name.clone()))
{
debug!(message = "Stream found.", stream = ?stream.log_stream_name);

Expand Down Expand Up @@ -163,6 +176,11 @@ impl Future for CloudwatchFuture {

info!(message = "Group created.", name = %self.client.group_name);

if self.retention_enabled {
self.state = State::PutRetentionPolicy(self.client.put_retention_policy());
continue;
}

// self does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
Expand Down Expand Up @@ -212,6 +230,19 @@ impl Future for CloudwatchFuture {
return Poll::Ready(Ok(()));
}
}

State::PutRetentionPolicy(fut) => {
match ready!(fut.poll_unpin(cx)) {
Ok(_) => {}
Err(error) => {
return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error)))
}
}

info!(message = "Retention policy updated for stream.", name = %self.client.stream_name);

self.state = State::CreateStream(self.client.create_log_stream());
}
}
}
}
Expand Down Expand Up @@ -268,7 +299,6 @@ impl Client {
Box::pin(async move {
client
.describe_log_streams()
.limit(1)
.log_group_name(group_name)
.log_stream_name_prefix(stream_name)
.send()
Expand Down Expand Up @@ -303,4 +333,19 @@ impl Client {
Ok(())
})
}

pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let retention_days = self.retention_days;
Box::pin(async move {
client
.put_retention_policy()
.log_group_name(group_name)
.retention_in_days(retention_days.try_into().unwrap())
.send()
.await?;
Ok(())
})
}
}
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
}
is_retriable_error(err)
}
CloudwatchError::Describe(err) => {
CloudwatchError::DescribeLogStreams(err) => {
if let SdkError::ServiceError(inner) = err {
let err = inner.err();
if let DescribeLogStreamsErrorKind::ServiceUnavailableException(_) = err.kind {
Expand Down
20 changes: 16 additions & 4 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogStreamError, DescribeLogStreamsError, PutLogEventsError,
PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -30,7 +31,7 @@ use vector_lib::{

use crate::sinks::{
aws_cloudwatch_logs::{
config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic,
config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest, CloudwatchKey,
},
util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings},
Expand Down Expand Up @@ -58,24 +59,30 @@ pub type SmithyClient = std::sync::Arc<
#[derive(Debug)]
pub enum CloudwatchError {
Put(SdkError<PutLogEventsError>),
Describe(SdkError<DescribeLogStreamsError>),
DescribeLogStreams(SdkError<DescribeLogStreamsError>),
CreateStream(SdkError<CreateLogStreamError>),
CreateGroup(SdkError<CreateLogGroupError>),
PutRetentionPolicy(SdkError<PutRetentionPolicyError>),
NoStreamsFound,
}

impl fmt::Display for CloudwatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {}", error),
CloudwatchError::Describe(error) => write!(f, "CloudwatchError::Describe: {}", error),
CloudwatchError::DescribeLogStreams(error) => {
write!(f, "CloudwatchError::DescribeLogStreams: {}", error)
}
CloudwatchError::CreateStream(error) => {
write!(f, "CloudwatchError::CreateStream: {}", error)
}
CloudwatchError::CreateGroup(error) => {
write!(f, "CloudwatchError::CreateGroup: {}", error)
}
CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
CloudwatchError::PutRetentionPolicy(error) => {
write!(f, "CloudwatchError::PutRetentionPolicy: {}", error)
}
}
}
}
Expand All @@ -90,7 +97,7 @@ impl From<SdkError<PutLogEventsError>> for CloudwatchError {

impl From<SdkError<DescribeLogStreamsError>> for CloudwatchError {
fn from(error: SdkError<DescribeLogStreamsError>) -> Self {
CloudwatchError::Describe(error)
CloudwatchError::DescribeLogStreams(error)
}
}

Expand Down Expand Up @@ -216,6 +223,8 @@ impl CloudwatchLogsSvc {
let create_missing_group = config.create_missing_group;
let create_missing_stream = config.create_missing_stream;

let retention = config.retention.clone();

CloudwatchLogsSvc {
headers: config.request.headers,
client,
Expand All @@ -224,6 +233,7 @@ impl CloudwatchLogsSvc {
group_name,
create_missing_group,
create_missing_stream,
retention,
token: None,
token_rx: None,
}
Expand Down Expand Up @@ -305,6 +315,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.group_name.clone(),
self.create_missing_group,
self.create_missing_stream,
self.retention.clone(),
event_batches,
self.token.take(),
tx,
Expand All @@ -323,6 +334,7 @@ pub struct CloudwatchLogsSvc {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
token: Option<String>,
token_rx: Option<oneshot::Receiver<Option<String>>>,
}
Expand Down
Loading

0 comments on commit b3bcaca

Please sign in to comment.