Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(aws config): Make IMDS client configurable for AWS authentication #15518

Merged
merged 8 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ aws-core = [
"dep:aws-smithy-client",
"dep:aws-smithy-http",
"dep:aws-smithy-http-tower",
"dep:aws-smithy-types"
"dep:aws-smithy-types",
"dep:serde_with",
]

# Anything that requires Protocol Buffers.
Expand Down
139 changes: 132 additions & 7 deletions src/aws/auth.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,52 @@
use std::time::Duration;

use aws_config::{
default_provider::credentials::DefaultCredentialsChain, sts::AssumeRoleProviderBuilder,
default_provider::credentials::DefaultCredentialsChain, imds, sts::AssumeRoleProviderBuilder,
};
use aws_types::{credentials::SharedCredentialsProvider, region::Region, Credentials};
use serde_with::serde_as;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;

// matches default load timeout from the SDK as of 0.10.1, but lets us confidently document the
// default rather than relying on the SDK default to not change
const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(5);

/// IMDS Client Configuration for authenticating with AWS.
#[serde_as]
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ImdsAuthentication {
/// Number of IMDS retries for fetching tokens and metadata.
#[serde(default = "default_max_attempts")]
#[derivative(Default(value = "default_max_attempts()"))]
max_attempts: u32,

/// Connect timeout for IMDS.
#[serde(default = "default_timeout")]
#[serde(rename = "connect_timeout_seconds")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[derivative(Default(value = "default_timeout()"))]
connect_timeout: Duration,
spencergilbert marked this conversation as resolved.
Show resolved Hide resolved
spencergilbert marked this conversation as resolved.
Show resolved Hide resolved

/// Read timeout for IMDS.
#[serde(default = "default_timeout")]
#[serde(rename = "read_timeout_seconds")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[derivative(Default(value = "default_timeout()"))]
read_timeout: Duration,
spencergilbert marked this conversation as resolved.
Show resolved Hide resolved
}

const fn default_max_attempts() -> u32 {
4
}

const fn default_timeout() -> Duration {
Duration::from_secs(1)
}

/// Configuration of the authentication strategy for interacting with AWS services.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
Expand Down Expand Up @@ -45,6 +81,10 @@ pub enum AwsAuthentication {
/// Timeout for assuming the role, in seconds.
load_timeout_secs: Option<u64>,

/// Configuration for authenticating with AWS through IMDS.
#[serde(default)]
imds: ImdsAuthentication,

/// The AWS region to send STS requests to.
///
/// If not set, this will default to the configured region
Expand All @@ -57,6 +97,10 @@ pub enum AwsAuthentication {
Default {
/// Timeout for successfully loading any credentials, in seconds.
load_timeout_secs: Option<u64>,

/// Configuration for authenticating with AWS through IMDS.
#[serde(default)]
imds: ImdsAuthentication,
},
}

Expand All @@ -80,17 +124,24 @@ impl AwsAuthentication {
AwsAuthentication::Role {
assume_role,
load_timeout_secs,
imds,
region,
} => {
let auth_region = region.clone().map(Region::new).unwrap_or(service_region);
let provider = AssumeRoleProviderBuilder::new(assume_role)
.region(auth_region.clone())
.build(default_credentials_provider(auth_region, *load_timeout_secs).await);
.build(
default_credentials_provider(auth_region, *load_timeout_secs, *imds)
.await?,
);

Ok(SharedCredentialsProvider::new(provider))
}
AwsAuthentication::Default { load_timeout_secs } => Ok(SharedCredentialsProvider::new(
default_credentials_provider(service_region, *load_timeout_secs).await,
AwsAuthentication::Default {
load_timeout_secs,
imds,
} => Ok(SharedCredentialsProvider::new(
default_credentials_provider(service_region, *load_timeout_secs, *imds).await?,
)),
}
}
Expand All @@ -107,23 +158,35 @@ impl AwsAuthentication {
async fn default_credentials_provider(
region: Region,
load_timeout_secs: Option<u64>,
) -> SharedCredentialsProvider {
imds: ImdsAuthentication,
) -> crate::Result<SharedCredentialsProvider> {
let client = imds::Client::builder()
.max_attempts(imds.max_attempts)
.connect_timeout(imds.connect_timeout)
.read_timeout(imds.read_timeout)
.build()
.await?;

let chain = DefaultCredentialsChain::builder()
.region(region)
.imds_client(client)
.load_timeout(
load_timeout_secs
.map(Duration::from_secs)
.unwrap_or(DEFAULT_LOAD_TIMEOUT),
);

SharedCredentialsProvider::new(chain.build().await)
Ok(SharedCredentialsProvider::new(chain.build().await))
}

#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
const READ_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Serialize, Deserialize, Clone, Debug)]
struct ComponentConfig {
assume_role: Option<String>,
Expand Down Expand Up @@ -154,7 +217,32 @@ mod tests {
assert!(matches!(
config.auth,
AwsAuthentication::Default {
load_timeout_secs: Some(10)
load_timeout_secs: Some(10),
imds: ImdsAuthentication { .. },
}
));
}

#[test]
fn parsing_default_with_imds_client() {
let config = toml::from_str::<ComponentConfig>(
r#"
auth.imds.max_attempts = 5
auth.imds.connect_timeout_seconds = 30
auth.imds.read_timeout_seconds = 10
"#,
)
.unwrap();

assert!(matches!(
config.auth,
AwsAuthentication::Default {
load_timeout_secs: None,
imds: ImdsAuthentication {
max_attempts: 5,
connect_timeout: CONNECT_TIMEOUT,
read_timeout: READ_TIMEOUT,
},
}
));
}
Expand Down Expand Up @@ -184,6 +272,41 @@ mod tests {
assert!(matches!(config.auth, AwsAuthentication::Role { .. }));
}

#[test]
fn parsing_assume_role_with_imds_client() {
let config = toml::from_str::<ComponentConfig>(
r#"
auth.assume_role = "root"
auth.imds.max_attempts = 5
auth.imds.connect_timeout_seconds = 30
auth.imds.read_timeout_seconds = 10
"#,
)
.unwrap();

match config.auth {
AwsAuthentication::Role {
assume_role,
load_timeout_secs,
imds,
region,
} => {
assert_eq!(&assume_role, "root");
assert_eq!(load_timeout_secs, None);
assert!(matches!(
imds,
ImdsAuthentication {
max_attempts: 5,
connect_timeout: CONNECT_TIMEOUT,
read_timeout: READ_TIMEOUT,
}
));
assert_eq!(region, None);
}
_ => panic!(),
}
}

#[test]
fn parsing_both_assume_role() {
let config = toml::from_str::<ComponentConfig>(
Expand All @@ -200,10 +323,12 @@ mod tests {
AwsAuthentication::Role {
assume_role,
load_timeout_secs,
imds,
region,
} => {
assert_eq!(&assume_role, "auth.root");
assert_eq!(load_timeout_secs, Some(10));
assert!(matches!(imds, ImdsAuthentication { .. }));
assert_eq!(region.unwrap(), "us-west-2");
}
_ => panic!(),
Expand Down
2 changes: 1 addition & 1 deletion src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};

pub use auth::AwsAuthentication;
pub use auth::{AwsAuthentication, ImdsAuthentication};
use aws_config::meta::region::ProvideRegion;
use aws_sigv4::http_request::{SignableRequest, SigningSettings};
use aws_sigv4::SigningParams;
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::time::{sleep, Duration};

use super::{config::KinesisFirehoseClientBuilder, *};
use crate::{
aws::{create_client, AwsAuthentication, RegionOrEndpoint},
aws::{create_client, AwsAuthentication, ImdsAuthentication, RegionOrEndpoint},
config::{ProxyConfig, SinkConfig, SinkContext},
sinks::{
elasticsearch::{BulkConfig, ElasticsearchAuth, ElasticsearchCommon, ElasticsearchConfig},
Expand Down Expand Up @@ -73,6 +73,7 @@ async fn firehose_put_records() {
let config = ElasticsearchConfig {
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
load_timeout_secs: Some(5),
imds: ImdsAuthentication::default(),
})),
endpoints: vec![elasticsearch_address()],
bulk: Some(BulkConfig {
Expand Down
5 changes: 4 additions & 1 deletion src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vector_core::{

use super::{config::DATA_STREAM_TIMESTAMP_KEY, *};
use crate::{
aws::RegionOrEndpoint,
aws::{ImdsAuthentication, RegionOrEndpoint},
config::{ProxyConfig, SinkConfig, SinkContext},
http::HttpClient,
sinks::{
Expand Down Expand Up @@ -252,6 +252,7 @@ async fn auto_version_aws() {
let config = ElasticsearchConfig {
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
load_timeout_secs: Some(5),
imds: ImdsAuthentication::default(),
})),
endpoints: vec![aws_server()],
aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))),
Expand Down Expand Up @@ -331,6 +332,7 @@ async fn insert_events_on_aws() {
ElasticsearchConfig {
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
load_timeout_secs: Some(5),
imds: ImdsAuthentication::default(),
})),
endpoints: vec![aws_server()],
aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))),
Expand All @@ -351,6 +353,7 @@ async fn insert_events_on_aws_with_compression() {
ElasticsearchConfig {
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
load_timeout_secs: Some(5),
imds: ImdsAuthentication::default(),
})),
endpoints: vec![aws_server()],
aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))),
Expand Down
29 changes: 29 additions & 0 deletions website/cue/reference/components/aws.cue
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ components: _aws: {
examples: [30]
}
}
imds: {
description: "Configuration for authenticating with AWS through IMDS."
required: false
type: object: {
options: {
connect_timeout_seconds: {
description: "Connect timeout for IMDS."
required: false
type: uint: {
default: 1
unit: "seconds"
}
}
max_attempts: {
description: "Number of IMDS retries for fetching tokens and metadata."
required: false
type: uint: default: 4
}
read_timeout_seconds: {
description: "Read timeout for IMDS."
required: false
type: uint: {
default: 1
unit: "seconds"
}
}
}
}
}
profile: {
category: "Auth"
common: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,40 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
required: true
type: string: syntax: "literal"
}
imds: {
description: "Configuration for authenticating with AWS through IMDS."
required: false
type: object: {
default: {
connect_timeout_seconds: 1
max_attempts: 4
read_timeout_seconds: 1
}
options: {
connect_timeout_seconds: {
description: "Connect timeout for IMDS."
required: false
type: uint: {
default: 1
unit: "seconds"
}
}
max_attempts: {
description: "Number of IMDS retries for fetching tokens and metadata."
required: false
type: uint: default: 4
}
read_timeout_seconds: {
description: "Read timeout for IMDS."
required: false
type: uint: {
default: 1
unit: "seconds"
}
}
}
}
}
load_timeout_secs: {
description: "Timeout for successfully loading any credentials, in seconds."
required: false
Expand Down
Loading