Skip to content

Commit

Permalink
enable cloudwatch logs sink retention
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Chubatiuk committed Oct 17, 2023
1 parent 0568d7a commit 21d9a58
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 8 deletions.
44 changes: 44 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::schema;
use vrl::value::Kind;
use serde::{
de, Deserialize, Deserializer,
};

use crate::{
aws::{
Expand Down Expand Up @@ -48,6 +51,43 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
}
}

#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for AWS Cloudwatch Log Group
pub struct Retention {
#[configurable(derived)]
pub enabled: bool,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "retention_days",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub days: i32,
}

fn retention_days<'de, D>(deserializer: D) -> Result<i32, D::Error>
where D: Deserializer<'de>,
{

let days: i32 = Deserialize::deserialize(deserializer)?;
const ALLOWED_VALUES: &[i32] = &[
1, 3, 5, 7, 14, 30, 60,
90, 120, 150, 180, 365,
400, 545, 731, 1096, 1827,
2192, 2557, 2922, 3288, 3653,
];
if ALLOWED_VALUES.contains(&days) {
Ok(days)
} else {
let msg = format!("one of allowed values: {:?}", ALLOWED_VALUES).to_owned();
let expected: &str = &msg[..];
Err(de::Error::invalid_value(de::Unexpected::Signed(days.into()), &expected))
}
}


/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -96,6 +136,9 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(default = "crate::serde::default_true")]
pub create_missing_stream: bool,

#[configurable(derived)]
pub retention: Retention,

#[configurable(derived)]
pub encoding: EncodingConfig,

Expand Down Expand Up @@ -227,6 +270,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
region: Default::default(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -92,6 +93,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -167,6 +169,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -243,6 +246,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -298,6 +302,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch,
request: Default::default(),
Expand Down Expand Up @@ -348,6 +353,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -440,6 +446,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
91 changes: 85 additions & 6 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use std::{

use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind,
DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError,
DeleteRetentionPolicyError, DescribeLogStreamsError, DescribeLogStreamsErrorKind,
PutLogEventsError, PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::operation::PutLogEvents;

use crate::sinks::aws_cloudwatch_logs::config::Retention;
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput};
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -27,6 +29,7 @@ pub struct CloudwatchFuture {
state: State,
create_missing_group: bool,
create_missing_stream: bool,
retention_enabled: bool,
events: Vec<Vec<InputLogEvent>>,
token_tx: Option<oneshot::Sender<Option<String>>>,
}
Expand All @@ -41,6 +44,7 @@ struct Client {
stream_name: String,
group_name: String,
headers: IndexMap<String, String>,
retention_days: i32,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E>>>;
Expand All @@ -50,6 +54,8 @@ enum State {
CreateStream(ClientResult<(), CreateLogStreamError>),
DescribeStream(ClientResult<DescribeLogStreamsOutput, DescribeLogStreamsError>),
Put(ClientResult<PutLogEventsOutput, PutLogEventsError>),
PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>),
DeleteRetentionPolicy(ClientResult<(), DeleteRetentionPolicyError>),
}

impl CloudwatchFuture {
Expand All @@ -63,16 +69,19 @@ impl CloudwatchFuture {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let client = Client {
client,
smithy_client,
stream_name,
group_name,
headers,
retention_days,
};

let state = if let Some(token) = token {
Expand All @@ -81,13 +90,16 @@ impl CloudwatchFuture {
State::DescribeStream(client.describe_stream())
};

let retention_enabled = retention.enabled;

Self {
client,
events,
state,
token_tx: Some(token_tx),
create_missing_group,
create_missing_stream,
retention_enabled,
}
}
}
Expand All @@ -111,7 +123,7 @@ impl Future for CloudwatchFuture {

self.state =
State::CreateGroup(self.client.create_log_group());
continue;
continue
}
}
}
Expand Down Expand Up @@ -163,10 +175,18 @@ impl Future for CloudwatchFuture {

info!(message = "Group created.", name = %self.client.group_name);

// self does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
self.state = State::CreateStream(self.client.create_log_stream());
if self.retention_enabled {
info!("Retention policy enabled; updating retention days.");
self.state = State::PutRetentionPolicy(
self.client.put_retention_policy(),
);
continue;
} else {
self.state = State::DeleteRetentionPolicy(
self.client.delete_retention_policy(),
);
continue;
}
}

State::CreateStream(fut) => {
Expand Down Expand Up @@ -212,6 +232,36 @@ impl Future for CloudwatchFuture {
return Poll::Ready(Ok(()));
}
}

State::PutRetentionPolicy(fut) => {
match ready!(fut.poll_unpin(cx)) {
Ok(__) => {}
Err(error) => {
return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error)))
}
}

info!(message = "Retention policy updated for stream.", name = %self.client.stream_name);
// self does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
self.state = State::CreateStream(self.client.create_log_stream());
}

State::DeleteRetentionPolicy(fut) => {
match ready!(fut.poll_unpin(cx)) {
Ok(__) => {}
Err(error) => {
return Poll::Ready(Err(CloudwatchError::DeleteRetentionPolicy(error)))
}
}

info!(message = "Retention policy deleted for stream.", name = %self.client.stream_name);
// self does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
self.state = State::CreateStream(self.client.create_log_stream());
}
}
}
}
Expand Down Expand Up @@ -303,4 +353,33 @@ impl Client {
Ok(())
})
}

pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let retention_days = self.retention_days.clone();
Box::pin(async move {
client
.put_retention_policy()
.log_group_name(group_name)
.retention_in_days(retention_days)
.send()
.await?;
Ok(())
})
}

pub fn delete_retention_policy(&self) -> ClientResult<(), DeleteRetentionPolicyError> {
let client = self.client.clone();
let group_name = self.group_name.clone();

Box::pin(async move {
client
.delete_retention_policy()
.log_group_name(group_name)
.send()
.await?;
Ok(())
})
}
}
18 changes: 16 additions & 2 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::{
};

use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogStreamError, DescribeLogStreamsError, PutLogEventsError,
CreateLogGroupError, CreateLogStreamError, DeleteRetentionPolicyError, DescribeLogStreamsError,
PutLogEventsError, PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -31,7 +32,7 @@ use vector_core::stream::DriverResponse;
use crate::sinks::{
aws_cloudwatch_logs::{
config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest, CloudwatchKey,
sink::BatchCloudwatchRequest, CloudwatchKey, config::Retention,
},
util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings},
};
Expand Down Expand Up @@ -61,6 +62,8 @@ pub enum CloudwatchError {
Describe(SdkError<DescribeLogStreamsError>),
CreateStream(SdkError<CreateLogStreamError>),
CreateGroup(SdkError<CreateLogGroupError>),
PutRetentionPolicy(SdkError<PutRetentionPolicyError>),
DeleteRetentionPolicy(SdkError<DeleteRetentionPolicyError>),
NoStreamsFound,
}

Expand All @@ -76,6 +79,12 @@ impl fmt::Display for CloudwatchError {
write!(f, "CloudwatchError::CreateGroup: {}", error)
}
CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
CloudwatchError::PutRetentionPolicy(error) => {
write!(f, "CloudwatchError::PutRetentionPolicy: {}", error)
}
CloudwatchError::DeleteRetentionPolicy(error) => {
write!(f, "CloudwatchError::DeleteRetentionPolicy: {}", error)
}
}
}
}
Expand Down Expand Up @@ -216,6 +225,8 @@ impl CloudwatchLogsSvc {
let create_missing_group = config.create_missing_group;
let create_missing_stream = config.create_missing_stream;

let retention = config.retention.clone();

CloudwatchLogsSvc {
headers: config.request.headers,
client,
Expand All @@ -224,6 +235,7 @@ impl CloudwatchLogsSvc {
group_name,
create_missing_group,
create_missing_stream,
retention,
token: None,
token_rx: None,
}
Expand Down Expand Up @@ -305,6 +317,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.group_name.clone(),
self.create_missing_group,
self.create_missing_stream,
self.retention.clone(),
event_batches,
self.token.take(),
tx,
Expand All @@ -323,6 +336,7 @@ pub struct CloudwatchLogsSvc {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
token: Option<String>,
token_rx: Option<oneshot::Receiver<Option<String>>>,
}
Expand Down

0 comments on commit 21d9a58

Please sign in to comment.