diff --git a/Cargo.lock b/Cargo.lock index 027ab3b3e650b..7e29e8b7036da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,7 +1105,7 @@ dependencies = [ "quote 1.0.10", "regex", "rustc-hash", - "shlex 0.1.1", + "shlex", "which 3.1.1", ] @@ -6440,118 +6440,6 @@ dependencies = [ "xmlparser", ] -[[package]] -name = "rusoto_core" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b4f000e8934c1b4f70adde180056812e7ea6b1a247952db8ee98c94cd3116cc" -dependencies = [ - "async-trait", - "base64 0.13.0", - "bytes 1.1.0", - "crc32fast", - "flate2", - "futures 0.3.21", - "http", - "hyper", - "hyper-tls", - "lazy_static", - "log", - "rusoto_credential", - "rusoto_signature", - "rustc_version 0.4.0", - "serde", - "serde_json", - "tokio", - "xml-rs", -] - -[[package]] -name = "rusoto_credential" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a46b67db7bb66f5541e44db22b0a02fed59c9603e146db3a9e633272d3bac2f" -dependencies = [ - "async-trait", - "chrono", - "dirs-next", - "futures 0.3.21", - "hyper", - "serde", - "serde_json", - "shlex 1.1.0", - "tokio", - "zeroize", -] - -[[package]] -name = "rusoto_s3" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027" -dependencies = [ - "async-trait", - "bytes 1.1.0", - "futures 0.3.21", - "rusoto_core", - "xml-rs", -] - -[[package]] -name = "rusoto_signature" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6264e93384b90a747758bcc82079711eacf2e755c3a8b5091687b5349d870bcc" -dependencies = [ - "base64 0.13.0", - "bytes 1.1.0", - "chrono", - "digest 0.9.0", - "futures 0.3.21", - "hex", - "hmac", - "http", - "hyper", - "log", - "md-5 0.9.1", - "percent-encoding", - "pin-project-lite", - "rusoto_credential", - "rustc_version 0.4.0", - "serde", - "sha2 0.9.8", - "tokio", -] - -[[package]] -name = "rusoto_sqs" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ae091bb560b2aa3b6ec2ab8224516b63f6b6f7c495ae4e41f0566089b156e5f" -dependencies = [ - "async-trait", - "bytes 1.1.0", - "futures 0.3.21", - "rusoto_core", - "serde_urlencoded 0.7.1", - "xml-rs", -] - -[[package]] -name = "rusoto_sts" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e7edd42473ac006fd54105f619e480b0a94136e7f53cf3fb73541363678fd92" -dependencies = [ - "async-trait", - "bytes 1.1.0", - "chrono", - "futures 0.3.21", - "rusoto_core", - "serde_urlencoded 0.7.1", - "xml-rs", -] - [[package]] name = "rust-argon2" version = "0.8.3" @@ -7109,12 +6997,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" -[[package]] -name = "shlex" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" - [[package]] name = "signal-hook" version = "0.3.13" @@ -8824,12 +8706,6 @@ dependencies = [ "rmp-serde", "rmpv", "roaring", - "rusoto_core", - "rusoto_credential", - "rusoto_s3", - "rusoto_signature", - "rusoto_sqs", - "rusoto_sts", "schannel", "seahash", "security-framework", diff --git a/Cargo.toml b/Cargo.toml index 2108c5169342b..81b7235b7b9e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,14 +139,6 @@ tracing-tower = { git = "https://github.com/tokio-rs/tracing", default-features metrics = { version = "0.17.1", default-features = false, features = ["std"] } metrics-tracing-context = { version = "0.9.0", default-features = false } -# AWS - Rusoto -rusoto_core = { version = "0.47.0", features = ["encoding"], optional = true } -rusoto_credential = { version = "0.47.0", optional = true } -rusoto_s3 = { version = "0.47.0", optional = true } -rusoto_signature = { version = "0.47.0", optional = true } -rusoto_sqs = { version = "0.47.0", optional = true } -rusoto_sts = { version = "0.47.0", optional = true } - # AWS - Official SDK aws-config = { version = "0.9.0", optional = true } aws-types = { version = "0.9.0", optional = true, features = ["hardcoded-credentials"]} @@ -373,7 +365,6 @@ target-powerpc64le-unknown-linux-gnu = ["api", "api-client", "enrichment-tables" # currently doesn't build due to lack of support for 64-bit atomics target-powerpc-unknown-linux-gnu = ["api", "api-client", "enrichment-tables", "rdkafka/cmake_build", "sinks", "sources", "sources-dnstap", "transforms", "unix", "openssl/vendored", "vrl-cli", "datadog-pipelines"] -rusoto = ["rusoto_core", "rusoto_credential", "rusoto_signature", "rusoto_sts"] # Enables features that work only on systems providing `cfg(unix)` unix = ["tikv-jemallocator"] diff --git a/src/aws/auth.rs b/src/aws/auth.rs index 30be6d821bedd..95dacfee9d1f1 100644 --- a/src/aws/auth.rs +++ b/src/aws/auth.rs @@ -1,3 +1,5 @@ +use aws_config::{default_provider::credentials::default_provider, sts::AssumeRoleProviderBuilder}; +use aws_types::{credentials::SharedCredentialsProvider, Credentials}; use serde::{Deserialize, Serialize}; /// Configuration for configuring authentication strategy for AWS. @@ -25,6 +27,43 @@ pub enum AwsAuthentication { Default {}, } +impl AwsAuthentication { + pub async fn credentials_provider(&self) -> crate::Result { + match self { + Self::Static { + access_key_id, + secret_access_key, + } => Ok(SharedCredentialsProvider::new(Credentials::from_keys( + access_key_id, + secret_access_key, + None, + ))), + AwsAuthentication::File { .. } => { + Err("Overriding the credentials file is not supported.".into()) + } + AwsAuthentication::Role { assume_role } => Ok(SharedCredentialsProvider::new( + AssumeRoleProviderBuilder::new(assume_role) + .build(default_credentials_provider().await), + )), + AwsAuthentication::Default {} => Ok(SharedCredentialsProvider::new( + default_credentials_provider().await, + )), + } + } + + #[cfg(test)] + pub fn test_auth() -> AwsAuthentication { + AwsAuthentication::Static { + access_key_id: "dummy".to_string(), + secret_access_key: "dummy".to_string(), + } + } +} + +async fn default_credentials_provider() -> SharedCredentialsProvider { + SharedCredentialsProvider::new(default_provider().await) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs deleted file mode 100644 index f464bb4b29302..0000000000000 --- a/src/aws/aws_sdk/auth.rs +++ /dev/null @@ -1,41 +0,0 @@ -use aws_config::{default_provider::credentials::default_provider, sts::AssumeRoleProviderBuilder}; -use aws_types::{credentials::SharedCredentialsProvider, Credentials}; - -use crate::aws::auth::AwsAuthentication; - -impl AwsAuthentication { - pub async fn credentials_provider(&self) -> crate::Result { - match self { - Self::Static { - access_key_id, - secret_access_key, - } => Ok(SharedCredentialsProvider::new(Credentials::from_keys( - access_key_id, - secret_access_key, - None, - ))), - AwsAuthentication::File { .. } => { - Err("Overriding the credentials file is not supported.".into()) - } - AwsAuthentication::Role { assume_role } => Ok(SharedCredentialsProvider::new( - AssumeRoleProviderBuilder::new(assume_role) - .build(default_credentials_provider().await), - )), - AwsAuthentication::Default {} => Ok(SharedCredentialsProvider::new( - default_credentials_provider().await, - )), - } - } - - #[cfg(test)] - pub fn test_auth() -> AwsAuthentication { - AwsAuthentication::Static { - access_key_id: "dummy".to_string(), - secret_access_key: "dummy".to_string(), - } - } -} - -async fn default_credentials_provider() -> SharedCredentialsProvider { - SharedCredentialsProvider::new(default_provider().await) -} diff --git a/src/aws/aws_sdk/mod.rs b/src/aws/aws_sdk/mod.rs deleted file mode 100644 index 5410c450d8561..0000000000000 --- a/src/aws/aws_sdk/mod.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::aws::AwsAuthentication; -use crate::config::ProxyConfig; -use crate::http::{build_proxy_connector, build_tls_connector}; -use crate::tls::{MaybeTlsSettings, TlsOptions}; -use aws_smithy_client::erase::DynConnector; -use aws_smithy_client::SdkError; -use aws_smithy_http::endpoint::Endpoint; -use aws_types::credentials::SharedCredentialsProvider; -use aws_types::region::Region; -use once_cell::sync::OnceCell; -use regex::RegexSet; - -mod auth; -mod region; - -static RETRIABLE_CODES: OnceCell = OnceCell::new(); - -pub fn is_retriable_error(error: &SdkError) -> bool { - match error { - SdkError::TimeoutError(_) => true, - SdkError::DispatchFailure(_) => true, - SdkError::ResponseError { err: _, raw } | SdkError::ServiceError { err: _, raw } => { - // This header is a direct indication that we should retry the request. Eventually it'd - // be nice to actually schedule the retry after the given delay, but for now we just - // check that it contains a positive value. - let retry_header = raw.http().headers().get("x-amz-retry-after").is_some(); - - // Certain 400-level responses will contain an error code indicating that the request - // should be retried. Since we don't retry 400-level responses by default, we'll look - // for these specifically before falling back to more general heuristics. Because AWS - // services use a mix of XML and JSON response bodies and Rusoto doesn't give us - // a parsed representation, we resort to a simple string match. - // - // S3: RequestTimeout - // SQS: RequestExpired, ThrottlingException - // ECS: RequestExpired, ThrottlingException - // Kinesis: RequestExpired, ThrottlingException - // Cloudwatch: RequestExpired, ThrottlingException - // - // Now just look for those when it's a client_error - let re = RETRIABLE_CODES.get_or_init(|| { - RegexSet::new(&["RequestTimeout", "RequestExpired", "ThrottlingException"]) - .expect("invalid regex") - }); - - let status = raw.http().status(); - let response_body = String::from_utf8_lossy(raw.http().body().bytes().unwrap_or(&[])); - - retry_header - || status.is_server_error() - || status == http::StatusCode::TOO_MANY_REQUESTS - || (status.is_client_error() && re.is_match(response_body.as_ref())) - } - _ => false, - } -} - -pub trait ClientBuilder { - type ConfigBuilder; - type Client; - - fn create_config_builder( - credentials_provider: SharedCredentialsProvider, - ) -> Self::ConfigBuilder; - - fn with_endpoint_resolver( - builder: Self::ConfigBuilder, - endpoint: Endpoint, - ) -> Self::ConfigBuilder; - - fn with_region(builder: Self::ConfigBuilder, region: Region) -> Self::ConfigBuilder; - - fn client_from_conf_conn(builder: Self::ConfigBuilder, connector: DynConnector) - -> Self::Client; -} - -pub async fn create_client( - auth: &AwsAuthentication, - region: Option, - endpoint: Option, - proxy: &ProxyConfig, - tls_options: &Option, -) -> crate::Result { - let mut config_builder = T::create_config_builder(auth.credentials_provider().await?); - - if let Some(endpoint_override) = endpoint { - config_builder = T::with_endpoint_resolver(config_builder, endpoint_override); - } - - if let Some(region) = region { - config_builder = T::with_region(config_builder, region); - } - - let tls_settings = MaybeTlsSettings::tls_client(tls_options)?; - - let connector = if proxy.enabled { - let proxy = build_proxy_connector(tls_settings, proxy)?; - let hyper_client = aws_smithy_client::hyper_ext::Adapter::builder().build(proxy); - aws_smithy_client::erase::DynConnector::new(hyper_client) - } else { - let tls_connector = build_tls_connector(tls_settings)?; - let hyper_client = aws_smithy_client::hyper_ext::Adapter::builder().build(tls_connector); - aws_smithy_client::erase::DynConnector::new(hyper_client) - }; - - Ok(T::client_from_conf_conn(config_builder, connector)) -} diff --git a/src/aws/aws_sdk/region.rs b/src/aws/aws_sdk/region.rs deleted file mode 100644 index cf193b18f67ca..0000000000000 --- a/src/aws/aws_sdk/region.rs +++ /dev/null @@ -1,20 +0,0 @@ -use aws_smithy_http::endpoint::Endpoint; -use aws_types::region::Region; -use http::Uri; -use std::str::FromStr; - -use crate::aws::region::RegionOrEndpoint; - -impl RegionOrEndpoint { - pub fn endpoint(&self) -> crate::Result> { - if let Some(endpoint) = &self.endpoint { - Ok(Some(Endpoint::immutable(Uri::from_str(endpoint)?))) - } else { - Ok(None) - } - } - - pub fn region(&self) -> Option { - self.region.clone().map(Region::new) - } -} diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 772a7b93e18ac..08a7decd718b1 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -1,11 +1,108 @@ pub mod auth; pub mod region; +use crate::config::ProxyConfig; +use crate::http::{build_proxy_connector, build_tls_connector}; +use crate::tls::{MaybeTlsSettings, TlsOptions}; pub use auth::AwsAuthentication; +use aws_smithy_client::erase::DynConnector; +use aws_smithy_client::SdkError; +use aws_smithy_http::endpoint::Endpoint; +use aws_types::credentials::SharedCredentialsProvider; +use aws_types::region::Region; +use once_cell::sync::OnceCell; +use regex::RegexSet; pub use region::RegionOrEndpoint; -#[cfg(feature = "rusoto_core")] -pub mod rusoto; +static RETRIABLE_CODES: OnceCell = OnceCell::new(); -#[cfg(feature = "aws-core")] -pub mod aws_sdk; +pub fn is_retriable_error(error: &SdkError) -> bool { + match error { + SdkError::TimeoutError(_) => true, + SdkError::DispatchFailure(_) => true, + SdkError::ResponseError { err: _, raw } | SdkError::ServiceError { err: _, raw } => { + // This header is a direct indication that we should retry the request. Eventually it'd + // be nice to actually schedule the retry after the given delay, but for now we just + // check that it contains a positive value. + let retry_header = raw.http().headers().get("x-amz-retry-after").is_some(); + + // Certain 400-level responses will contain an error code indicating that the request + // should be retried. Since we don't retry 400-level responses by default, we'll look + // for these specifically before falling back to more general heuristics. Because AWS + // services use a mix of XML and JSON response bodies and the AWS SDK doesn't give us + // a parsed representation, we resort to a simple string match. + // + // S3: RequestTimeout + // SQS: RequestExpired, ThrottlingException + // ECS: RequestExpired, ThrottlingException + // Kinesis: RequestExpired, ThrottlingException + // Cloudwatch: RequestExpired, ThrottlingException + // + // Now just look for those when it's a client_error + let re = RETRIABLE_CODES.get_or_init(|| { + RegexSet::new(&["RequestTimeout", "RequestExpired", "ThrottlingException"]) + .expect("invalid regex") + }); + + let status = raw.http().status(); + let response_body = String::from_utf8_lossy(raw.http().body().bytes().unwrap_or(&[])); + + retry_header + || status.is_server_error() + || status == http::StatusCode::TOO_MANY_REQUESTS + || (status.is_client_error() && re.is_match(response_body.as_ref())) + } + _ => false, + } +} + +pub trait ClientBuilder { + type ConfigBuilder; + type Client; + + fn create_config_builder( + credentials_provider: SharedCredentialsProvider, + ) -> Self::ConfigBuilder; + + fn with_endpoint_resolver( + builder: Self::ConfigBuilder, + endpoint: Endpoint, + ) -> Self::ConfigBuilder; + + fn with_region(builder: Self::ConfigBuilder, region: Region) -> Self::ConfigBuilder; + + fn client_from_conf_conn(builder: Self::ConfigBuilder, connector: DynConnector) + -> Self::Client; +} + +pub async fn create_client( + auth: &AwsAuthentication, + region: Option, + endpoint: Option, + proxy: &ProxyConfig, + tls_options: &Option, +) -> crate::Result { + let mut config_builder = T::create_config_builder(auth.credentials_provider().await?); + + if let Some(endpoint_override) = endpoint { + config_builder = T::with_endpoint_resolver(config_builder, endpoint_override); + } + + if let Some(region) = region { + config_builder = T::with_region(config_builder, region); + } + + let tls_settings = MaybeTlsSettings::tls_client(tls_options)?; + + let connector = if proxy.enabled { + let proxy = build_proxy_connector(tls_settings, proxy)?; + let hyper_client = aws_smithy_client::hyper_ext::Adapter::builder().build(proxy); + aws_smithy_client::erase::DynConnector::new(hyper_client) + } else { + let tls_connector = build_tls_connector(tls_settings)?; + let hyper_client = aws_smithy_client::hyper_ext::Adapter::builder().build(tls_connector); + aws_smithy_client::erase::DynConnector::new(hyper_client) + }; + + Ok(T::client_from_conf_conn(config_builder, connector)) +} diff --git a/src/aws/region.rs b/src/aws/region.rs index 35889c8c9afc9..c8881ab937521 100644 --- a/src/aws/region.rs +++ b/src/aws/region.rs @@ -1,4 +1,8 @@ +use aws_smithy_http::endpoint::Endpoint; +use aws_types::region::Region; +use http::Uri; use serde::{Deserialize, Serialize}; +use std::str::FromStr; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] #[serde(default)] @@ -28,4 +32,16 @@ impl RegionOrEndpoint { endpoint: Some(endpoint.into()), } } + + pub fn endpoint(&self) -> crate::Result> { + if let Some(endpoint) = &self.endpoint { + Ok(Some(Endpoint::immutable(Uri::from_str(endpoint)?))) + } else { + Ok(None) + } + } + + pub fn region(&self) -> Option { + self.region.clone().map(Region::new) + } } diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs deleted file mode 100644 index 9da88de1fdbd8..0000000000000 --- a/src/aws/rusoto/auth.rs +++ /dev/null @@ -1,93 +0,0 @@ -use rusoto_core::Region; - -use crate::aws::{auth::AwsAuthentication, rusoto::AwsCredentialsProvider}; - -const AWS_DEFAULT_PROFILE: &str = "default"; - -impl AwsAuthentication { - pub fn build( - &self, - region: &Region, - old_assume_role: Option, - ) -> crate::Result { - if old_assume_role.is_some() { - warn!("Option `assume_role` has been renamed to `auth.assume_role`. Please use that one instead."); - } - match self { - Self::Static { - access_key_id, - secret_access_key, - } => { - if old_assume_role.is_some() { - warn!("Ignoring option `assume_role`, instead using access options."); - } - Ok(AwsCredentialsProvider::new_minimal( - access_key_id, - secret_access_key, - )) - } - Self::File { - credentials_file, - profile, - } => { - if old_assume_role.is_some() { - warn!( - "Ignoring option `assume_role`, instead using AWS credentials file options." - ); - } - AwsCredentialsProvider::new_with_credentials_file( - credentials_file, - profile - .as_ref() - .unwrap_or(&AWS_DEFAULT_PROFILE.to_string()) - .as_str(), - ) - } - Self::Role { assume_role } => { - if old_assume_role.is_some() { - warn!( - "Ignoring option `assume_role`, instead using option `auth.assume_role`." - ); - } - AwsCredentialsProvider::new(region, Some(assume_role.clone())) - } - Self::Default {} => AwsCredentialsProvider::new(region, old_assume_role), - } - } -} -#[cfg(test)] -mod test { - use std::{fs::File, io::Write}; - - use rusoto_core::Region; - - use super::*; - use crate::aws::auth::AwsAuthentication; - - #[test] - fn parsing_credentials_file() { - let tmpdir = tempfile::tempdir().unwrap(); - let tmpfile_path = tmpdir.path().join("credentials"); - let mut tmpfile = File::create(&tmpfile_path).unwrap(); - - writeln!( - tmpfile, - r#" - [default] - aws_access_key_id = default-access-key-id - aws_secret_access_key = default-secret - "# - ) - .unwrap(); - - let auth = AwsAuthentication::File { - credentials_file: tmpfile_path.to_str().unwrap().to_string(), - profile: Some("default".to_string()), - }; - let result = auth.build(&Region::AfSouth1, None).unwrap(); - assert!(matches!(result, AwsCredentialsProvider::File { .. })); - - drop(tmpfile); - tmpdir.close().unwrap(); - } -} diff --git a/src/aws/rusoto/mod.rs b/src/aws/rusoto/mod.rs deleted file mode 100644 index 67bd6e44688c6..0000000000000 --- a/src/aws/rusoto/mod.rs +++ /dev/null @@ -1,414 +0,0 @@ -mod auth; -pub mod region; - -//TODO: replace with direct import -use std::{ - fmt, io, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use async_trait::async_trait; -use bytes::Bytes; -use futures::StreamExt; -use http::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Method, Request, Response, StatusCode, -}; -use hyper::{ - body::{Body, HttpBody}, - client, -}; -use once_cell::sync::OnceCell; -use regex::bytes::RegexSet; -pub use region::{region_from_endpoint, RegionOrEndpoint}; -use rusoto_core::{ - credential::ProfileProvider, - request::{ - DispatchSignedRequest, DispatchSignedRequestFuture, HttpDispatchError, HttpResponse, - }, - ByteStream, Region, RusotoError, -}; -use rusoto_credential::{ - AutoRefreshingProvider, AwsCredentials, ChainProvider, CredentialsError, ProvideAwsCredentials, - StaticProvider, -}; -use rusoto_signature::{SignedRequest, SignedRequestPayload}; -use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient, WebIdentityProvider}; -use snafu::{ResultExt, Snafu}; -use tower::{Service, ServiceExt}; - -pub use super::auth::AwsAuthentication; -use crate::{config::ProxyConfig, http::HttpError, tls::MaybeTlsSettings}; -// use crate::http; - -pub type Client = HttpClient>; - -pub fn client(tls_setting: Option, proxy: &ProxyConfig) -> crate::Result { - let settings = tls_setting.unwrap_or(MaybeTlsSettings::enable_client()?); - let client = crate::http::HttpClient::new(settings, proxy)?; - Ok(HttpClient { client }) -} - -pub fn custom_client( - proxy: &ProxyConfig, - client_builder: &mut client::Builder, -) -> crate::Result { - let settings = MaybeTlsSettings::enable_client()?; - let client = crate::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?; - Ok(HttpClient { client }) -} - -#[derive(Debug, Snafu)] -enum AwsRusotoError { - #[snafu(display("Failed to create request dispatcher"))] - DispatcherError, - - #[snafu(display("Invalid AWS credentials: {}", source))] - InvalidAwsCredentials { source: CredentialsError }, -} - -// A custom chain provider incorporating web identity support -// See - https://github.com/rusoto/rusoto/issues/1781 -pub struct CustomChainProvider { - chain_provider: ChainProvider, - web_provider: WebIdentityProvider, -} - -impl CustomChainProvider { - pub fn new() -> CustomChainProvider { - CustomChainProvider { - chain_provider: ChainProvider::new(), - web_provider: WebIdentityProvider::from_k8s_env(), - } - } - - pub fn set_timeout(&mut self, duration: Duration) { - self.chain_provider.set_timeout(duration); - } -} - -impl Default for CustomChainProvider { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl ProvideAwsCredentials for CustomChainProvider { - async fn credentials(&self) -> Result { - match self.web_provider.credentials().await { - Ok(creds) => Ok(creds), - Err(error_1) => match self.chain_provider.credentials().await { - Ok(creds) => Ok(creds), - Err(error_2) => Err(CredentialsError::new(format!( - "Failed creating AWS credentials. Errors: {:?}", - [error_1, error_2] - ))), - }, - } - } -} - -// A place-holder for the types of AWS credentials we support -#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously -pub enum AwsCredentialsProvider { - Default(AutoRefreshingProvider), - Role(AutoRefreshingProvider), - Static(StaticProvider), - File(AutoRefreshingProvider), -} - -impl fmt::Debug for AwsCredentialsProvider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let name = match self { - Self::Default(_) => "default", - Self::Role(_) => "role", - Self::Static(_) => "static", - Self::File(_) => "file", - }; - - f.debug_tuple("AwsCredentialsProvider") - .field(&name) - .finish() - } -} - -impl AwsCredentialsProvider { - pub fn new(region: &Region, assume_role: Option) -> crate::Result { - if let Some(role) = assume_role { - debug!("Using STS assume role credentials for AWS."); - - let dispatcher = rusoto_core::request::HttpClient::new() - .map_err(|_| AwsRusotoError::DispatcherError)?; - - let mut credentials = CustomChainProvider::new(); - credentials.set_timeout(Duration::from_secs(8)); - - let sts = StsClient::new_with(dispatcher, credentials, region.clone()); - - let provider = StsAssumeRoleSessionCredentialsProvider::new( - sts, - role, - "default".to_owned(), - None, - None, - None, - None, - ); - - let creds = - AutoRefreshingProvider::new(provider).context(InvalidAwsCredentialsSnafu)?; - Ok(Self::Role(creds)) - } else { - debug!("Using default credentials provider for AWS."); - let mut chain = CustomChainProvider::new(); - // 8 seconds because our default healthcheck timeout - // is 10 seconds. - chain.set_timeout(Duration::from_secs(8)); - - let creds = AutoRefreshingProvider::new(chain).context(InvalidAwsCredentialsSnafu)?; - - Ok(Self::Default(creds)) - } - } - - pub fn new_minimal, S: Into>(access_key: A, secret_key: S) -> Self { - Self::Static(StaticProvider::new_minimal( - access_key.into(), - secret_key.into(), - )) - } - - pub fn new_with_credentials_file(credentials_file: &str, profile: &str) -> crate::Result { - let creds = AutoRefreshingProvider::new(ProfileProvider::with_configuration( - credentials_file, - profile, - )) - .context(InvalidAwsCredentialsSnafu)?; - Ok(Self::File(creds)) - } -} - -#[async_trait] -impl ProvideAwsCredentials for AwsCredentialsProvider { - async fn credentials(&self) -> Result { - let fut = match self { - Self::Default(p) => p.credentials(), - Self::Role(p) => p.credentials(), - Self::Static(p) => p.credentials(), - Self::File(p) => p.credentials(), - }; - fut.await - } -} - -#[derive(Debug, Clone)] -pub struct HttpClient { - client: T, -} - -#[derive(Debug)] -pub struct RusotoBody { - inner: Option, -} - -impl HttpClient { - pub const fn new(client: T) -> Self { - HttpClient { client } - } -} - -impl DispatchSignedRequest for HttpClient -where - T: Service, Response = Response, Error = HttpError> - + Clone - + Send - + 'static, - T::Future: Send + 'static, -{ - // Adaptation of https://docs.rs/rusoto_core/0.44.0/src/rusoto_core/request.rs.html#314-416 - fn dispatch( - &self, - request: SignedRequest, - timeout: Option, - ) -> DispatchSignedRequestFuture { - assert!(timeout.is_none(), "timeout is not supported at this level"); - - let client = self.client.clone(); - - Box::pin(async move { - let method = match request.method() { - "POST" => Method::POST, - "PUT" => Method::PUT, - "DELETE" => Method::DELETE, - "GET" => Method::GET, - "HEAD" => Method::HEAD, - v => { - return Err(HttpDispatchError::new(format!( - "Unsupported HTTP verb {}", - v - ))); - } - }; - - let mut headers = HeaderMap::new(); - for h in request.headers().iter() { - let header_name = match h.0.parse::() { - Ok(name) => name, - Err(err) => { - return Err(HttpDispatchError::new(format!( - "Error parsing header name: {}", - err - ))); - } - }; - for v in h.1.iter() { - let header_value = match HeaderValue::from_bytes(v) { - Ok(value) => value, - Err(err) => { - return Err(HttpDispatchError::new(format!( - "Value of header {:?} contains invalid header byte. Error: {}", - h.0, err - ))); - } - }; - headers.append(&header_name, header_value); - } - } - - let mut uri = format!( - "{}://{}{}", - request.scheme(), - request.hostname(), - request.canonical_path() - ); - - if !request.canonical_query_string().is_empty() { - uri += &format!("?{}", request.canonical_query_string()); - } - - let mut request = Request::builder() - .method(method) - .uri(uri) - .body(RusotoBody::from(request.payload)) - .map_err(|error| format!("Error building request: {}", error)) - .map_err(HttpDispatchError::new)?; - - *request.headers_mut() = headers; - - let response = client.oneshot(request).await.map_err(|error| { - HttpDispatchError::new(format!("Error during dispatch: {}", error)) - })?; - - let status = StatusCode::from_u16(response.status().as_u16()).unwrap(); - let headers = response - .headers() - .iter() - .map(|(h, v)| { - let value_string = v.to_str().unwrap().to_owned(); - // This should always be valid since we are coming from http. - let name = HeaderName::from_bytes(h.as_ref()) - .expect("Should always be a valid header"); - (name, value_string) - }) - .collect(); - - let body = response - .into_body() - .map(|try_chunk| try_chunk.map_err(|e| io::Error::new(io::ErrorKind::Other, e))); - - Ok(HttpResponse { - status, - headers, - body: ByteStream::new(body), - }) - }) - } -} - -impl HttpBody for RusotoBody { - type Data = Bytes; - type Error = io::Error; - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - match self.inner.as_mut() { - Some(SignedRequestPayload::Buffer(buf)) => { - if !buf.is_empty() { - let buf = buf.split_off(0); - Poll::Ready(Some(Ok(buf))) - } else { - Poll::Ready(None) - } - } - Some(SignedRequestPayload::Stream(stream)) => match stream.poll_next_unpin(cx) { - Poll::Ready(Some(result)) => match result { - Ok(buf) => Poll::Ready(Some(Ok(buf))), - Err(error) => Poll::Ready(Some(Err(error))), - }, - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }, - None => Poll::Ready(None), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } -} - -impl From> for RusotoBody { - fn from(inner: Option) -> Self { - RusotoBody { inner } - } -} - -static RETRIABLE_CODES: OnceCell = OnceCell::new(); - -pub fn is_retriable_error(error: &RusotoError) -> bool { - match error { - RusotoError::HttpDispatch(_) => true, - RusotoError::Unknown(response) => { - // This header is a direct indication that we should retry the request. Eventually it'd - // be nice to actually schedule the retry after the given delay, but for now we just - // check that it contains a positive value. - let retry_header = response - .headers - .get("x-amz-retry-after") - .and_then(|value| value.parse::().ok()) - .filter(|duration| *duration > 0); - - // Certain 400-level responses will contain an error code indicating that the request - // should be retried. Since we don't retry 400-level responses by default, we'll look - // for these specifically before falling back to more general heuristics. Because AWS - // services use a mix of XML and JSON response bodies and Rusoto doesn't give us - // a parsed representation, we resort to a simple string match. - // - // S3: RequestTimeout - // SQS: RequestExpired, ThrottlingException - // ECS: RequestExpired, ThrottlingException - // Kinesis: RequestExpired, ThrottlingException - // Cloudwatch: RequestExpired, ThrottlingException - // - // Now just look for those when it's a client_error - let re = RETRIABLE_CODES.get_or_init(|| { - RegexSet::new(&["RequestTimeout", "RequestExpired", "ThrottlingException"]) - .expect("invalid regex") - }); - - retry_header.is_some() - || response.status.is_server_error() - || response.status == http::StatusCode::TOO_MANY_REQUESTS - || (response.status.is_client_error() && re.is_match(&response.body)) - } - _ => false, - } -} diff --git a/src/aws/rusoto/region.rs b/src/aws/rusoto/region.rs deleted file mode 100644 index 5be94d513c765..0000000000000 --- a/src/aws/rusoto/region.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::{convert::TryFrom, str::FromStr}; - -use http::{uri::InvalidUri, Uri}; -use rusoto_core::{region::ParseRegionError, Region}; -use snafu::{ResultExt, Snafu}; - -pub use crate::aws::region::RegionOrEndpoint; - -#[derive(Debug, Snafu)] -pub enum ParseError { - #[snafu(display("Failed to parse custom endpoint as URI: {}", source))] - EndpointParseError { source: InvalidUri }, - #[snafu(display("Failed to parse region: {}", source))] - RegionParseError { source: ParseRegionError }, - #[snafu(display("Must set either 'region' or 'endpoint'"))] - MissingRegionAndEndpoint, -} - -impl TryFrom<&RegionOrEndpoint> for Region { - type Error = ParseError; - - fn try_from(r: &RegionOrEndpoint) -> Result { - match (&r.region, &r.endpoint) { - (Some(region), None) => region.parse().context(RegionParseSnafu), - (None, Some(endpoint)) => region_from_endpoint(endpoint), - (Some(region), Some(endpoint)) => Ok(Region::Custom { - name: region.to_string(), - endpoint: endpoint.to_string(), - }), - (None, None) => Err(ParseError::MissingRegionAndEndpoint), - } - } -} - -impl TryFrom for Region { - type Error = ParseError; - fn try_from(r: RegionOrEndpoint) -> Result { - Region::try_from(&r) - } -} - -/// Translate an endpoint URL into a Region -pub fn region_from_endpoint(endpoint: &str) -> Result { - let uri = endpoint.parse::().context(EndpointParseSnafu)?; - let name = uri - .host() - .and_then(region_name_from_host) - .unwrap_or_else(|| Region::default().name().into()); - let endpoint = strip_endpoint(&uri); - Ok(Region::Custom { name, endpoint }) -} - -/// Reconstitute the endpoint from the URI, but strip off all path components -fn strip_endpoint(uri: &Uri) -> String { - let pq_len = uri - .path_and_query() - .map(|pq| pq.as_str().len()) - .unwrap_or(0); - let endpoint = uri.to_string(); - endpoint[..endpoint.len() - pq_len].to_string() -} - -/// Translate a hostname into a region name by finding the first part of -/// the domain name that matches a known region. -fn region_name_from_host(host: &str) -> Option { - host.split('.') - .filter_map(|part| Region::from_str(part).ok()) - .map(|region| region.name().into()) - .next() -} - -#[cfg(test)] -mod tests { - use std::convert::TryInto; - - use indoc::indoc; - use rusoto_core::Region; - use serde::Deserialize; - - use super::*; - - #[derive(Deserialize)] - struct Config { - inner: Inner, - } - - #[derive(Deserialize)] - struct Inner { - #[serde(flatten)] - region: RegionOrEndpoint, - } - - #[test] - fn region_es_east_1() { - let config: Config = toml::from_str(indoc! {r#" - [inner] - region = "us-east-1" - "#}) - .unwrap(); - - let region: Region = config.inner.region.try_into().unwrap(); - assert_eq!(region, Region::UsEast1); - } - - #[test] - fn custom_name_endpoint_localhost() { - let config: Config = toml::from_str(indoc! {r#" - [inner] - endpoint = "http://localhost:9000" - "#}) - .unwrap(); - - let expected_region = Region::Custom { - name: "us-east-1".into(), - endpoint: "http://localhost:9000".into(), - }; - - let region: Region = config.inner.region.try_into().unwrap(); - assert_eq!(region, expected_region); - } - - #[test] - fn region_not_provided() { - let config: Config = toml::from_str(indoc! {r#" - [inner] - endpoint_is_spelled_wrong = "http://localhost:9000" - "#}) - .unwrap(); - - let region: Result = config.inner.region.try_into(); - match region { - Err(ParseError::MissingRegionAndEndpoint) => {} - other => panic!("Assertion failed, wrong result {:?}", other), - } - } - - #[test] - fn extracts_region_name_from_host() { - assert_eq!(region_name_from_host("localhost"), None); - assert_eq!( - region_name_from_host("us-west-1.es.amazonaws.com"), - Some("us-west-1".into()) - ); - assert_eq!( - region_name_from_host("this-is-a-test.us-west-2.es.amazonaws.com"), - Some("us-west-2".into()) - ); - assert_eq!( - region_name_from_host("test.cn-north-1.es.amazonaws.com.cn"), - Some("cn-north-1".into()) - ); - } - - #[test] - fn region_from_endpoint_localhost() { - assert_eq!( - region_from_endpoint("http://localhost:9000").unwrap(), - Region::Custom { - name: "us-east-1".into(), - endpoint: "http://localhost:9000".into() - } - ); - } - - #[test] - fn region_from_endpoint_standard_region() { - assert_eq!( - region_from_endpoint( - "https://this-is-a-test-5dec2c2qbgsuekvsecuylqu.us-west-2.es.amazonaws.com" - ) - .unwrap(), - Region::Custom { - name: "us-west-2".into(), - endpoint: - "https://this-is-a-test-5dec2c2qbgsuekvsecuylqu.us-west-2.es.amazonaws.com" - .into() - } - ); - } - - #[test] - fn region_from_endpoint_without_scheme() { - assert_eq!( - region_from_endpoint("ams3.digitaloceanspaces.com").unwrap(), - Region::Custom { - name: "us-east-1".into(), - endpoint: "ams3.digitaloceanspaces.com".into() - } - ); - assert_eq!( - region_from_endpoint("https://ams3.digitaloceanspaces.com/").unwrap(), - Region::Custom { - name: "us-east-1".into(), - endpoint: "https://ams3.digitaloceanspaces.com".into() - } - ); - } - - #[test] - fn region_from_endpoint_strips_path_query() { - assert_eq!( - region_from_endpoint("http://localhost:9000/path?query").unwrap(), - Region::Custom { - name: "us-east-1".into(), - endpoint: "http://localhost:9000".into() - } - ); - } -} diff --git a/src/common/s3.rs b/src/common/s3.rs index 30c6765939171..29f3aa47809da 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -1,4 +1,4 @@ -use crate::aws::aws_sdk::ClientBuilder; +use crate::aws::ClientBuilder; use aws_sdk_s3::{Endpoint, Region}; use aws_smithy_client::erase::DynConnector; use aws_types::credentials::SharedCredentialsProvider; diff --git a/src/common/sqs.rs b/src/common/sqs.rs index 6ac4b432bae08..c3480b9c7e678 100644 --- a/src/common/sqs.rs +++ b/src/common/sqs.rs @@ -1,4 +1,4 @@ -use crate::aws::aws_sdk::ClientBuilder; +use crate::aws::ClientBuilder; use aws_sdk_sqs::{Endpoint, Region}; use aws_smithy_client::erase::DynConnector; use aws_types::credentials::SharedCredentialsProvider; diff --git a/src/internal_events/aws_s3_source.rs b/src/internal_events/aws_s3_source.rs deleted file mode 100644 index 45b91f0c7a8ec..0000000000000 --- a/src/internal_events/aws_s3_source.rs +++ /dev/null @@ -1,230 +0,0 @@ -use metrics::counter; -use rusoto_core::RusotoError; -use rusoto_sqs::{ - BatchResultErrorEntry, DeleteMessageBatchError, DeleteMessageBatchRequestEntry, - DeleteMessageBatchResultEntry, ReceiveMessageError, -}; -use vector_core::internal_event::InternalEvent; - -use crate::internal_events::prelude::{error_stage, error_type}; -use crate::sources::aws_s3::sqs::ProcessingError; - -#[derive(Debug)] -pub struct SqsS3EventsReceived { - pub byte_size: usize, -} - -impl InternalEvent for SqsS3EventsReceived { - fn emit_logs(&self) { - trace!( - message = "Events received.", - count = 1, - byte_size = %self.byte_size, - ); - } - - fn emit_metrics(&self) { - counter!("component_received_events_total", 1); - counter!( - "component_received_event_bytes_total", - self.byte_size as u64 - ); - // deprecated - counter!("events_in_total", 1); - } -} - -#[derive(Debug)] -pub struct SqsMessageReceiveError<'a> { - pub error: &'a RusotoError, -} - -impl<'a> InternalEvent for SqsMessageReceiveError<'a> { - fn emit_logs(&self) { - error!( - message = "Failed to fetch SQS events.", - error = %self.error, - error_code = "failed_fetching_sqs_events", - error_type = error_type::REQUEST_FAILED, - stage = error_stage::RECEIVING, - ); - } - - fn emit_metrics(&self) { - counter!( - "component_errors_total", 1, - "error_code" => "failed_fetching_sqs_events", - "error_type" => error_type::REQUEST_FAILED, - "stage" => error_stage::RECEIVING, - ); - // deprecated - counter!("sqs_message_receive_failed_total", 1); - } -} - -#[derive(Debug)] -pub struct SqsMessageReceiveSucceeded { - pub count: usize, -} - -impl InternalEvent for SqsMessageReceiveSucceeded { - fn emit_logs(&self) { - trace!(message = "Received SQS messages.", count = %self.count); - } - - fn emit_metrics(&self) { - counter!("sqs_message_receive_succeeded_total", 1); - counter!("sqs_message_received_messages_total", self.count as u64); - } -} - -#[derive(Debug)] -pub struct SqsMessageProcessingSucceeded<'a> { - pub message_id: &'a str, -} - -impl<'a> InternalEvent for SqsMessageProcessingSucceeded<'a> { - fn emit_logs(&self) { - trace!(message = "Processed SQS message succeededly.", message_id = %self.message_id); - } - - fn emit_metrics(&self) { - counter!("sqs_message_processing_succeeded_total", 1); - } -} - -#[derive(Debug)] -pub struct SqsMessageProcessingError<'a> { - pub message_id: &'a str, - pub error: &'a ProcessingError, -} - -impl<'a> InternalEvent for SqsMessageProcessingError<'a> { - fn emit_logs(&self) { - error!( - message = "Failed to process SQS message.", - message_id = %self.message_id, - error = %self.error, - error_code = "failed_processing_sqs_message", - error_type = error_type::PARSER_FAILED, - stage = error_stage::PROCESSING, - ); - } - - fn emit_metrics(&self) { - counter!( - "component_errors_total", 1, - "error_code" => "failed_processing_sqs_message", - "error_type" => error_type::PARSER_FAILED, - "stage" => error_stage::PROCESSING, - ); - // deprecated - counter!("sqs_message_processing_failed_total", 1); - } -} - -#[derive(Debug)] -pub struct SqsMessageDeleteSucceeded { - pub message_ids: Vec, -} - -impl InternalEvent for SqsMessageDeleteSucceeded { - fn emit_logs(&self) { - trace!(message = "Deleted SQS message(s).", - message_ids = %self.message_ids.iter() - .map(|x| x.id.to_string()) - .collect::>() - .join(", ")); - } - - fn emit_metrics(&self) { - counter!( - "sqs_message_delete_succeeded_total", - self.message_ids.len() as u64 - ); - } -} - -#[derive(Debug)] -pub struct SqsMessageDeletePartialError { - pub entries: Vec, -} - -impl InternalEvent for SqsMessageDeletePartialError { - fn emit_logs(&self) { - error!( - message = "Deletion of SQS message(s) failed.", - message_ids = %self.entries.iter() - .map(|x| format!("{}/{}", x.id, x.code)) - .collect::>() - .join(", "), - error_code = "failed_deleting_some_sqs_messages", - error_type = error_type::ACKNOWLEDGMENT_FAILED, - stage = error_stage::PROCESSING, - ); - } - - fn emit_metrics(&self) { - counter!( - "component_errors_total", 1, - "error_code" => "failed_deleting_some_sqs_messages", - "error_type" => error_type::ACKNOWLEDGMENT_FAILED, - "stage" => error_stage::PROCESSING, - ); - // deprecated - counter!("sqs_message_delete_failed_total", self.entries.len() as u64); - } -} - -#[derive(Debug)] -pub struct SqsMessageDeleteBatchError { - pub entries: Vec, - pub error: RusotoError, -} - -impl InternalEvent for SqsMessageDeleteBatchError { - fn emit_logs(&self) { - error!( - message = "Deletion of SQS message(s) failed.", - message_ids = %self.entries.iter() - .map(|x| x.id.to_string()) - .collect::>() - .join(", "), - error = %self.error, - error_code = "failed_deleting_all_sqs_messages", - error_type = error_type::ACKNOWLEDGMENT_FAILED, - stage = error_stage::PROCESSING, - ); - } - - fn emit_metrics(&self) { - counter!( - "component_errors_total", 1, - "error_code" => "failed_deleting_all_sqs_messages", - "error_type" => error_type::ACKNOWLEDGMENT_FAILED, - "stage" => error_stage::PROCESSING, - ); - // deprecated - counter!("sqs_message_delete_failed_total", self.entries.len() as u64); - counter!("sqs_message_delete_batch_failed_total", 1); - } -} - -#[derive(Debug)] -pub struct SqsS3EventRecordInvalidEventIgnored<'a> { - pub bucket: &'a str, - pub key: &'a str, - pub kind: &'a str, - pub name: &'a str, -} - -impl<'a> InternalEvent for SqsS3EventRecordInvalidEventIgnored<'a> { - fn emit_logs(&self) { - warn!(message = "Ignored S3 record in SQS message for an event that was not ObjectCreated.", - bucket = %self.bucket, key = %self.key, kind = %self.kind, name = %self.name); - } - - fn emit_metrics(&self) { - counter!("sqs_s3_event_record_ignored_total", 1, "ignore_type" => "invalid_event_kind"); - } -} diff --git a/src/internal_events/common.rs b/src/internal_events/common.rs index 3b892309983f3..dd629691d0a6d 100644 --- a/src/internal_events/common.rs +++ b/src/internal_events/common.rs @@ -62,37 +62,14 @@ impl<'a> InternalEvent for EndpointBytesSent<'a> { } } -#[cfg(feature = "rusoto")] -pub struct AwsBytesSent { - pub byte_size: usize, - pub region: rusoto_core::Region, -} - -#[cfg(feature = "rusoto")] -impl InternalEvent for AwsBytesSent { - fn emit(self) { - trace!( - message = "Bytes sent.", - protocol = "https", - byte_size = %self.byte_size, - region = ?self.region, - ); - counter!( - "component_sent_bytes_total", self.byte_size as u64, - "protocol" => "https", - "region" => self.region.name().to_owned(), - ); - } -} - #[cfg(feature = "aws-core")] -pub struct AwsSdkBytesSent { +pub struct AwsBytesSent { pub byte_size: usize, pub region: Option, } #[cfg(feature = "aws-core")] -impl InternalEvent for AwsSdkBytesSent { +impl InternalEvent for AwsBytesSent { fn emit(self) { trace!( message = "Bytes sent.", diff --git a/src/lib.rs b/src/lib.rs index f23a588d01c78..eadc832541d14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,7 +46,7 @@ pub mod internal_events; pub mod api; pub mod app; pub mod async_read; -#[cfg(any(feature = "rusoto_core", feature = "aws-config"))] +#[cfg(feature = "aws-config")] pub mod aws; #[cfg(feature = "codecs")] #[allow(unreachable_pub)] diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 9fd8a5099be1d..5813a97616a4c 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use serde::{Deserialize, Serialize}; use vector_core::config::log_schema; -use crate::aws::aws_sdk::{create_client, ClientBuilder}; +use crate::aws::{create_client, ClientBuilder}; use crate::sinks::util::ServiceBuilderExt; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 446734d5af258..013ac2502b554 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -17,7 +17,7 @@ use crate::{ test_util::{random_lines, random_lines_with_stream, random_string, trace_init}, }; -use crate::aws::aws_sdk::create_client; +use crate::aws::create_client; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsClientBuilder; use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; diff --git a/src/sinks/aws_cloudwatch_metrics.rs b/src/sinks/aws_cloudwatch_metrics.rs index 4c3a8439606ea..e47a2b885e109 100644 --- a/src/sinks/aws_cloudwatch_metrics.rs +++ b/src/sinks/aws_cloudwatch_metrics.rs @@ -17,8 +17,8 @@ use tower::Service; use vector_core::ByteSizeOf; use super::util::SinkBatchSettings; -use crate::aws::aws_sdk::{create_client, is_retriable_error, ClientBuilder}; use crate::aws::RegionOrEndpoint; +use crate::aws::{create_client, is_retriable_error, ClientBuilder}; use crate::{ aws::auth::AwsAuthentication, config::{ diff --git a/src/sinks/aws_kinesis_firehose/config.rs b/src/sinks/aws_kinesis_firehose/config.rs index 73cf3cfbc2099..1a9a3f6d85201 100644 --- a/src/sinks/aws_kinesis_firehose/config.rs +++ b/src/sinks/aws_kinesis_firehose/config.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use snafu::Snafu; use tower::ServiceBuilder; -use crate::aws::aws_sdk::{create_client, is_retriable_error, ClientBuilder}; +use crate::aws::{create_client, is_retriable_error, ClientBuilder}; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, diff --git a/src/sinks/aws_kinesis_firehose/integration_tests.rs b/src/sinks/aws_kinesis_firehose/integration_tests.rs index 5daf500dad01a..16a8e7921b8f9 100644 --- a/src/sinks/aws_kinesis_firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis_firehose/integration_tests.rs @@ -8,7 +8,7 @@ use serde_json::{json, Value}; use tokio::time::{sleep, Duration}; use super::*; -use crate::aws::aws_sdk::create_client; +use crate::aws::create_client; use crate::config::ProxyConfig; use crate::sinks::aws_kinesis_firehose::config::KinesisFirehoseClientBuilder; use crate::sinks::elasticsearch::BulkConfig; diff --git a/src/sinks/aws_kinesis_firehose/service.rs b/src/sinks/aws_kinesis_firehose/service.rs index 7f06c0d8323c6..087a65e9ff3a0 100644 --- a/src/sinks/aws_kinesis_firehose/service.rs +++ b/src/sinks/aws_kinesis_firehose/service.rs @@ -8,7 +8,7 @@ use tracing::Instrument; use vector_core::{internal_event::EventsSent, stream::DriverResponse}; use crate::{ - event::EventStatus, internal_events::AwsSdkBytesSent, + event::EventStatus, internal_events::AwsBytesSent, sinks::aws_kinesis_firehose::request_builder::KinesisRequest, }; use aws_sdk_firehose::{Client as KinesisFirehoseClient, Region}; @@ -82,7 +82,7 @@ impl Service> for KinesisService { .instrument(info_span!("request").or_current()) .await?; - emit!(AwsSdkBytesSent { + emit!(AwsBytesSent { byte_size: processed_bytes_total, region }); diff --git a/src/sinks/aws_kinesis_streams/config.rs b/src/sinks/aws_kinesis_streams/config.rs index a78ed8b447529..258e64a9834d5 100644 --- a/src/sinks/aws_kinesis_streams/config.rs +++ b/src/sinks/aws_kinesis_streams/config.rs @@ -11,7 +11,7 @@ use snafu::Snafu; use tower::ServiceBuilder; use super::service::KinesisResponse; -use crate::aws::aws_sdk::{create_client, is_retriable_error, ClientBuilder}; +use crate::aws::{create_client, is_retriable_error, ClientBuilder}; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, diff --git a/src/sinks/aws_kinesis_streams/integration_tests.rs b/src/sinks/aws_kinesis_streams/integration_tests.rs index 9f7c6520617eb..70884dd066f8f 100644 --- a/src/sinks/aws_kinesis_streams/integration_tests.rs +++ b/src/sinks/aws_kinesis_streams/integration_tests.rs @@ -7,7 +7,7 @@ use aws_sdk_kinesis::types::DateTime; use tokio::time::{sleep, Duration}; use super::*; -use crate::aws::aws_sdk::create_client; +use crate::aws::create_client; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::config::ProxyConfig; use crate::sinks::aws_kinesis_streams::config::KinesisClientBuilder; diff --git a/src/sinks/aws_kinesis_streams/service.rs b/src/sinks/aws_kinesis_streams/service.rs index 8be4fd0468bfc..156fdceffd8ec 100644 --- a/src/sinks/aws_kinesis_streams/service.rs +++ b/src/sinks/aws_kinesis_streams/service.rs @@ -6,7 +6,7 @@ use tracing::Instrument; use vector_core::{internal_event::EventsSent, stream::DriverResponse}; use crate::{ - event::EventStatus, internal_events::AwsSdkBytesSent, + event::EventStatus, internal_events::AwsBytesSent, sinks::aws_kinesis_streams::request_builder::KinesisRequest, }; use aws_sdk_kinesis::error::PutRecordsError; @@ -85,7 +85,7 @@ impl Service> for KinesisService { .stream_name(stream_name) .send() .inspect_ok(|_| { - emit!(AwsSdkBytesSent { + emit!(AwsBytesSent { byte_size: processed_bytes_total, region, }); diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs index 36924964e577a..931a7d98f6d36 100644 --- a/src/sinks/aws_s3/tests.rs +++ b/src/sinks/aws_s3/tests.rs @@ -24,7 +24,7 @@ mod integration_tests { event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, EventArray, LogEvent}, }; - use crate::aws::aws_sdk::create_client; + use crate::aws::create_client; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::common::s3::S3ClientBuilder; use crate::{ diff --git a/src/sinks/aws_sqs/config.rs b/src/sinks/aws_sqs/config.rs index 0d1589ce7b18f..b19694777057a 100644 --- a/src/sinks/aws_sqs/config.rs +++ b/src/sinks/aws_sqs/config.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use crate::{ - aws::aws_sdk::create_client, + aws::create_client, aws::{AwsAuthentication, RegionOrEndpoint}, common::sqs::SqsClientBuilder, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, diff --git a/src/sinks/aws_sqs/integration_tests.rs b/src/sinks/aws_sqs/integration_tests.rs index 42f18d093caa0..cde5f0790a63e 100644 --- a/src/sinks/aws_sqs/integration_tests.rs +++ b/src/sinks/aws_sqs/integration_tests.rs @@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration}; use super::config::{Encoding, SqsSinkConfig}; use super::sink::SqsSink; -use crate::aws::aws_sdk::create_client; +use crate::aws::create_client; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::common::sqs::SqsClientBuilder; use crate::config::{ProxyConfig, SinkContext}; diff --git a/src/sinks/aws_sqs/retry.rs b/src/sinks/aws_sqs/retry.rs index d12d5b54e4c66..373cb1dcab127 100644 --- a/src/sinks/aws_sqs/retry.rs +++ b/src/sinks/aws_sqs/retry.rs @@ -2,7 +2,7 @@ use aws_sdk_sqs::error::SendMessageError; use aws_sdk_sqs::types::SdkError; use super::service::SendMessageResponse; -use crate::aws::aws_sdk::is_retriable_error; +use crate::aws::is_retriable_error; use crate::sinks::util::retries::RetryLogic; #[derive(Debug, Clone)] diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index d6b06e116d783..bd777de20f448 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use snafu::Snafu; use super::service::{S3Response, S3Service}; -use crate::aws::aws_sdk::{create_client, is_retriable_error}; +use crate::aws::{create_client, is_retriable_error}; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::common::s3::S3ClientBuilder; use crate::tls::TlsOptions; diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index c78d9937c5519..660ec19385c78 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -16,7 +16,7 @@ use vector_core::{ }; use super::config::S3Options; -use crate::internal_events::AwsSdkBytesSent; +use crate::internal_events::AwsBytesSent; #[derive(Debug, Clone)] pub struct S3Request { @@ -67,10 +67,10 @@ impl DriverResponse for S3Response { } } -/// Wrapper for the Rusoto S3 client. +/// Wrapper for the AWS SDK S3 client. /// /// Provides a `tower::Service`-compatible wrapper around the native -/// `rusoto_s3::S3Client`, allowing it to be composed within a Tower "stack", +/// AWS SDK S3 Client, allowing it to be composed within a Tower "stack", /// such that we can easily and transparently provide retries, concurrency /// limits, rate limits, and more. #[derive(Clone)] @@ -148,7 +148,7 @@ impl Service for S3Service { .await; result.map(|_inner| { - emit!(AwsSdkBytesSent { + emit!(AwsBytesSent { byte_size: request_size, region, }); diff --git a/src/sinks/util/buffer/compression.rs b/src/sinks/util/buffer/compression.rs index d3f7e939d656d..efeab8521ce74 100644 --- a/src/sinks/util/buffer/compression.rs +++ b/src/sinks/util/buffer/compression.rs @@ -59,18 +59,6 @@ impl fmt::Display for Compression { } } -#[cfg(feature = "rusoto_core")] -impl From for rusoto_core::encoding::ContentEncoding { - fn from(compression: Compression) -> Self { - match compression { - Compression::None => rusoto_core::encoding::ContentEncoding::Identity, - Compression::Gzip(level) => { - rusoto_core::encoding::ContentEncoding::Gzip(None, level.level()) - } - } - } -} - impl<'de> de::Deserialize<'de> for Compression { fn deserialize(deserializer: D) -> Result where diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 0899c39f11966..907a4dc3b5715 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -13,8 +13,8 @@ use snafu::Snafu; use tokio_util::io::StreamReader; use super::util::MultilineConfig; -use crate::aws::aws_sdk::{create_client, ClientBuilder}; use crate::aws::RegionOrEndpoint; +use crate::aws::{create_client, ClientBuilder}; use crate::common::sqs::SqsClientBuilder; use crate::tls::TlsOptions; use crate::{ @@ -372,7 +372,7 @@ mod integration_tests { use pretty_assertions::assert_eq; use super::{sqs, AwsS3Config, Compression, Strategy}; - use crate::aws::aws_sdk::create_client; + use crate::aws::create_client; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::common::sqs::SqsClientBuilder; use crate::config::ProxyConfig; diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 173983957fb84..80198ed9b815c 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -1,4 +1,4 @@ -use crate::aws::aws_sdk::create_client; +use crate::aws::create_client; use crate::codecs::DecodingConfig; use crate::common::sqs::SqsClientBuilder; use crate::tls::TlsOptions; diff --git a/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md b/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md index cd03844fb8bde..a2ac1de495916 100644 --- a/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md +++ b/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md @@ -227,7 +227,7 @@ If `.value` was zero, Vector would panic. This can be fixed by handling the erro #### AWS SDK Migration {#aws-sdk-migration} -We are in the process of migrating sources and sinks that utilize AWS to the official AWS SDK (from Rusoto). +We have migrated sources and sinks that utilize AWS to the official AWS SDK (from Rusoto). This comes with some benefits such as support for IMDSv2. This requires us to deprecate some config options. If you were providing a custom path to an AWS credentials file, this is no longer supported for components that have diff --git a/website/cue/reference/components/aws.cue b/website/cue/reference/components/aws.cue index 830b9373c6896..879dabc7f9d76 100644 --- a/website/cue/reference/components/aws.cue +++ b/website/cue/reference/components/aws.cue @@ -39,16 +39,6 @@ components: _aws: { examples: ["arn:aws:iam::123456789098:role/my_role"] } } - credentials_file: { - category: "Auth" - common: false - description: "The path to AWS credentials file. Used for AWS authentication when communicating with AWS services." - required: false - type: string: { - default: null - examples: ["/path/to/aws/credentials"] - } - } profile: { category: "Auth" common: false @@ -73,7 +63,6 @@ components: _aws: { examples: ["127.0.0.0:5000/path/to/service"] } } - region: { description: "The [AWS region](\(urls.aws_regions)) of the target service. If `endpoint` is provided it will override this value since the endpoint includes the region." required: true @@ -196,64 +185,3 @@ components: _aws: { } } } - -components: _aws_new_sdk: { - configuration: { - auth: { - common: false - description: "Options for the authentication strategy." - required: false - type: object: { - examples: [] - options: { - access_key_id: { - category: "Auth" - common: false - description: "The AWS access key id. Used for AWS authentication when communicating with AWS services." - required: false - type: string: { - default: null - examples: ["AKIAIOSFODNN7EXAMPLE"] - } - } - secret_access_key: { - category: "Auth" - common: false - description: "The AWS secret access key. Used for AWS authentication when communicating with AWS services." - required: false - type: string: { - default: null - examples: ["wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"] - } - } - assume_role: { - category: "Auth" - common: false - description: "The ARN of an [IAM role](\(urls.aws_iam_role)) to assume at startup." - required: false - type: string: { - default: null - examples: ["arn:aws:iam::123456789098:role/my_role"] - } - } - profile: { - category: "Auth" - common: false - description: "The AWS profile name. Used to select AWS credentials from a provided credentials file." - required: false - type: string: { - default: "default" - examples: ["develop"] - } - } - } - } - } - - endpoint: components._aws.configuration.endpoint - region: components._aws.configuration.region - } - - env_vars: components._aws.env_vars - how_it_works: components._aws.how_it_works -} diff --git a/website/cue/reference/components/sinks/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/aws_cloudwatch_logs.cue index 52a19948c8d27..1dc42d594f309 100644 --- a/website/cue/reference/components/sinks/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/aws_cloudwatch_logs.cue @@ -1,6 +1,6 @@ package metadata -components: sinks: aws_cloudwatch_logs: components._aws_new_sdk & { +components: sinks: aws_cloudwatch_logs: components._aws & { title: "AWS Cloudwatch Logs" classes: { diff --git a/website/cue/reference/components/sinks/aws_cloudwatch_metrics.cue b/website/cue/reference/components/sinks/aws_cloudwatch_metrics.cue index eab67d5a74aff..05b91cce2f018 100644 --- a/website/cue/reference/components/sinks/aws_cloudwatch_metrics.cue +++ b/website/cue/reference/components/sinks/aws_cloudwatch_metrics.cue @@ -1,6 +1,6 @@ package metadata -components: sinks: aws_cloudwatch_metrics: components._aws_new_sdk & { +components: sinks: aws_cloudwatch_metrics: components._aws & { title: "AWS Cloudwatch Metrics" classes: { diff --git a/website/cue/reference/components/sinks/aws_sqs.cue b/website/cue/reference/components/sinks/aws_sqs.cue index 724ce30c13f63..48e6e9b8b624c 100644 --- a/website/cue/reference/components/sinks/aws_sqs.cue +++ b/website/cue/reference/components/sinks/aws_sqs.cue @@ -1,6 +1,6 @@ package metadata -components: sinks: aws_sqs: components._aws_new_sdk & { +components: sinks: aws_sqs: components._aws & { title: "Amazon Simple Queue Service (SQS)" classes: { diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue index 2c8546be21ec4..632e42e26f40d 100644 --- a/website/cue/reference/components/sources/aws_sqs.cue +++ b/website/cue/reference/components/sources/aws_sqs.cue @@ -1,6 +1,6 @@ package metadata -components: sources: aws_sqs: components._aws_new_sdk & { +components: sources: aws_sqs: components._aws & { title: "AWS SQS" features: {