diff --git a/Cargo.lock b/Cargo.lock index 4959f910d93b7..0d135d0b1b4d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,6 +543,274 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "aws-config" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00ee7a2e95018f362ae10ae5f5629e3e9e9d1d8b68604604d3a190a72f29ffb3" +dependencies = [ + "aws-http", + "aws-hyper", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes 1.1.0", + "http", + "tokio", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-endpoint" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b6ad91071353f652fcb4b3a2eaa53e0ecd7177af36dc9020a0f1c664521181" +dependencies = [ + "aws-smithy-http", + "aws-types", + "http", + "regex", + "tracing 0.1.29", +] + +[[package]] +name = "aws-http" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66f94a4d0015a44ce3c7ab3df403d7fcc28e6595bf24189cc29adc119a1bdaa" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "http", + "lazy_static", + "thiserror", + "tracing 0.1.29", +] + +[[package]] +name = "aws-hyper" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24f98b694ef3cabefa99b1eabcb0df967760d3bbd9a3b2c590ae72e35c893227" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes 1.1.0", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "pin-project 1.0.8", + "tokio", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-sdk-sqs" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95bea08a2055361eb2e97b1fba1dea3ba6f842cf6c1d141053a0faacfa41a3fc" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-hyper", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes 1.1.0", + "http", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc701f0fabb1c6dbcd618396637a42fdc2943d72875cb43c3adf881f2c25528b" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-hyper", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes 1.1.0", + "http", +] + +[[package]] +name = "aws-sig-auth" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7f393e2c69c0cfa58ddc1036475c79648e8e462b907454c5b99c62d68a131d" +dependencies = [ + "aws-sigv4", + "aws-smithy-http", + "aws-types", + "http", + "thiserror", + "tracing 0.1.29", +] + +[[package]] +name = "aws-sigv4" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb10064e2edb9052a46464fb158b6348539a3601443fc07a2c474a0fd6e61b2" +dependencies = [ + "chrono", + "form_urlencoded", + "hex", + "http", + "percent-encoding", + "ring", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-async" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0ad697e2a97ba37c18437fe43a0f3f0c731c286361db1851d73c2b387ee4b805" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-client" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56918ee8de8dbfdb966fb1a55e6fa82101df2805a6f8413d8864286ca1105fe" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes 1.1.0", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "lazy_static", + "pin-project 1.0.8", + "pin-project-lite", + "tokio", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-http" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9039d48a4ba324f52162aeaf095e4eb15bcc7e228a9cee9a65642c257be2cf7e" +dependencies = [ + "aws-smithy-types", + "bytes 1.1.0", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "percent-encoding", + "pin-project 1.0.8", + "thiserror", + "tokio", + "tokio-util", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-http-tower" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b1502e912935b05eaf5be8f9eae607bc14d9d21ea69d0b19f622e0ee7d98b2" +dependencies = [ + "aws-smithy-http", + "bytes 1.1.0", + "http", + "http-body", + "pin-project 1.0.8", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-json" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c90840bccbd4cdf9b51db0ca2e051920c1ad94d456e29359fe9feada1c9e40" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afed6996a0cd17fcd61980ee600ae3133a6e5937f1968a402da8ac768872a59" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-types" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6bcfd3eaa047d37438d72dfe059ddcf01d5b758c844a8930ce7b6a6acdc543f" +dependencies = [ + "chrono", + "itoa", + "num-integer", + "ryu", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39b2f34906aa0c0523fe15d528f70f0ff2490f5b1c37d6ef418ce799d3bcc07" +dependencies = [ + "thiserror", + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "971b04d79f51492a84e944f96a7e9a648acad8ee0f3c9720442531f7b043a6d0" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "rustc_version 0.4.0", + "zeroize", +] + [[package]] name = "azure_core" version = "0.1.0" @@ -910,6 +1178,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e314712951c43123e5920a446464929adc667a5eade7f8fb3997776c9df6e54" +dependencies = [ + "bytes 1.1.0", + "either", +] + [[package]] name = "bytesize" version = "1.1.0" @@ -7587,6 +7865,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1f0175e03a0973cf4afd476bef05c26e228520400eb1fd473ad417b1c00ffb" + [[package]] name = "utf-8" version = "0.7.6" @@ -7635,6 +7919,9 @@ 
dependencies = [
  "async-trait",
  "atty",
  "avro-rs",
+ "aws-config",
+ "aws-sdk-sqs",
+ "aws-types",
  "azure_core",
  "azure_storage",
  "base64 0.13.0",
diff --git a/Cargo.toml b/Cargo.toml
index 91c86595895d4..988adfac1acfd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,7 +132,7 @@ metrics = { version = "0.17.0", default-features = false, features = ["std"] }
 metrics-tracing-context = { version = "0.8.0", default-features = false }
 metrics-util = { version = "0.10.1", default-features = false, features = ["std"] }
 
-# Aws
+# AWS - Rusoto
 rusoto_cloudwatch = { version = "0.47.0", optional = true }
 rusoto_core = { version = "0.47.0", features = ["encoding"], optional = true }
 rusoto_credential = { version = "0.47.0", optional = true }
@@ -145,6 +145,12 @@ rusoto_signature = { version = "0.47.0", optional = true }
 rusoto_sqs = { version = "0.47.0", optional = true }
 rusoto_sts = { version = "0.47.0", optional = true }
 
+# AWS - Official SDK
+aws-config = { version = "0.0.22-alpha", optional = true }
+aws-types = { version = "0.0.22-alpha", optional = true }
+aws-sdk-sqs = { version = "0.0.22-alpha", optional = true }
+
 # Azure
 azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "16bcf0ab1bb6e380d966a69d314de1e99ede553a", default-features = false, features = ["enable_reqwest"], optional = true }
 azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "16bcf0ab1bb6e380d966a69d314de1e99ede553a", default-features = false, features = ["blob"], optional = true }
@@ -416,6 +422,7 @@ sources = ["sources-logs", "sources-metrics"]
 sources-logs = [
   "sources-aws_kinesis_firehose",
   "sources-aws_s3",
+  "sources-aws_sqs",
   "sources-datadog_agent",
   "sources-docker_logs",
   "sources-exec",
@@ -454,6 +461,7 @@ sources-apache_metrics = []
 sources-aws_ecs_metrics = []
 sources-aws_kinesis_firehose = ["base64", "infer", "sources-utils-tls", "warp", "codecs"]
 sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid", "codecs", "zstd"]
+sources-aws_sqs = ["aws-config", "aws-types", "aws-sdk-sqs", "codecs"]
 sources-datadog_agent = ["snap", "sources-utils-tls", "warp", "sources-utils-http-error", "protobuf-build", "codecs"]
 sources-dnstap = ["base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "protobuf-build"]
 sources-docker_logs = ["docker"]
@@ -728,7 +736,7 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
 aws-kinesis-firehose-integration-tests = ["rusoto_es", "sinks-aws_kinesis_firehose", "sinks-elasticsearch"]
 aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
 aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"]
-aws-sqs-integration-tests = ["sinks-aws_sqs"]
+aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"]
 azure-blob-integration-tests = ["sinks-azure_blob"]
 clickhouse-integration-tests = ["sinks-clickhouse", "warp"]
 docker-logs-integration-tests = ["sources-docker_logs", "unix"]
diff --git a/Makefile b/Makefile
index 15a72ead2eaa8..42b8e489fbdd4 100644
--- a/Makefile
+++ b/Makefile
@@ -320,6 +320,18 @@ ifeq ($(AUTODESPAWN), true)
 	@scripts/setup_integration_env.sh aws stop
 endif
 
+.PHONY: test-integration-aws-sqs
+test-integration-aws-sqs: ## Runs AWS SQS integration tests
+ifeq ($(AUTOSPAWN), true)
+	@scripts/setup_integration_env.sh aws stop
+	@scripts/setup_integration_env.sh aws start
+	sleep 10 # Many services are very slow... Give them a sec...
+endif
+	${MAYBE_ENVIRONMENT_EXEC} cargo test --no-fail-fast --no-default-features --features aws-sqs-integration-tests --lib ::aws_sqs
+ifeq ($(AUTODESPAWN), true)
+	@scripts/setup_integration_env.sh aws stop
+endif
+
 .PHONY: test-integration-azure
 test-integration-azure: ## Runs Azure integration tests
 ifeq ($(AUTOSPAWN), true)
diff --git a/lib/vector-core/core-common/src/internal_event/events_received.rs b/lib/vector-core/core-common/src/internal_event/events_received.rs
new file mode 100644
index 0000000000000..93cba2f1a4897
--- /dev/null
+++ b/lib/vector-core/core-common/src/internal_event/events_received.rs
@@ -0,0 +1,23 @@
+use crate::internal_event::InternalEvent;
+use metrics::counter;
+
+#[derive(Debug)]
+pub struct EventsReceived {
+    pub count: usize,
+    pub byte_size: usize,
+}
+
+impl InternalEvent for EventsReceived {
+    fn emit_logs(&self) {
+        trace!(message = "Events received.", count = %self.count, byte_size = %self.byte_size);
+    }
+
+    fn emit_metrics(&self) {
+        counter!("component_received_events_total", self.count as u64);
+        counter!("events_in_total", self.count as u64);
+        counter!(
+            "component_received_event_bytes_total",
+            self.byte_size as u64
+        );
+    }
+}
diff --git a/lib/vector-core/core-common/src/internal_event/mod.rs b/lib/vector-core/core-common/src/internal_event/mod.rs
index b4a7ae41aa042..40a339165475d 100644
--- a/lib/vector-core/core-common/src/internal_event/mod.rs
+++ b/lib/vector-core/core-common/src/internal_event/mod.rs
@@ -1,7 +1,9 @@
 mod bytes_sent;
+mod events_received;
 mod events_sent;
 
 pub use bytes_sent::BytesSent;
+pub use events_received::EventsReceived;
 pub use events_sent::EventsSent;
 
 pub trait InternalEvent {
diff --git a/src/rusoto/auth.rs b/src/aws/auth.rs
similarity index 54%
rename from src/rusoto/auth.rs
rename to src/aws/auth.rs
index f515edb0c053d..30be6d821bedd 100644
--- a/src/rusoto/auth.rs
+++ b/src/aws/auth.rs
@@ -1,5 +1,3 @@
-use super::AwsCredentialsProvider;
-use rusoto_core::Region;
 use serde::{Deserialize, Serialize};
 
 /// Configuration for configuring authentication strategy for AWS.
@@ -27,66 +25,9 @@ pub enum AwsAuthentication {
     Default {},
 }
 
-impl AwsAuthentication {
-    const AWS_DEFAULT_PROFILE: &'static str = "default";
-
-    pub fn build(
-        &self,
-        region: &Region,
-        old_assume_role: Option<String>,
-    ) -> crate::Result<AwsCredentialsProvider> {
-        if old_assume_role.is_some() {
-            warn!("Option `assume_role` has been renamed to `auth.assume_role`. Please use that one instead.");
-        }
-        match self {
-            Self::Static {
-                access_key_id,
-                secret_access_key,
-            } => {
-                if old_assume_role.is_some() {
-                    warn!("Ignoring option `assume_role`, instead using access options.");
-                }
-                Ok(AwsCredentialsProvider::new_minimal(
-                    access_key_id,
-                    secret_access_key,
-                ))
-            }
-            Self::File {
-                credentials_file,
-                profile,
-            } => {
-                if old_assume_role.is_some() {
-                    warn!(
-                        "Ignoring option `assume_role`, instead using AWS credentials file options."
-                    );
-                }
-                AwsCredentialsProvider::new_with_credentials_file(
-                    credentials_file,
-                    profile
-                        .as_ref()
-                        .unwrap_or(&AwsAuthentication::AWS_DEFAULT_PROFILE.to_string())
-                        .as_str(),
-                )
-            }
-            Self::Role { assume_role } => {
-                if old_assume_role.is_some() {
-                    warn!(
-                        "Ignoring option `assume_role`, instead using option `auth.assume_role`."
- ); - } - AwsCredentialsProvider::new(region, Some(assume_role.clone())) - } - Self::Default {} => AwsCredentialsProvider::new(region, old_assume_role), - } - } -} - #[cfg(test)] mod tests { use super::*; - use std::fs::File; - use std::io::Write; - use tempfile; #[derive(Serialize, Deserialize, Clone, Debug)] struct ComponentConfig { @@ -198,31 +139,4 @@ mod tests { _ => panic!(), } } - - #[test] - fn parsing_credentials_file() { - let tmpdir = tempfile::tempdir().unwrap(); - let tmpfile_path = tmpdir.path().join("credentials"); - let mut tmpfile = File::create(&tmpfile_path).unwrap(); - - writeln!( - tmpfile, - r#" - [default] - aws_access_key_id = default-access-key-id - aws_secret_access_key = default-secret - "# - ) - .unwrap(); - - let auth = AwsAuthentication::File { - credentials_file: tmpfile_path.to_str().unwrap().to_string(), - profile: Some("default".to_string()), - }; - let result = auth.build(&Region::AfSouth1, None).unwrap(); - assert!(matches!(result, AwsCredentialsProvider::File { .. })); - - drop(tmpfile); - tmpdir.close().unwrap(); - } } diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs new file mode 100644 index 0000000000000..a50ab2c4a1aac --- /dev/null +++ b/src/aws/aws_sdk/auth.rs @@ -0,0 +1,59 @@ +use crate::aws::auth::AwsAuthentication; +use aws_config::default_provider::credentials::default_provider; +use aws_config::meta::credentials::LazyCachingCredentialsProvider; +use aws_config::profile::ProfileFileCredentialsProvider; + +use aws_config::sts::AssumeRoleProviderBuilder; + +use aws_types::credentials::SharedCredentialsProvider; + +use aws_types::Credentials; + +impl AwsAuthentication { + pub async fn credentials_provider(&self) -> SharedCredentialsProvider { + match self { + Self::Static { + access_key_id, + secret_access_key, + } => SharedCredentialsProvider::new(Credentials::from_keys( + access_key_id, + secret_access_key, + None, + )), + AwsAuthentication::File { + credentials_file, + profile, + } => { + warn!("Overriding the credentials file is not supported. 
`~/.aws/config` and `~/.aws/credentials` will be used instead of \"{}\"", credentials_file);
+                let mut file_provider = ProfileFileCredentialsProvider::builder();
+                if let Some(profile) = profile {
+                    file_provider = file_provider.profile_name(profile);
+                }
+                SharedCredentialsProvider::new(
+                    LazyCachingCredentialsProvider::builder()
+                        .load(file_provider.build())
+                        .build(),
+                )
+            }
+            AwsAuthentication::Role { assume_role } => SharedCredentialsProvider::new(
+                AssumeRoleProviderBuilder::new(assume_role)
+                    .build(default_credentials_provider().await),
+            ),
+            AwsAuthentication::Default {} => {
+                SharedCredentialsProvider::new(default_credentials_provider().await)
+            }
+        }
+    }
+
+    #[cfg(test)]
+    pub fn test_auth() -> AwsAuthentication {
+        AwsAuthentication::Static {
+            access_key_id: "dummy".to_string(),
+            secret_access_key: "dummy".to_string(),
+        }
+    }
+}
+
+async fn default_credentials_provider() -> SharedCredentialsProvider {
+    SharedCredentialsProvider::new(default_provider().await)
+}
diff --git a/src/aws/aws_sdk/mod.rs b/src/aws/aws_sdk/mod.rs
new file mode 100644
index 0000000000000..d3f5c5ed83085
--- /dev/null
+++ b/src/aws/aws_sdk/mod.rs
@@ -0,0 +1,2 @@
+mod auth;
+mod region;
diff --git a/src/aws/aws_sdk/region.rs b/src/aws/aws_sdk/region.rs
new file mode 100644
index 0000000000000..5d659313b934f
--- /dev/null
+++ b/src/aws/aws_sdk/region.rs
@@ -0,0 +1,19 @@
+use crate::aws::region::RegionOrEndpoint;
+use aws_sdk_sqs::Endpoint;
+use aws_types::region::Region;
+use http::Uri;
+use std::str::FromStr;
+
+impl RegionOrEndpoint {
+    pub fn endpoint(&self) -> crate::Result<Option<Endpoint>> {
+        if let Some(endpoint) = &self.endpoint {
+            Ok(Some(Endpoint::immutable(Uri::from_str(endpoint)?)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn region(&self) -> Option<Region> {
+        self.region.clone().map(Region::new)
+    }
+}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
new file mode 100644
index 0000000000000..1152b5e07c910
--- /dev/null
+++ b/src/aws/mod.rs
@@ -0,0 +1,11 @@
+pub mod auth;
+pub mod region;
+
+pub use auth::AwsAuthentication;
+pub use region::RegionOrEndpoint;
+
+#[cfg(feature = "rusoto_core")]
+pub mod rusoto;
+
+#[cfg(feature = "aws-config")]
+pub mod aws_sdk;
diff --git a/src/aws/region.rs b/src/aws/region.rs
new file mode 100644
index 0000000000000..35889c8c9afc9
--- /dev/null
+++ b/src/aws/region.rs
@@ -0,0 +1,31 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
+#[serde(default)]
+pub struct RegionOrEndpoint {
+    pub region: Option<String>,
+    pub endpoint: Option<String>,
+}
+
+impl RegionOrEndpoint {
+    pub const fn with_region(region: String) -> Self {
+        Self {
+            region: Some(region),
+            endpoint: None,
+        }
+    }
+
+    pub fn with_endpoint(endpoint: impl Into<String>) -> Self {
+        Self {
+            region: None,
+            endpoint: Some(endpoint.into()),
+        }
+    }
+
+    pub fn with_both(region: impl Into<String>, endpoint: impl Into<String>) -> Self {
+        Self {
+            region: Some(region.into()),
+            endpoint: Some(endpoint.into()),
+        }
+    }
+}
diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs
new file mode 100644
index 0000000000000..5430591d9246e
--- /dev/null
+++ b/src/aws/rusoto/auth.rs
@@ -0,0 +1,92 @@
+use crate::aws::auth::AwsAuthentication;
+use crate::aws::rusoto::AwsCredentialsProvider;
+use rusoto_core::Region;
+
+const AWS_DEFAULT_PROFILE: &str = "default";
+
+impl AwsAuthentication {
+    pub fn build(
+        &self,
+        region: &Region,
+        old_assume_role: Option<String>,
+    ) -> crate::Result<AwsCredentialsProvider> {
+        if old_assume_role.is_some() {
+            warn!("Option `assume_role` has been renamed to `auth.assume_role`. Please use that one instead.");
+        }
+        match self {
+            Self::Static {
+                access_key_id,
+                secret_access_key,
+            } => {
+                if old_assume_role.is_some() {
+                    warn!("Ignoring option `assume_role`, instead using access options.");
+                }
+                Ok(AwsCredentialsProvider::new_minimal(
+                    access_key_id,
+                    secret_access_key,
+                ))
+            }
+            Self::File {
+                credentials_file,
+                profile,
+            } => {
+                if old_assume_role.is_some() {
+                    warn!(
+                        "Ignoring option `assume_role`, instead using AWS credentials file options."
+                    );
+                }
+                AwsCredentialsProvider::new_with_credentials_file(
+                    credentials_file,
+                    profile
+                        .as_ref()
+                        .unwrap_or(&AWS_DEFAULT_PROFILE.to_string())
+                        .as_str(),
+                )
+            }
+            Self::Role { assume_role } => {
+                if old_assume_role.is_some() {
+                    warn!(
+                        "Ignoring option `assume_role`, instead using option `auth.assume_role`."
+                    );
+                }
+                AwsCredentialsProvider::new(region, Some(assume_role.clone()))
+            }
+            Self::Default {} => AwsCredentialsProvider::new(region, old_assume_role),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::aws::auth::AwsAuthentication;
+    use rusoto_core::Region;
+    use std::fs::File;
+    use std::io::Write;
+
+    #[test]
+    fn parsing_credentials_file() {
+        let tmpdir = tempfile::tempdir().unwrap();
+        let tmpfile_path = tmpdir.path().join("credentials");
+        let mut tmpfile = File::create(&tmpfile_path).unwrap();
+
+        writeln!(
+            tmpfile,
+            r#"
+            [default]
+            aws_access_key_id = default-access-key-id
+            aws_secret_access_key = default-secret
+            "#
+        )
+        .unwrap();
+
+        let auth = AwsAuthentication::File {
+            credentials_file: tmpfile_path.to_str().unwrap().to_string(),
+            profile: Some("default".to_string()),
+        };
+        let result = auth.build(&Region::AfSouth1, None).unwrap();
+        assert!(matches!(result, AwsCredentialsProvider::File { .. }));
+
+        drop(tmpfile);
+        tmpdir.close().unwrap();
+    }
+}
diff --git a/src/rusoto/mod.rs b/src/aws/rusoto/mod.rs
similarity index 97%
rename from src/rusoto/mod.rs
rename to src/aws/rusoto/mod.rs
index 4584e3e4e1387..93890bd290574 100644
--- a/src/rusoto/mod.rs
+++ b/src/aws/rusoto/mod.rs
@@ -1,3 +1,8 @@
+mod auth;
+pub mod region;
+
+//TODO: replace with direct import
+pub use super::auth::AwsAuthentication;
 use crate::config::ProxyConfig;
 use crate::{http::HttpError, tls::MaybeTlsSettings};
 use async_trait::async_trait;
@@ -8,8 +13,11 @@ use http::{
     Method, Request, Response, StatusCode,
 };
 use hyper::body::{Body, HttpBody};
+use hyper::client;
 use once_cell::sync::OnceCell;
 use regex::bytes::RegexSet;
+pub use region::{region_from_endpoint, RegionOrEndpoint};
+use rusoto_core::credential::ProfileProvider;
 use rusoto_core::{
     request::{
         DispatchSignedRequest, DispatchSignedRequestFuture, HttpDispatchError, HttpResponse,
@@ -30,19 +38,13 @@ use std::{
     time::Duration,
 };
 use tower::{Service, ServiceExt};
 
-pub mod auth;
-pub mod region;
-pub use auth::AwsAuthentication;
-use hyper::client;
-pub use region::{region_from_endpoint, RegionOrEndpoint};
-use rusoto_core::credential::ProfileProvider;
-
-pub type Client = HttpClient<super::http::HttpClient<RusotoBody>>;
+pub type Client = HttpClient<crate::http::HttpClient<RusotoBody>>;
 
 pub fn client(proxy: &ProxyConfig) -> crate::Result<Client> {
     let settings = MaybeTlsSettings::enable_client()?;
-    let client = super::http::HttpClient::new(settings, proxy)?;
+    let client = crate::http::HttpClient::new(settings, proxy)?;
     Ok(HttpClient { client })
 }
 
@@ -51,7 +53,7 @@ pub fn custom_client(
     client_builder: &mut client::Builder,
 ) -> crate::Result<Client> {
     let settings = MaybeTlsSettings::enable_client()?;
-    let client = super::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?;
+    let client = crate::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?;
     Ok(HttpClient { client })
 }
diff --git a/src/rusoto/region.rs b/src/aws/rusoto/region.rs
similarity index 91%
rename from src/rusoto/region.rs
rename to src/aws/rusoto/region.rs
index dbea60d958f16..9a80632fa04ed 100644
--- a/src/rusoto/region.rs
+++ b/src/aws/rusoto/region.rs
@@ -1,33 +1,11 @@
 use http::{uri::InvalidUri, Uri};
 use rusoto_core::{region::ParseRegionError, Region};
-use serde::{Deserialize, Serialize};
+
+pub use crate::aws::region::RegionOrEndpoint;
 use snafu::{ResultExt, Snafu};
 use std::convert::TryFrom;
 use std::str::FromStr;
 
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
-#[serde(default)]
-pub struct RegionOrEndpoint {
-    region: Option<String>,
-    endpoint: Option<String>,
-}
-
-impl RegionOrEndpoint {
-    pub const fn with_region(region: String) -> Self {
-        Self {
-            region: Some(region),
-            endpoint: None,
-        }
-    }
-
-    pub const fn with_endpoint(endpoint: String) -> Self {
-        Self {
-            region: None,
-            endpoint: Some(endpoint),
-        }
-    }
-}
-
 #[derive(Debug, Snafu)]
 pub enum ParseError {
     #[snafu(display("Failed to parse custom endpoint as URI: {}", source))]
diff --git a/src/internal_events/common.rs b/src/internal_events/common.rs
index 7368b693f6641..a1dfce9ca4248 100644
--- a/src/internal_events/common.rs
+++ b/src/internal_events/common.rs
@@ -1,4 +1,5 @@
 use metrics::counter;
+pub use vector_core::internal_event::EventsReceived;
 use vector_core::internal_event::InternalEvent;
 
 #[derive(Debug)]
@@ -17,27 +18,6 @@ impl InternalEvent for BytesReceived {
     }
 }
 
-#[derive(Debug)]
-pub struct EventsReceived {
-    pub count: usize,
-    pub byte_size: usize,
-}
-
-impl InternalEvent for EventsReceived {
-    fn emit_logs(&self) {
-        trace!(message = "Events received.", count = %self.count, byte_size = %self.byte_size);
-    }
-
-    fn emit_metrics(&self) {
-        counter!("component_received_events_total", self.count as u64);
-        counter!("events_in_total", self.count as u64);
-        counter!(
-            "component_received_event_bytes_total",
-            self.byte_size as u64
-        );
-    }
-}
-
 #[derive(Debug)]
 pub struct HttpClientBytesReceived<'a> {
     pub byte_size: usize,
diff --git a/src/lib.rs b/src/lib.rs
index a47f6449a21cf..5ed9b04bc709a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -37,6 +37,8 @@ pub mod internal_events;
 pub mod api;
 pub mod app;
 pub mod async_read;
+#[cfg(any(feature = "rusoto_core", feature = "aws-config"))]
+pub mod aws;
 #[cfg(feature = "codecs")]
 pub mod codecs;
 pub mod encoding_transcode;
@@ -52,8 +54,6 @@ pub mod list;
 pub(crate) mod pipeline;
 pub(crate) mod proto;
 pub mod providers;
-#[cfg(feature = "rusoto_core")]
-pub mod rusoto;
 pub mod serde;
 #[cfg(windows)]
 pub mod service;
diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs
index cecb695bc3408..030f8d57af763 100644
--- a/src/sinks/aws_cloudwatch_logs/mod.rs
+++ b/src/sinks/aws_cloudwatch_logs/mod.rs
@@ -1,12 +1,12 @@
 mod request;
 
+use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint};
 use crate::{
     config::{
         log_schema, DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext,
         SinkDescription,
     },
     event::{Event, LogEvent, Value},
     internal_events::TemplateRenderingFailed,
-    rusoto::{self, AwsAuthentication, RegionOrEndpoint},
     sinks::util::{
         batch::{BatchConfig, BatchSettings},
         encoding::{EncodingConfig, EncodingConfiguration},
@@ -679,10 +679,8 @@ impl From<RusotoError<DescribeLogStreamsError>> for CloudwatchError {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        event::{Event, Value},
-
rusoto::RegionOrEndpoint, - }; + use crate::aws::rusoto::RegionOrEndpoint; + use crate::event::{Event, Value}; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; @@ -787,7 +785,7 @@ mod tests { fn svc(config: CloudwatchLogsSinkConfig) -> CloudwatchLogsSvc { let config = CloudwatchLogsSinkConfig { - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), ..config }; let key = CloudwatchKey { @@ -859,9 +857,9 @@ mod tests { #[cfg(test)] mod integration_tests { use super::*; + use crate::aws::rusoto::RegionOrEndpoint; use crate::{ config::{ProxyConfig, SinkConfig, SinkContext}, - rusoto::RegionOrEndpoint, test_util::{random_lines, random_lines_with_stream, random_string, trace_init}, }; use futures::{stream, SinkExt, StreamExt}; @@ -882,7 +880,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -929,7 +927,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -995,7 +993,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1067,7 +1065,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1116,7 +1114,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1167,7 +1165,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { group_name: Template::try_from(GROUP_NAME).unwrap(), stream_name: Template::try_from(format!("{}-{{{{key}}}}", stream_name)).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1253,7 +1251,7 @@ mod integration_tests { let config = 
CloudwatchLogsSinkConfig { stream_name: Template::try_from("test-stream").unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, diff --git a/src/sinks/aws_cloudwatch_metrics.rs b/src/sinks/aws_cloudwatch_metrics.rs index f30b1b7ccb39a..cc85635c5ae68 100644 --- a/src/sinks/aws_cloudwatch_metrics.rs +++ b/src/sinks/aws_cloudwatch_metrics.rs @@ -1,10 +1,11 @@ +use crate::aws::auth::AwsAuthentication; +use crate::aws::rusoto::{self, RegionOrEndpoint}; use crate::{ config::{DataType, ProxyConfig, SinkConfig, SinkContext, SinkDescription}, event::{ metric::{Metric, MetricValue}, Event, }, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ batch::{BatchConfig, BatchSettings}, buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, diff --git a/src/sinks/aws_kinesis_firehose/config.rs b/src/sinks/aws_kinesis_firehose/config.rs index f969acb019d89..7ebc0aeae2d86 100644 --- a/src/sinks/aws_kinesis_firehose/config.rs +++ b/src/sinks/aws_kinesis_firehose/config.rs @@ -1,6 +1,5 @@ +use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext}; -use crate::rusoto; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::aws_kinesis_firehose::request_builder::KinesisRequestBuilder; use crate::sinks::aws_kinesis_firehose::service::{KinesisResponse, KinesisService}; use crate::sinks::aws_kinesis_firehose::sink::KinesisSink; @@ -17,6 +16,7 @@ use rusoto_firehose::{ }; use serde::{Deserialize, Serialize}; +use crate::aws::rusoto; use crate::sinks::{Healthcheck, VectorSink}; use snafu::Snafu; use tower::ServiceBuilder; diff --git a/src/sinks/aws_kinesis_firehose/integration_tests.rs b/src/sinks/aws_kinesis_firehose/integration_tests.rs index 235eacb05ca4e..f928f1056f9ed 100644 --- a/src/sinks/aws_kinesis_firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis_firehose/integration_tests.rs @@ -2,8 +2,8 @@ #![cfg(test)] use super::*; +use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::config::{SinkConfig, SinkContext}; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::util::encoding::{EncodingConfig, StandardEncodings}; use crate::sinks::util::{BatchConfig, Compression, TowerRequestConfig}; use crate::test_util::components; @@ -37,7 +37,7 @@ async fn firehose_put_records() { let config = KinesisFirehoseSinkConfig { stream_name: stream.clone(), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), // required for ES destination w/ localstack compression: Compression::None, batch: BatchConfig { diff --git a/src/sinks/aws_kinesis_firehose/tests.rs b/src/sinks/aws_kinesis_firehose/tests.rs index 894fd9381a259..2fe3ce23e6fb5 100644 --- a/src/sinks/aws_kinesis_firehose/tests.rs +++ b/src/sinks/aws_kinesis_firehose/tests.rs @@ -1,8 +1,8 @@ #![cfg(test)] use super::*; +use crate::aws::RegionOrEndpoint; use crate::config::{SinkConfig, SinkContext}; -use crate::rusoto::RegionOrEndpoint; use crate::sinks::aws_kinesis_firehose::config::{ BuildError, MAX_PAYLOAD_EVENTS, MAX_PAYLOAD_SIZE, }; @@ -19,7 +19,7 @@ fn generate_config() { async fn check_batch_size() { 
let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), compression: Compression::None, batch: BatchConfig { @@ -44,7 +44,7 @@ async fn check_batch_size() { async fn check_batch_events() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), compression: Compression::None, batch: BatchConfig { diff --git a/src/sinks/aws_kinesis_streams/config.rs b/src/sinks/aws_kinesis_streams/config.rs index 84ffcb20c16b8..ff4cd487b2a67 100644 --- a/src/sinks/aws_kinesis_streams/config.rs +++ b/src/sinks/aws_kinesis_streams/config.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext}; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::aws_kinesis_streams::service::KinesisService; use crate::sinks::util::encoding::{EncodingConfig, StandardEncodings}; use crate::sinks::util::{BatchConfig, BatchSettings, Compression, TowerRequestConfig}; @@ -11,7 +11,7 @@ use rusoto_kinesis::{DescribeStreamInput, Kinesis, KinesisClient, PutRecordsErro use serde::{Deserialize, Serialize}; use super::service::KinesisResponse; -use crate::rusoto; +use crate::aws::rusoto; use crate::sinks::aws_kinesis_streams::request_builder::KinesisRequestBuilder; use crate::sinks::aws_kinesis_streams::sink::KinesisSink; use crate::sinks::util::retries::RetryLogic; diff --git a/src/sinks/aws_kinesis_streams/integration_tests.rs b/src/sinks/aws_kinesis_streams/integration_tests.rs index 191b7ca914fb2..7fff87acd0c44 100644 --- a/src/sinks/aws_kinesis_streams/integration_tests.rs +++ b/src/sinks/aws_kinesis_streams/integration_tests.rs @@ -2,13 +2,13 @@ #![cfg(test)] use super::*; +use crate::aws::rusoto::RegionOrEndpoint; use crate::config::SinkConfig; use crate::sinks::util::encoding::StandardEncodings; use crate::sinks::util::{BatchConfig, Compression}; use crate::test_util::components; use crate::{ config::SinkContext, - rusoto::RegionOrEndpoint, test_util::{random_lines_with_stream, random_string}, }; use rusoto_core::Region; @@ -30,7 +30,7 @@ async fn kinesis_put_records() { let config = KinesisSinkConfig { stream_name: stream.clone(), partition_key_field: None, - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: StandardEncodings::Text.into(), compression: Compression::None, batch: BatchConfig { diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 7e1861f439576..2fde1128ec425 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,10 +1,10 @@ +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::config::SinkContext; use crate::sinks::s3_common::sink::S3Sink; use crate::sinks::util::encoding::StandardEncodings; use crate::sinks::util::BatchSettings; use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig}, - rusoto::{AwsAuthentication, RegionOrEndpoint}, sinks::{ s3_common::{ self, diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs index 
354d4553588b9..0804b7fcb5f1a 100644 --- a/src/sinks/aws_s3/tests.rs +++ b/src/sinks/aws_s3/tests.rs @@ -1,8 +1,8 @@ #[cfg(feature = "aws-s3-integration-tests")] #[cfg(test)] mod integration_tests { + use crate::aws::rusoto::RegionOrEndpoint; use crate::config::SinkContext; - use crate::rusoto::RegionOrEndpoint; use crate::sinks::aws_s3::S3SinkConfig; use crate::sinks::s3_common::config::S3Options; use crate::sinks::util::encoding::StandardEncodings; diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs index e9652732ecbc0..e72ac32302508 100644 --- a/src/sinks/aws_sqs.rs +++ b/src/sinks/aws_sqs.rs @@ -1,10 +1,10 @@ +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; use crate::{ config::{ log_schema, DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription, }, event::Event, internal_events::{AwsSqsEventSent, TemplateRenderingFailed}, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ encoding::{EncodingConfig, EncodingConfiguration}, retries::RetryLogic, @@ -394,7 +394,7 @@ mod integration_tests { let config = SqsSinkConfig { queue_url: queue_url.clone(), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: Encoding::Text.into(), message_group_id: None, message_deduplication_id: None, diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index 741ffe66de55d..fcd88a28f01d7 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -29,11 +29,14 @@ use super::util::{ BatchSettings, Compression, RequestBuilder, }; use crate::{ - config::GenerateConfig, - config::{DataType, SinkConfig, SinkContext}, + aws::{AwsAuthentication, RegionOrEndpoint}, http::HttpClient, - rusoto::{AwsAuthentication, RegionOrEndpoint}, serde::to_string, +}; + +use crate::{ + config::GenerateConfig, + config::{DataType, SinkConfig, SinkContext}, sinks::{ azure_common::{ self, diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index fab89078c3c23..167701cdccefe 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -6,7 +6,7 @@ use crate::transforms::metric_to_log::MetricToLog; use crate::sinks::util::http::RequestConfig; -use crate::rusoto::region_from_endpoint; +use crate::aws::rusoto::region_from_endpoint; use crate::sinks::util::{Compression, TowerRequestConfig, UriSerde}; use crate::tls::TlsSettings; use http::{StatusCode, Uri}; @@ -16,7 +16,7 @@ use snafu::ResultExt; use std::convert::TryFrom; use super::{InvalidHost, Request}; -use crate::rusoto; +use crate::aws::rusoto; use crate::sinks::elasticsearch::encoder::ElasticSearchEncoder; use crate::sinks::util::encoding::EncodingConfigFixed; use crate::sinks::HealthcheckError; diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 8bbfb94657634..e3a3184882de7 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -1,9 +1,9 @@ +use crate::aws::rusoto::RegionOrEndpoint; use crate::config::log_schema; use crate::config::{DataType, SinkConfig, SinkContext}; use crate::event::{EventRef, LogEvent, Value}; use crate::http::HttpClient; use crate::internal_events::TemplateRenderingFailed; -use crate::rusoto::RegionOrEndpoint; use crate::sinks::elasticsearch::request_builder::ElasticsearchRequestBuilder; use crate::sinks::elasticsearch::sink::ElasticSearchSink; use crate::sinks::elasticsearch::{BatchActionTemplate, IndexTemplate}; diff --git 
a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index d3167f6cd7aea..b29519430c2aa 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -20,7 +20,6 @@ pub use encoder::ElasticSearchEncoder; use crate::{ config::SinkDescription, internal_events::TemplateRenderingFailed, - rusoto::{self, AwsAuthentication}, template::{Template, TemplateParseError}, }; use http::{ @@ -35,6 +34,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; +use crate::aws::rusoto::{self, AwsAuthentication}; use std::convert::TryFrom; use crate::event::{EventRef, LogEvent}; diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index ad75ad5913b41..137d77c255f84 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -10,7 +10,7 @@ use std::task::{Context, Poll}; use tower::ServiceExt; use vector_core::buffers::Ackable; -use crate::rusoto::AwsCredentialsProvider; +use crate::aws::rusoto::AwsCredentialsProvider; use crate::sinks::util::{Compression, ElementCount}; use http::header::HeaderName; use hyper::header::HeaderValue; diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 5558cf7bcf72f..6b756417844e7 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,6 +1,6 @@ use super::BulkAction; +use crate::aws::rusoto::AwsAuthentication; use crate::event::{LogEvent, Metric, MetricKind, MetricValue, Value}; -use crate::rusoto::AwsAuthentication; use crate::sinks::elasticsearch::sink::process_log; use crate::sinks::elasticsearch::{ DataStreamConfig, ElasticSearchAuth, ElasticSearchCommon, ElasticSearchConfig, diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 43ed53a7da7d2..909da14c20d35 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -1,7 +1,7 @@ use super::service::S3Service; +use crate::aws::rusoto; +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::config::ProxyConfig; -use crate::rusoto; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::util::retries::RetryLogic; use crate::sinks::Healthcheck; use futures::FutureExt; diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index bd5025b1e7607..41c7cdabb8d8a 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -1,8 +1,9 @@ use super::util::MultilineConfig; +use crate::aws::auth::AwsAuthentication; +use crate::aws::rusoto::{self, RegionOrEndpoint}; use crate::{ config::{DataType, ProxyConfig, SourceConfig, SourceContext, SourceDescription}, line_agg, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, }; use async_compression::tokio::bufread; use futures::{stream, stream::StreamExt}; @@ -294,10 +295,10 @@ mod test { #[cfg(test)] mod integration_tests { use super::{sqs, AwsS3Config, Compression, Strategy}; + use crate::aws::rusoto::RegionOrEndpoint; use crate::{ config::{SourceConfig, SourceContext}, line_agg, - rusoto::RegionOrEndpoint, sources::util::MultilineConfig, test_util::{ collect_n, lines_from_gzip_file, lines_from_zst_file, random_lines, trace_init, diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs new file mode 100644 index 0000000000000..fd436ae93326e --- /dev/null +++ b/src/sources/aws_sqs/config.rs @@ -0,0 +1,86 @@ +use crate::aws::auth::AwsAuthentication; +use crate::codecs::{DecodingConfig, FramingConfig, ParserConfig}; +use crate::config::{DataType, SourceConfig, SourceContext}; +use 
crate::serde::{default_decoding, default_framing_message_based};
+use crate::sources::aws_sqs::source::SqsSource;
+
+use crate::aws::region::RegionOrEndpoint;
+use serde::{Deserialize, Serialize};
+use std::cmp;
+
+#[derive(Deserialize, Serialize, Derivative, Debug, Clone)]
+#[derivative(Default)]
+#[serde(deny_unknown_fields)]
+pub struct AwsSqsConfig {
+    #[serde(flatten)]
+    pub region: RegionOrEndpoint,
+    #[serde(default)]
+    pub auth: AwsAuthentication,
+
+    pub queue_url: String,
+
+    #[serde(default = "default_poll_secs")]
+    #[derivative(Default(value = "default_poll_secs()"))]
+    pub poll_secs: u32,
+
+    // number of concurrent tasks spawned for receiving/processing SQS messages
+    #[serde(default = "default_client_concurrency")]
+    #[derivative(Default(value = "default_client_concurrency()"))]
+    pub client_concurrency: u32,
+
+    #[serde(default = "default_framing_message_based")]
+    #[derivative(Default(value = "default_framing_message_based()"))]
+    pub framing: Box<dyn FramingConfig>,
+    #[serde(default = "default_decoding")]
+    #[derivative(Default(value = "default_decoding()"))]
+    pub decoding: Box<dyn ParserConfig>,
+}
+
+#[async_trait::async_trait]
+#[typetag::serde(name = "aws_sqs")]
+impl SourceConfig for AwsSqsConfig {
+    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
+        let mut config_builder = aws_sdk_sqs::config::Builder::new()
+            .credentials_provider(self.auth.credentials_provider().await);
+
+        if let Some(endpoint_override) = self.region.endpoint()? {
+            config_builder = config_builder.endpoint_resolver(endpoint_override);
+        }
+        if let Some(region) = self.region.region() {
+            config_builder = config_builder.region(region);
+        }
+
+        let client = aws_sdk_sqs::Client::from_conf(config_builder.build());
+        let decoder = DecodingConfig::new(self.framing.clone(), self.decoding.clone()).build()?;
+
+        Ok(Box::pin(
+            SqsSource {
+                client,
+                queue_url: self.queue_url.clone(),
+                decoder,
+                poll_secs: self.poll_secs,
+                concurrency: self.client_concurrency,
+                acknowledgements: cx.acknowledgements.enabled,
+            }
+            .run(cx.out, cx.shutdown),
+        ))
+    }
+
+    fn output_type(&self) -> DataType {
+        DataType::Log
+    }
+
+    fn source_type(&self) -> &'static str {
+        "aws_sqs"
+    }
+}
+
+const fn default_poll_secs() -> u32 {
+    15
+}
+
+fn default_client_concurrency() -> u32 {
+    cmp::max(1, num_cpus::get() as u32)
+}
+
+impl_generate_config_from_default!(AwsSqsConfig);
diff --git a/src/sources/aws_sqs/events.rs b/src/sources/aws_sqs/events.rs
new file mode 100644
index 0000000000000..d4273fe3eed85
--- /dev/null
+++ b/src/sources/aws_sqs/events.rs
@@ -0,0 +1,38 @@
+use aws_sdk_sqs::error::DeleteMessageBatchError;
+use aws_sdk_sqs::SdkError;
+use metrics::counter;
+use vector_core::internal_event::InternalEvent;
+
+#[derive(Debug)]
+pub struct AwsSqsBytesReceived {
+    pub byte_size: usize,
+}
+
+impl InternalEvent for AwsSqsBytesReceived {
+    fn emit_logs(&self) {
+        trace!(
+            message = "Bytes received.",
+            byte_size = %self.byte_size,
+            protocol = "http",
+        );
+    }
+
+    fn emit_metrics(&self) {
+        counter!("component_received_bytes_total", self.byte_size as u64);
+    }
+}
+
+#[derive(Debug)]
+pub struct SqsMessageDeleteError<'a> {
+    pub error: &'a SdkError<DeleteMessageBatchError>,
+}
+
+impl<'a> InternalEvent for SqsMessageDeleteError<'a> {
+    fn emit_logs(&self) {
+        warn!(message = "Failed to delete SQS events.", error = %self.error);
+    }
+
+    fn emit_metrics(&self) {
+        counter!("sqs_message_delete_failed_total", 1);
+    }
+}
diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs
new file mode 100644
index 0000000000000..9e969278ce3b4
--- /dev/null
+++ b/src/sources/aws_sqs/integration_tests.rs
@@ -0,0 +1,115 @@
+#![cfg(feature = "aws-sqs-integration-tests")]
+#![cfg(test)]
+
+use crate::aws::auth::AwsAuthentication;
+use crate::aws::region::RegionOrEndpoint;
+use crate::config::log_schema;
+use crate::config::{SourceConfig, SourceContext};
+use crate::event::Event;
+use crate::sources::aws_sqs::config::AwsSqsConfig;
+use crate::test_util::random_string;
+use crate::Pipeline;
+use aws_sdk_sqs::output::CreateQueueOutput;
+use aws_sdk_sqs::Endpoint;
+use aws_types::region::Region;
+use futures::StreamExt;
+use http::Uri;
+use std::collections::HashSet;
+use std::str::FromStr;
+use std::time::Duration;
+use tokio::time::timeout;
+
+fn gen_queue_name() -> String {
+    random_string(10).to_lowercase()
+}
+
+async fn ensure_queue(queue_name: &str, client: &aws_sdk_sqs::Client) -> CreateQueueOutput {
+    client
+        .create_queue()
+        .queue_name(queue_name)
+        .send()
+        .await
+        .unwrap()
+}
+
+async fn send_test_events(count: u32, queue_url: &str, client: &aws_sdk_sqs::Client) {
+    for i in 0..count {
+        client
+            .send_message()
+            .message_body(calculate_message(i))
+            .queue_url(queue_url)
+            .send()
+            .await
+            .unwrap();
+    }
+}
+
+async fn get_sqs_client() -> aws_sdk_sqs::Client {
+    let config = aws_sdk_sqs::config::Builder::new()
+        .credentials_provider(AwsAuthentication::test_auth().credentials_provider().await)
+        .endpoint_resolver(Endpoint::immutable(
+            Uri::from_str("http://localhost:4566").unwrap(),
+        ))
+        .region(Some(Region::new("us-east-1")))
+        .build();
+
+    aws_sdk_sqs::Client::from_conf(config)
+}
+
+#[tokio::test]
+pub async fn test() {
+    let sqs_client = get_sqs_client().await;
+    let queue_name = gen_queue_name();
+    let queue_url = ensure_queue(&queue_name, &sqs_client)
+        .await
+        .queue_url
+        .expect("Create queue should return the url");
+
+    let num_events = 3;
+    send_test_events(num_events, &queue_url, &sqs_client).await;
+
+    let config = AwsSqsConfig {
+        region: RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
+        auth: AwsAuthentication::test_auth(),
+        queue_url: queue_url.clone(),
+        ..Default::default()
+    };
+
+    let (tx, rx) = Pipeline::new_test();
+    tokio::spawn(async move {
+        config
+            .build(SourceContext::new_test(tx))
+            .await
+            .unwrap()
+            .await
+            .unwrap()
+    });
+
+    let mut expected_messages = HashSet::new();
+    for i in 0..num_events {
+        expected_messages.insert(calculate_message(i));
+    }
+
+    let events: Vec<Event> = timeout(
+        Duration::from_secs(10),
+        rx.take(num_events as usize).collect(),
+    )
+    .await
+    .unwrap();
+
+    for event in events {
+        let message = event
+            .as_log()
+            .get(log_schema().message_key())
+            .unwrap()
+            .to_string_lossy();
+        if !expected_messages.remove(&message) {
+            panic!("Received unexpected message: {:?}", message);
+        }
+    }
+    assert!(expected_messages.is_empty());
+}
+
+fn calculate_message(index: u32) -> String {
+    format!("Test message: {}", index)
+}
diff --git a/src/sources/aws_sqs/mod.rs b/src/sources/aws_sqs/mod.rs
new file mode 100644
index 0000000000000..50395c14372de
--- /dev/null
+++ b/src/sources/aws_sqs/mod.rs
@@ -0,0 +1,11 @@
+mod config;
+mod events;
+mod integration_tests;
+mod source;
+
+use crate::config::SourceDescription;
+use config::AwsSqsConfig;
+
+inventory::submit! {
+    SourceDescription::new::<AwsSqsConfig>("aws_sqs")
+}
diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs
new file mode 100644
index 0000000000000..eea2c1a5a084c
--- /dev/null
+++ b/src/sources/aws_sqs/source.rs
@@ -0,0 +1,259 @@
+use crate::codecs::Decoder;
+use crate::config::log_schema;
+use crate::event::BatchStatus;
+use crate::event::{BatchNotifier, Event};
+use crate::shutdown::ShutdownSignal;
+use crate::sources::util::TcpError;
+use crate::Pipeline;
+use aws_sdk_sqs::model::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName};
+use std::collections::HashMap;
+
+use super::events::*;
+use crate::vector_core::ByteSizeOf;
+use async_stream::stream;
+use aws_sdk_sqs::Client as SqsClient;
+use bytes::Bytes;
+use chrono::{DateTime, TimeZone, Utc};
+use futures::{FutureExt, TryStreamExt};
+use futures::{SinkExt, Stream, StreamExt};
+use std::io::Cursor;
+use std::panic;
+use std::str::FromStr;
+use tokio::time::Duration;
+use tokio::{pin, select};
+use tokio_util::codec::FramedRead;
+use vector_core::internal_event::EventsReceived;
+
+// This is the maximum SQS supports in a single batch request.
+const MAX_BATCH_SIZE: i32 = 10;
+
+#[derive(Clone)]
+pub struct SqsSource {
+    pub client: SqsClient,
+    pub queue_url: String,
+    pub decoder: Decoder,
+    pub poll_secs: u32,
+    pub concurrency: u32,
+    pub acknowledgements: bool,
+}
+
+impl SqsSource {
+    pub async fn run(self, out: Pipeline, shutdown: ShutdownSignal) -> Result<(), ()> {
+        let mut task_handles = vec![];
+
+        for _ in 0..self.concurrency {
+            let source = self.clone();
+            let shutdown = shutdown.clone().fuse();
+            let mut out = out.clone();
+            task_handles.push(tokio::spawn(async move {
+                pin!(shutdown);
+                loop {
+                    select! {
+                        _ = &mut shutdown => break,
+                        _ = source.run_once(&mut out, source.acknowledgements) => {},
+                    }
+                }
+            }));
+        }
+
+        // Wait for all of the processes to finish. If any one of them panics, we resume
+        // that panic here to properly shutdown Vector.
+        for task_handle in task_handles.drain(..) {
+            if let Err(e) = task_handle.await {
+                if e.is_panic() {
+                    panic::resume_unwind(e.into_panic());
+                }
+            }
+        }
+        Ok(())
+    }
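+    /// Performs one receive cycle: polls SQS for a batch of messages, decodes
+    /// and forwards them to `out`, then deletes them, either immediately or
+    /// only after the sinks confirm delivery when `acknowledgements` is enabled.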
+    async fn run_once(&self, out: &mut Pipeline, acknowledgements: bool) {
+        let result = self
+            .client
+            .receive_message()
+            .queue_url(&self.queue_url)
+            .max_number_of_messages(MAX_BATCH_SIZE)
+            .wait_time_seconds(self.poll_secs as i32)
+            .attribute_names(MessageSystemAttributeName::SentTimestamp.as_str())
+            .send()
+            .await;
+
+        let receive_message_output = match result {
+            Ok(output) => output,
+            Err(err) => {
+                error!("SQS receive message error: {:?}", err);
+                // prevent rapid errors from flooding the logs
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                return;
+            }
+        };
+
+        if let Some(messages) = receive_message_output.messages {
+            let mut receipts_to_ack = vec![];
+
+            let mut batch_receiver = None;
+            for message in messages {
+                if let Some(body) = message.body {
+                    emit!(&AwsSqsBytesReceived {
+                        byte_size: body.len()
+                    });
+                    let timestamp = get_timestamp(&message.attributes);
+                    let stream = decode_message(self.decoder.clone(), &body, timestamp);
+                    pin!(stream);
+                    let send_result = if acknowledgements {
+                        let (batch, receiver) = BatchNotifier::new_with_receiver();
+                        let mut stream = stream.map_ok(|event| event.with_batch_notifier(&batch));
+                        batch_receiver = Some(receiver);
+                        out.send_all(&mut stream).await
+                    } else {
+                        out.send_all(&mut stream).await
+                    };
+
+                    match send_result {
+                        Err(err) => error!(message = "Error sending to sink.", error = %err),
+                        Ok(()) => {
+                            // a receipt handle should always exist
+                            if let Some(receipt_handle) = message.receipt_handle {
+                                receipts_to_ack.push(receipt_handle);
+                            }
+                        }
+                    }
+                }
+            }
+
+            if let Some(receiver) = batch_receiver {
+                let client = self.client.clone();
+                let queue_url = self.queue_url.clone();
+                tokio::spawn(async move {
+                    let batch_status = receiver.await;
+                    if batch_status == BatchStatus::Delivered {
+                        delete_messages(&client, &receipts_to_ack, &queue_url).await;
+                    }
+                });
+            } else {
+                delete_messages(&self.client, &receipts_to_ack, &self.queue_url).await;
+            }
+        }
+    }
+}
+
+fn get_timestamp(
+    attributes: &Option<HashMap<MessageSystemAttributeName, String>>,
+) -> Option<DateTime<Utc>> {
+    attributes.as_ref().and_then(|attributes| {
+        let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?;
+        Some(Utc.timestamp_millis(i64::from_str(sent_time_str).ok()?))
+    })
+}
+
+async fn delete_messages(client: &SqsClient, receipts: &[String], queue_url: &str) {
+    if !receipts.is_empty() {
+        let mut batch = client.delete_message_batch().queue_url(queue_url);
+
+        for (id, receipt) in receipts.iter().enumerate() {
+            batch = batch.entries(
+                DeleteMessageBatchRequestEntry::builder()
+                    .id(id.to_string())
+                    .receipt_handle(receipt)
+                    .build(),
+            );
+        }
+        if let Err(err) = batch.send().await {
+            emit!(&SqsMessageDeleteError { error: &err });
+        }
+    }
+}
+
+fn decode_message<E>(
+    decoder: Decoder,
+    message: &str,
+    sent_time: Option<DateTime<Utc>>,
+) -> impl Stream<Item = Result<Event, E>> {
+    let schema = log_schema();
+
+    let payload = Cursor::new(Bytes::copy_from_slice(message.as_bytes()));
+    let mut stream = FramedRead::new(payload, decoder);
+
+    let stream = stream!
{ + loop { + match stream.next().await { + Some(Ok((events, _))) => { + let count = events.len(); + let mut total_events_size = 0; + for mut event in events { + if let Event::Log(ref mut log) = event { + log.try_insert(schema.source_type_key(), Bytes::from("aws_sqs")); + if let Some(sent_time) = sent_time { + log.try_insert(schema.timestamp_key(), sent_time); + } + } + total_events_size += event.size_of(); + yield event; + } + emit!(&EventsReceived { + byte_size: total_events_size, + count + }); + }, + Some(Err(error)) => { + // Error is logged by `crate::codecs::Decoder`, no further handling + // is needed here. + if !error.can_continue() { + break; + } + } + None => break, + } + } + }; + stream.map(Ok) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::SecondsFormat; + + #[tokio::test] + async fn test_decode() { + let message = "test"; + let now = Utc::now(); + let stream = decode_message::<()>(Decoder::default(), "test", Some(now)); + let events: Vec<_> = stream.collect().await; + assert_eq!(events.len(), 1); + assert_eq!( + events[0] + .clone() + .unwrap() + .as_log() + .get(log_schema().message_key()) + .unwrap() + .to_string_lossy(), + message + ); + assert_eq!( + events[0] + .clone() + .unwrap() + .as_log() + .get(log_schema().timestamp_key()) + .unwrap() + .to_string_lossy(), + now.to_rfc3339_opts(SecondsFormat::AutoSi, true) + ); + } + + #[test] + fn test_get_timestamp() { + let attributes = HashMap::from([( + MessageSystemAttributeName::SentTimestamp, + "1636408546018".to_string(), + )]); + + assert_eq!( + get_timestamp(&Some(attributes)), + Some(Utc.timestamp_millis(1636408546018)) + ); + } +} diff --git a/src/sources/mod.rs b/src/sources/mod.rs index e3bafadd1f2c6..add3afa420e92 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -8,6 +8,8 @@ pub mod aws_ecs_metrics; pub mod aws_kinesis_firehose; #[cfg(feature = "sources-aws_s3")] pub mod aws_s3; +#[cfg(feature = "sources-aws_sqs")] +pub mod aws_sqs; #[cfg(any(feature = "sources-datadog_agent"))] pub mod datadog; #[cfg(all(unix, feature = "sources-dnstap"))] diff --git a/website/content/en/docs/reference/configuration/sources/aws_sqs.md b/website/content/en/docs/reference/configuration/sources/aws_sqs.md new file mode 100644 index 0000000000000..2742e684b907c --- /dev/null +++ b/website/content/en/docs/reference/configuration/sources/aws_sqs.md @@ -0,0 +1,14 @@ +--- +title: AWS SQS +description: Collect logs from [AWS SQS](https://aws.amazon.com/sqs) +kind: source +layout: component +tags: ["aws", "sqs", "queue", "component", "source", "logs"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... 
+*/}}
diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue
new file mode 100644
index 0000000000000..5f084f480eaf7
--- /dev/null
+++ b/website/cue/reference/components/sources/aws_sqs.cue
@@ -0,0 +1,123 @@
+package metadata
+
+components: sources: aws_sqs: components._aws & {
+	title: "AWS SQS"
+
+	features: {
+		collect: {
+			tls: enabled:        false
+			checkpoint: enabled: false
+			proxy: enabled:      true
+			from: service:       services.aws_sqs
+		}
+		multiline: enabled: false
+		codecs: {
+			enabled:         true
+			default_framing: "bytes"
+		}
+	}
+
+	classes: {
+		commonly_used: true
+		deployment_roles: ["aggregator"]
+		delivery:      "at_least_once"
+		development:   "beta"
+		egress_method: "stream"
+		stateful:      false
+	}
+
+	support: {
+		targets: {
+			"aarch64-unknown-linux-gnu":      true
+			"aarch64-unknown-linux-musl":     true
+			"armv7-unknown-linux-gnueabihf":  true
+			"armv7-unknown-linux-musleabihf": true
+			"x86_64-apple-darwin":            true
+			"x86_64-pc-windows-msvc":         true
+			"x86_64-unknown-linux-gnu":       true
+			"x86_64-unknown-linux-musl":      true
+		}
+		requirements: [
+			"""
+				The AWS SQS source requires an SQS queue.
+				""",
+		]
+		warnings: []
+		notices: []
+	}
+
+	installation: {
+		platform_name: null
+	}
+
+	configuration: {
+		acknowledgements: configuration._acknowledgements
+		poll_secs: {
+			common:      true
+			description: "How long to wait while polling SQS for new messages (0-20 seconds)."
+			required:    false
+			warnings: []
+			type: uint: {
+				default: 15
+				unit:    "seconds"
+			}
+		}
+		// client_concurrency: {
+		//  common:      true
+		//  description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput. Note: the default is 1 / CPU core"
+		//  required:    false
+		//  warnings: []
+		//  type: uint: {
+		//   default: 1
+		//   unit:    "# of clients"
+		//  }
+		// }
+		queue_url: {
+			description: "The URL of the SQS queue to receive events from."
+			required:    true
+			warnings: []
+			type: string: {
+				examples: ["https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"]
+				syntax: "literal"
+			}
+		}
+	}
+
+	output: logs: record: {
+		description: "An individual SQS record."
+		fields: {
+			message: {
+				description: "The raw message from the SQS record."
+				required:    true
+				type: string: {
+					examples: ["53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308"]
+					syntax: "literal"
+				}
+			}
+			timestamp: fields._current_timestamp & {
+				description: "The time this message was sent to SQS."
+			}
+		}
+	}
+
+	telemetry: metrics: {
+		component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total
+		component_received_events_total:      components.sources.internal_metrics.output.metrics.component_received_events_total
+		component_received_bytes_total:       components.sources.internal_metrics.output.metrics.component_received_bytes_total
+		sqs_message_delete_failed_total:      components.sources.internal_metrics.output.metrics.sqs_message_delete_failed_total
+	}
+
+	how_it_works: {
+		aws_sqs: {
+			title: "AWS SQS"
+			body: """
+				The `aws_sqs` source receives messages from [AWS SQS](https://aws.amazon.com/sqs/)
+				(Simple Queue Service). This is a highly scalable and durable queueing system with
+				at-least-once queuing semantics. Messages are received in batches (up to 10 at a time)
+				and are deleted in batches (again, up to 10). Messages are deleted either immediately
+				after they are received or after they have been fully processed by the sinks,
+				depending on the `acknowledgements` setting.
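+
+				A minimal configuration needs only the queue URL; the source name, region,
+				and queue URL below are illustrative:
+
+				```toml
+				[sources.my_sqs_source]
+				type = "aws_sqs"
+				region = "us-east-2"
+				queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
+				```
+				"""
+		}
+	}
+}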