Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws_cloudwatch_logs sink): allow setting type of log class to cr… #22031

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
This deprecates the `create_missing_group` field within the AWS Cloudwatch logs. Use the
`group_class` field to explicitly specify the correct log class group to use if you need to auto
create a missing group.

authors: PriceHiller
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The Cloudwatch Logs Sink now supports specifying the type of log class to create.

authors: PriceHiller
39 changes: 39 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ where
}
}

/// Defines the log class to create if missing
///
/// See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub enum LogGroupClassDef {
/// Logs that require real-time monitoring or frequently accessed logs
#[default]
Standard,
/// Log class that can be used to cost-effectively consolidate logs
InfrequentAccess,
}

impl From<LogGroupClassDef> for aws_sdk_cloudwatchlogs::types::LogGroupClass {
fn from(value: LogGroupClassDef) -> Self {
match value {
LogGroupClassDef::Standard => aws_sdk_cloudwatchlogs::types::LogGroupClass::Standard,
LogGroupClassDef::InfrequentAccess => {
aws_sdk_cloudwatchlogs::types::LogGroupClass::InfrequentAccess
}
}
}
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -115,9 +139,22 @@ pub struct CloudwatchLogsSinkConfig {
/// the first stream.
///
/// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
#[configurable(deprecated, metadata(docs::hidden))]
#[serde(default = "crate::serde::default_true")]
pub create_missing_group: bool,

/// Dynamically create a [log group][log_group] if it does not already exist with the specified
/// [group class][group_class].
///
/// This ignores `create_missing_stream` directly after creating the group and creates
/// the first stream.
///
/// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
/// [group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[configurable(derived)]
#[serde(default)]
pub group_class: Option<LogGroupClassDef>,

/// Dynamically create a [log stream][log_stream] if it does not already exist.
///
/// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
Expand Down Expand Up @@ -232,12 +269,14 @@ impl GenerateConfig for CloudwatchLogsSinkConfig {
}

fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
#[allow(deprecated)]
CloudwatchLogsSinkConfig {
encoding,
group_name: Default::default(),
stream_name: Default::default(),
region: Default::default(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn healthcheck(
if config.group_name.is_dynamic() {
info!("Skipping healthcheck log group check: `group_name` is dynamic.");
Ok(())
} else if config.create_missing_group {
} else if config.group_class.is_some() {
info!("Skipping healthcheck log group check: `group_name` will be created if missing.");
Ok(())
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -94,6 +95,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -170,6 +172,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -247,6 +250,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -303,6 +307,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch,
Expand Down Expand Up @@ -354,6 +359,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -447,6 +453,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down
20 changes: 13 additions & 7 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use aws_sdk_cloudwatchlogs::{
put_log_events::{PutLogEventsError, PutLogEventsOutput},
put_retention_policy::PutRetentionPolicyError,
},
types::InputLogEvent,
types::{InputLogEvent, LogGroupClass},
Client as CloudwatchLogsClient,
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
Expand All @@ -38,6 +38,7 @@ struct Client {
client: CloudwatchLogsClient,
stream_name: String,
group_name: String,
group_class: Option<LogGroupClass>,
headers: IndexMap<HeaderName, HeaderValue>,
retention_days: u32,
}
Expand All @@ -60,18 +61,20 @@ impl CloudwatchFuture {
headers: IndexMap<HeaderName, HeaderValue>,
stream_name: String,
group_name: String,
create_missing_group: bool,
group_class: Option<LogGroupClass>,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let create_missing_group = group_class.is_some();
let client = Client {
client,
stream_name,
group_name,
group_class,
headers,
retention_days,
};
Expand Down Expand Up @@ -288,12 +291,15 @@ impl Client {
pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let group_class = self.group_class.clone();

Box::pin(async move {
client
.create_log_group()
.log_group_name(group_name)
.send()
.await?;
let mut client_log_group_builder = client.create_log_group().log_group_name(group_name);
client_log_group_builder = match group_class {
Some(class) => client_log_group_builder.log_group_class(class),
None => client_log_group_builder,
};
client_log_group_builder.send().await?;
Ok(())
})
}
Expand Down
20 changes: 15 additions & 5 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aws_sdk_cloudwatchlogs::{
describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
put_retention_policy::PutRetentionPolicyError,
},
types::InputLogEvent,
types::{InputLogEvent, LogGroupClass},
Client as CloudwatchLogsClient,
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
Expand Down Expand Up @@ -235,7 +235,17 @@ impl CloudwatchLogsSvc {
let group_name = key.group.clone();
let stream_name = key.stream.clone();

let create_missing_group = config.create_missing_group;
let group_class = match config.group_class {
Some(class) => Some(LogGroupClass::from(class)),
None => {
// Backwards compat for `create_missing_group` field
if config.create_missing_group {
Some(LogGroupClass::Standard)
} else {
None
}
}
};
let create_missing_stream = config.create_missing_stream;

let retention = config.retention.clone();
Expand All @@ -245,7 +255,7 @@ impl CloudwatchLogsSvc {
client,
stream_name,
group_name,
create_missing_group,
group_class,
create_missing_stream,
retention,
token: None,
Expand Down Expand Up @@ -319,7 +329,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.headers.clone(),
self.stream_name.clone(),
self.group_name.clone(),
self.create_missing_group,
self.group_class.clone(),
self.create_missing_stream,
self.retention.clone(),
event_batches,
Expand All @@ -337,7 +347,7 @@ pub struct CloudwatchLogsSvc {
headers: IndexMap<HeaderName, HeaderValue>,
stream_name: String,
group_name: String,
create_missing_group: bool,
group_class: Option<LogGroupClass>,
create_missing_stream: bool,
retention: Retention,
token: Option<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,6 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
}
}
}
create_missing_group: {
description: """
Dynamically create a [log group][log_group] if it does not already exist.

This ignores `create_missing_stream` directly after creating the group and creates
the first stream.

[log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
"""
required: false
type: bool: default: true
}
create_missing_stream: {
description: """
Dynamically create a [log stream][log_stream] if it does not already exist.
Expand Down Expand Up @@ -566,6 +554,23 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
required: false
type: string: examples: ["http://127.0.0.0:5000/path/to/service"]
}
group_class: {
description: """
Dynamically create a [log group][log_group] if it does not already exist with the specified
[group class][group_class].

This ignores `create_missing_stream` directly after creating the group and creates
the first stream.

[log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
[group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
"""
required: false
type: string: enum: {
InfrequentAccess: "Log class that can be used to cost-effectively consolidate logs"
Standard: "Logs that require real-time monitoring or frequently accessed logs"
}
}
group_name: {
description: """
The [group name][group_name] of the target CloudWatch Logs stream.
Expand Down
Loading