Skip to content

Commit

Permalink
enable cloudwatch logs sink retention
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewChubatiuk committed Oct 20, 2023
1 parent 0568d7a commit f637e89
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 12 deletions.
42 changes: 42 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_smithy_types::retry::RetryConfig;
use codecs::JsonSerializerConfig;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::schema;
Expand Down Expand Up @@ -48,6 +49,43 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
}
}

#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for an AWS CloudWatch log group.
pub struct Retention {
/// Whether a retention policy is applied to a log group after this sink creates it.
// NOTE(review): this field has no `#[serde(default)]`, so it looks like it must be
// given explicitly whenever a `retention` table is present — confirm this is intended.
#[configurable(derived)]
pub enabled: bool,

/// Number of days to retain log events, sent as the retention period of the log group.
// Validated by `retention_days` only when the key is present in the input; when it is
// omitted, serde's `default` yields `0` without running the validator.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "retention_days",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub days: i32,
}

/// Deserializes the `days` field of `Retention`, accepting only a fixed set of
/// retention periods.
///
/// The accepted values are the ones listed in `ALLOWED_VALUES` below (they mirror the
/// retention periods the CloudWatch Logs `PutRetentionPolicy` API documents as valid).
///
/// # Errors
///
/// Propagates the deserializer's error when the input is not an `i32`, and returns an
/// `invalid_value` error listing the allowed values when the number is not one of them.
fn retention_days<'de, D>(deserializer: D) -> Result<i32, D::Error>
where
    D: Deserializer<'de>,
{
    const ALLOWED_VALUES: &[i32] = &[
        1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
        2922, 3288, 3653,
    ];

    let days = i32::deserialize(deserializer)?;
    if ALLOWED_VALUES.contains(&days) {
        return Ok(days);
    }

    // `&str` implements `de::Expected`, so the formatted message can be borrowed
    // directly; no extra owned copy of the string is needed.
    let expected = format!("one of allowed values: {:?}", ALLOWED_VALUES);
    Err(de::Error::invalid_value(
        de::Unexpected::Signed(days.into()),
        &expected.as_str(),
    ))
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -96,6 +134,9 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(default = "crate::serde::default_true")]
pub create_missing_stream: bool,

#[configurable(derived)]
pub retention: Retention,

#[configurable(derived)]
pub encoding: EncodingConfig,

Expand Down Expand Up @@ -227,6 +268,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
region: Default::default(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -92,6 +93,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -167,6 +169,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -243,6 +246,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -298,6 +302,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch,
request: Default::default(),
Expand Down Expand Up @@ -348,6 +353,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -440,6 +446,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
104 changes: 99 additions & 5 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ use std::{

use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind,
DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError,
DescribeLogGroupsError, DescribeLogStreamsError, DescribeLogStreamsErrorKind,
PutLogEventsError, PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::operation::PutLogEvents;

use crate::sinks::aws_cloudwatch_logs::config::Retention;
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput};
use aws_sdk_cloudwatchlogs::output::{
DescribeLogGroupsOutput, DescribeLogStreamsOutput, PutLogEventsOutput,
};
use aws_sdk_cloudwatchlogs::types::SdkError;
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use futures::{future::BoxFuture, FutureExt};
Expand All @@ -27,6 +31,7 @@ pub struct CloudwatchFuture {
state: State,
create_missing_group: bool,
create_missing_stream: bool,
retention_enabled: bool,
events: Vec<Vec<InputLogEvent>>,
token_tx: Option<oneshot::Sender<Option<String>>>,
}
Expand All @@ -41,15 +46,18 @@ struct Client {
stream_name: String,
group_name: String,
headers: IndexMap<String, String>,
retention_days: i32,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E>>>;

/// The API call that `CloudwatchFuture` is currently waiting on.
///
/// Each variant owns the boxed, in-flight future for one CloudWatch Logs request;
/// `CloudwatchFuture::poll` matches on this to decide what to poll next.
enum State {
/// Waiting for the log group to be created.
CreateGroup(ClientResult<(), CreateLogGroupError>),
/// Waiting for the log stream to be created.
CreateStream(ClientResult<(), CreateLogStreamError>),
/// Waiting for the log group lookup; this is the initial state when no sequence token is supplied.
DescribeGroup(ClientResult<DescribeLogGroupsOutput, DescribeLogGroupsError>),
/// Waiting for the log stream lookup.
DescribeStream(ClientResult<DescribeLogStreamsOutput, DescribeLogStreamsError>),
/// Waiting for a batch of log events to be written.
Put(ClientResult<PutLogEventsOutput, PutLogEventsError>),
/// Waiting for the retention policy to be applied to a newly created log group.
PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>),
}

impl CloudwatchFuture {
Expand All @@ -63,31 +71,37 @@ impl CloudwatchFuture {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let client = Client {
client,
smithy_client,
stream_name,
group_name,
headers,
retention_days,
};

let state = if let Some(token) = token {
State::Put(client.put_logs(Some(token), events.pop().expect("No Events to send")))
} else {
State::DescribeStream(client.describe_stream())
State::DescribeGroup(client.describe_group())
};

let retention_enabled = retention.enabled;

Self {
client,
events,
state,
token_tx: Some(token_tx),
create_missing_group,
create_missing_stream,
retention_enabled,
}
}
}
Expand All @@ -98,6 +112,37 @@ impl Future for CloudwatchFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match &mut self.state {
State::DescribeGroup(fut) => {
let response = match ready!(fut.poll_unpin(cx)) {
Ok(response) => response,
Err(error) => {
return Poll::Ready(Err(CloudwatchError::DescribeLogGroups(error)))
}
};

let group_name = &self.client.group_name;

if let Some(group) = response
.log_groups
.ok_or(CloudwatchError::NoGroupsFound)?
.into_iter()
.filter(|log_group| log_group.log_group_name == Some(group_name.clone()))
.next()
{
debug!(message = "Group found.", group = ?group.log_group_name);
self.state = State::DescribeStream(self.client.describe_stream());
continue;
}

if self.create_missing_group {
info!("Log group provided does not exist; creating a new one.");
self.state = State::CreateGroup(self.client.create_log_group());
continue;
} else {
return Poll::Ready(Err(CloudwatchError::NoGroupsFound));
}
}

State::DescribeStream(fut) => {
let response = match ready!(fut.poll_unpin(cx)) {
Ok(response) => response,
Expand All @@ -115,14 +160,19 @@ impl Future for CloudwatchFuture {
}
}
}
return Poll::Ready(Err(CloudwatchError::Describe(err)));
return Poll::Ready(Err(CloudwatchError::DescribeLogStreams(err)));
}
};

let stream_name = &self.client.stream_name;

if let Some(stream) = response
.log_streams
.ok_or(CloudwatchError::NoStreamsFound)?
.into_iter()
.filter(|log_stream| {
log_stream.log_stream_name == Some(stream_name.clone())
})
.next()
{
debug!(message = "Stream found.", stream = ?stream.log_stream_name);
Expand Down Expand Up @@ -163,6 +213,11 @@ impl Future for CloudwatchFuture {

info!(message = "Group created.", name = %self.client.group_name);

if self.retention_enabled {
self.state = State::PutRetentionPolicy(self.client.put_retention_policy());
continue;
}

// This step does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
Expand Down Expand Up @@ -212,6 +267,19 @@ impl Future for CloudwatchFuture {
return Poll::Ready(Ok(()));
}
}

State::PutRetentionPolicy(fut) => {
match ready!(fut.poll_unpin(cx)) {
Ok(__) => {}
Err(error) => {
return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error)))
}
}

info!(message = "Retention policy updated for stream.", name = %self.client.stream_name);

self.state = State::DescribeStream(self.client.describe_stream());
}
}
}
}
Expand Down Expand Up @@ -268,14 +336,25 @@ impl Client {
Box::pin(async move {
client
.describe_log_streams()
.limit(1)
.log_group_name(group_name)
.log_stream_name_prefix(stream_name)
.send()
.await
})
}

/// Builds a future that lists the log groups whose names start with this client's
/// group name.
///
/// The match is by prefix, so the caller still has to pick the exact group out of the
/// response. Nothing is sent until the returned future is polled.
pub fn describe_group(&self) -> ClientResult<DescribeLogGroupsOutput, DescribeLogGroupsError> {
    // Owned copies so the future is `'static` and does not borrow `self`.
    let (cloudwatch, prefix) = (self.client.clone(), self.group_name.clone());
    let lookup = async move {
        cloudwatch
            .describe_log_groups()
            .log_group_name_prefix(prefix)
            .send()
            .await
    };
    Box::pin(lookup)
}

pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
Expand Down Expand Up @@ -303,4 +382,19 @@ impl Client {
Ok(())
})
}

/// Builds a future that sets the retention period of this client's log group to
/// `retention_days`.
///
/// Nothing is sent until the returned future is polled. On success the API output is
/// discarded and `()` is returned; failures surface as `SdkError<PutRetentionPolicyError>`.
pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> {
    let client = self.client.clone();
    let group_name = self.group_name.clone();
    // `i32` is `Copy`; a plain read is enough to move the value into the future.
    let retention_days = self.retention_days;
    Box::pin(async move {
        client
            .put_retention_policy()
            .log_group_name(group_name)
            .retention_in_days(retention_days)
            .send()
            .await?;
        Ok(())
    })
}
}
14 changes: 12 additions & 2 deletions src/sinks/aws_cloudwatch_logs/retry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::marker::PhantomData;

use aws_sdk_cloudwatchlogs::error::{
CreateLogStreamErrorKind, DescribeLogStreamsErrorKind, PutLogEventsErrorKind,
CreateLogStreamErrorKind, DescribeLogGroupsErrorKind, DescribeLogStreamsErrorKind,
PutLogEventsErrorKind,
};
use aws_sdk_cloudwatchlogs::types::SdkError;

Expand Down Expand Up @@ -45,7 +46,16 @@ impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
}
is_retriable_error(err)
}
CloudwatchError::Describe(err) => {
CloudwatchError::DescribeLogGroups(err) => {
if let SdkError::ServiceError(inner) = err {
let err = inner.err();
if let DescribeLogGroupsErrorKind::ServiceUnavailableException(_) = err.kind {
return true;
}
}
is_retriable_error(err)
}
CloudwatchError::DescribeLogStreams(err) => {
if let SdkError::ServiceError(inner) = err {
let err = inner.err();
if let DescribeLogStreamsErrorKind::ServiceUnavailableException(_) = err.kind {
Expand Down
Loading

0 comments on commit f637e89

Please sign in to comment.