From 67334fbb5947bba8885c2c63457192d02069344e Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Tue, 2 Nov 2021 13:22:23 -0400
Subject: [PATCH 01/17] save

Signed-off-by: Nathan Fox
---
 src/sources/aws_sqs/mod.rs | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 src/sources/aws_sqs/mod.rs

diff --git a/src/sources/aws_sqs/mod.rs b/src/sources/aws_sqs/mod.rs
new file mode 100644
index 0000000000000..8116fddb63a23
--- /dev/null
+++ b/src/sources/aws_sqs/mod.rs
@@ -0,0 +1,3 @@
+inventory::submit! {
+    SourceDescription::new::<AwsSqsConfig>("aws_sqs")
+}

From 457471a5ca9639b0b6c6e13b2a9ba4218ac4e875 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Wed, 3 Nov 2021 13:49:33 -0400
Subject: [PATCH 02/17] save

Signed-off-by: Nathan Fox
---
 Cargo.lock                                    | 287 ++++++++++++++++++
 Cargo.toml                                    |  10 +-
 src/{rusoto => aws}/auth.rs                   |  54 +---
 src/aws/aws_sdk/auth.rs                       |  54 ++++
 src/aws/aws_sdk/mod.rs                        |   2 +
 src/aws/aws_sdk/region.rs                     |  14 +
 src/aws/mod.rs                                |   8 +
 src/aws/region.rs                             |  24 ++
 src/aws/rusoto/auth.rs                        |  59 ++++
 src/{ => aws}/rusoto/mod.rs                   |  22 +-
 src/{ => aws}/rusoto/region.rs                |  24 +-
 src/lib.rs                                    |   6 +-
 src/sinks/aws_cloudwatch_logs/mod.rs          |  24 +-
 src/sinks/aws_cloudwatch_metrics.rs           |   3 +-
 src/sinks/aws_kinesis_firehose.rs             |  14 +-
 src/sinks/aws_kinesis_streams/config.rs       |   4 +-
 .../aws_kinesis_streams/integration_tests.rs  |   4 +-
 src/sinks/aws_s3/config.rs                    |   6 +-
 src/sinks/aws_s3/tests.rs                     |   2 +-
 src/sinks/aws_sqs.rs                          |  16 +-
 src/sinks/datadog_archives.rs                 |   3 +-
 src/sinks/elasticsearch/common.rs             |   4 +-
 src/sinks/elasticsearch/config.rs             |   2 +-
 src/sinks/elasticsearch/mod.rs                |   4 +-
 src/sinks/elasticsearch/service.rs            |   2 +-
 src/sinks/elasticsearch/tests.rs              |   2 +-
 src/sinks/s3_common/config.rs                 |   4 +-
 src/sources/aws_s3/mod.rs                     |   5 +-
 src/sources/aws_sqs/config.rs                 | 101 ++++++
 src/sources/aws_sqs/mod.rs                    |   6 +
 src/sources/aws_sqs/source.rs                 |  44 +++
 src/sources/mod.rs                            |   2 +
 32 files changed, 679 insertions(+), 137 deletions(-)
 rename src/{rusoto => aws}/auth.rs (69%)
 create mode 100644 src/aws/aws_sdk/auth.rs
 create mode 100644 src/aws/aws_sdk/mod.rs
 create mode 100644 src/aws/aws_sdk/region.rs
 create mode 100644 src/aws/mod.rs
 create mode 100644 src/aws/region.rs
 create mode 100644 src/aws/rusoto/auth.rs
 rename src/{ => aws}/rusoto/mod.rs (97%)
 rename src/{ => aws}/rusoto/region.rs (92%)
 create mode 100644 src/sources/aws_sqs/config.rs
 create mode 100644 src/sources/aws_sqs/source.rs

diff --git a/Cargo.lock b/Cargo.lock
index 3a2e954345b4a..1e7560f02572a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -543,6 +543,274 @@ dependencies = [
  "zerocopy",
 ]
 
+[[package]]
+name = "aws-config"
+version = "0.0.22-alpha"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00ee7a2e95018f362ae10ae5f5629e3e9e9d1d8b68604604d3a190a72f29ffb3"
+dependencies = [
+ "aws-http",
+ "aws-hyper",
+ "aws-sdk-sts",
+ "aws-smithy-async",
+ "aws-smithy-client",
+ "aws-smithy-http",
+ "aws-smithy-http-tower",
+ "aws-smithy-json",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes 1.1.0",
+ "http",
+ "tokio",
+ "tower",
+ "tracing 0.1.29",
+]
+
+[[package]]
+name = "aws-endpoint"
+version = "0.0.22-alpha"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23b6ad91071353f652fcb4b3a2eaa53e0ecd7177af36dc9020a0f1c664521181"
+dependencies = [
+ "aws-smithy-http",
+ "aws-types",
+ "http",
+ "regex",
+ "tracing 0.1.29",
+]
+
+[[package]]
+name = "aws-http"
+version = "0.0.22-alpha"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum =
"d66f94a4d0015a44ce3c7ab3df403d7fcc28e6595bf24189cc29adc119a1bdaa" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "http", + "lazy_static", + "thiserror", + "tracing 0.1.29", +] + +[[package]] +name = "aws-hyper" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24f98b694ef3cabefa99b1eabcb0df967760d3bbd9a3b2c590ae72e35c893227" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes 1.1.0", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "pin-project 1.0.8", + "tokio", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-sdk-sqs" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95bea08a2055361eb2e97b1fba1dea3ba6f842cf6c1d141053a0faacfa41a3fc" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-hyper", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes 1.1.0", + "http", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc701f0fabb1c6dbcd618396637a42fdc2943d72875cb43c3adf881f2c25528b" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-hyper", + "aws-sig-auth", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes 1.1.0", + "http", +] + +[[package]] +name = "aws-sig-auth" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7f393e2c69c0cfa58ddc1036475c79648e8e462b907454c5b99c62d68a131d" +dependencies = [ + "aws-sigv4", + "aws-smithy-http", + "aws-types", + "http", + "thiserror", + "tracing 0.1.29", +] + +[[package]] +name = "aws-sigv4" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb10064e2edb9052a46464fb158b6348539a3601443fc07a2c474a0fd6e61b2" +dependencies = [ + "chrono", + "form_urlencoded", + "hex", + "http", + "percent-encoding", + "ring", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-async" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad697e2a97ba37c18437fe43a0f3f0c731c286361db1851d73c2b387ee4b805" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-client" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56918ee8de8dbfdb966fb1a55e6fa82101df2805a6f8413d8864286ca1105fe" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes 1.1.0", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "lazy_static", + "pin-project 1.0.8", + "pin-project-lite", + "tokio", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-http" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9039d48a4ba324f52162aeaf095e4eb15bcc7e228a9cee9a65642c257be2cf7e" +dependencies = [ + "aws-smithy-types", + "bytes 1.1.0", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "percent-encoding", + "pin-project 1.0.8", + "thiserror", + "tokio", + "tokio-util", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-http-tower" 
+version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b1502e912935b05eaf5be8f9eae607bc14d9d21ea69d0b19f622e0ee7d98b2" +dependencies = [ + "aws-smithy-http", + "bytes 1.1.0", + "http", + "http-body", + "pin-project 1.0.8", + "tower", + "tracing 0.1.29", +] + +[[package]] +name = "aws-smithy-json" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c90840bccbd4cdf9b51db0ca2e051920c1ad94d456e29359fe9feada1c9e40" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afed6996a0cd17fcd61980ee600ae3133a6e5937f1968a402da8ac768872a59" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-types" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6bcfd3eaa047d37438d72dfe059ddcf01d5b758c844a8930ce7b6a6acdc543f" +dependencies = [ + "chrono", + "itoa", + "num-integer", + "ryu", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.27.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39b2f34906aa0c0523fe15d528f70f0ff2490f5b1c37d6ef418ce799d3bcc07" +dependencies = [ + "thiserror", + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "0.0.22-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "971b04d79f51492a84e944f96a7e9a648acad8ee0f3c9720442531f7b043a6d0" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "rustc_version 0.4.0", + "zeroize", +] + [[package]] name = "azure_core" version = "0.1.0" @@ -907,6 +1175,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e314712951c43123e5920a446464929adc667a5eade7f8fb3997776c9df6e54" +dependencies = [ + "bytes 1.1.0", + "either", +] + [[package]] name = "bytesize" version = "1.1.0" @@ -7586,6 +7864,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1f0175e03a0973cf4afd476bef05c26e228520400eb1fd473ad417b1c00ffb" + [[package]] name = "utf-8" version = "0.7.6" @@ -7634,6 +7918,9 @@ dependencies = [ "async-trait", "atty", "avro-rs", + "aws-config", + "aws-sdk-sqs", + "aws-types", "azure_core", "azure_storage", "base64 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 0890c6b6f71bd..c6623e5b6aeef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,7 +132,7 @@ metrics = { version = "0.17.0", default-features = false, features = ["std"] } metrics-tracing-context = { version = "0.8.0", default-features = false } metrics-util = { version = "0.10.1", default-features = false, features = ["std"] } -# Aws +# AWS - Rusoto rusoto_cloudwatch = { version = "0.47.0", optional = true } rusoto_core = { version = "0.47.0", features = ["encoding"], optional = true } rusoto_credential = { version = "0.47.0", optional = true } @@ -145,6 +145,12 @@ rusoto_signature = { version = "0.47.0", optional = true } rusoto_sqs = { version = "0.47.0", optional = true } rusoto_sts = { version = "0.47.0", optional = true } +# AWS - Official SDK +aws-config = {version = "0.0.22-alpha", optional = true} +aws-types = {version = "0.0.22-alpha", optional = true} +aws-sdk-sqs = {version = "0.0.22-alpha", optional = true} + + # Azure azure_core 
= { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "16bcf0ab1bb6e380d966a69d314de1e99ede553a", default-features = false, features = ["enable_reqwest"], optional = true }
 azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "16bcf0ab1bb6e380d966a69d314de1e99ede553a", default-features = false, features = ["blob"], optional = true }
 
@@ -415,6 +421,7 @@ sources = ["sources-logs", "sources-metrics"]
 sources-logs = [
   "sources-aws_kinesis_firehose",
   "sources-aws_s3",
+  "sources-aws_sqs",
   "sources-datadog_agent",
   "sources-docker_logs",
   "sources-exec",
@@ -453,6 +460,7 @@ sources-apache_metrics = []
 sources-aws_ecs_metrics = []
 sources-aws_kinesis_firehose = ["base64", "infer", "sources-utils-tls", "warp", "codecs"]
 sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid", "codecs", "zstd"]
+sources-aws_sqs = ["aws-config", "aws-types", "aws-sdk-sqs"]
 sources-datadog_agent = ["snap", "sources-utils-tls", "warp", "sources-utils-http-error", "protobuf-build", "codecs"]
 sources-dnstap = ["base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "protobuf-build"]
 sources-docker_logs = ["docker"]
diff --git a/src/rusoto/auth.rs b/src/aws/auth.rs
similarity index 69%
rename from src/rusoto/auth.rs
rename to src/aws/auth.rs
index f515edb0c053d..7f65cce035eff 100644
--- a/src/rusoto/auth.rs
+++ b/src/aws/auth.rs
@@ -1,4 +1,4 @@
-use super::AwsCredentialsProvider;
+use crate::aws::rusoto::AwsCredentialsProvider;
 use rusoto_core::Region;
 use serde::{Deserialize, Serialize};
 
@@ -27,59 +27,7 @@ pub enum AwsAuthentication {
     Default {},
 }
 
-impl AwsAuthentication {
-    const AWS_DEFAULT_PROFILE: &'static str = "default";
-
-    pub fn build(
-        &self,
-        region: &Region,
-        old_assume_role: Option<String>,
-    ) -> crate::Result<AwsCredentialsProvider> {
-        if old_assume_role.is_some() {
-            warn!("Option `assume_role` has been renamed to `auth.assume_role`. Please use that one instead.");
-        }
-        match self {
-            Self::Static {
-                access_key_id,
-                secret_access_key,
-            } => {
-                if old_assume_role.is_some() {
-                    warn!("Ignoring option `assume_role`, instead using access options.");
-                }
-                Ok(AwsCredentialsProvider::new_minimal(
-                    access_key_id,
-                    secret_access_key,
-                ))
-            }
-            Self::File {
-                credentials_file,
-                profile,
-            } => {
-                if old_assume_role.is_some() {
-                    warn!(
-                        "Ignoring option `assume_role`, instead using AWS credentials file options."
-                    );
-                }
-                AwsCredentialsProvider::new_with_credentials_file(
-                    credentials_file,
-                    profile
-                        .as_ref()
-                        .unwrap_or(&AwsAuthentication::AWS_DEFAULT_PROFILE.to_string())
-                        .as_str(),
-                )
-            }
-            Self::Role { assume_role } => {
-                if old_assume_role.is_some() {
-                    warn!(
-                        "Ignoring option `assume_role`, instead using option `auth.assume_role`."
-                    );
-                }
-                AwsCredentialsProvider::new(region, Some(assume_role.clone()))
-            }
-            Self::Default {} => AwsCredentialsProvider::new(region, old_assume_role),
-        }
-    }
-}
 
 #[cfg(test)]
 mod tests {
diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs
new file mode 100644
index 0000000000000..9c7ee0bf7e255
--- /dev/null
+++ b/src/aws/aws_sdk/auth.rs
@@ -0,0 +1,54 @@
+use crate::aws::auth::AwsAuthentication;
+use aws_config::default_provider::credentials::default_provider;
+use aws_config::meta::credentials::LazyCachingCredentialsProvider;
+use aws_config::profile::ProfileFileCredentialsProvider;
+use aws_config::provider_config::ProviderConfig;
+use aws_config::sts::AssumeRoleProviderBuilder;
+use aws_types::config::{Builder, Config};
+use aws_types::credentials::SharedCredentialsProvider;
+use aws_types::region::Region;
+use aws_types::Credentials;
+
+impl AwsAuthentication {
+    pub async fn build_config(&self, region: Region) -> Config {
+        let mut builder = Builder::default().region(region);
+
+        let credentials_provider = match self {
+            Self::Static {
+                access_key_id,
+                secret_access_key,
+            } => SharedCredentialsProvider::new(Credentials::from_keys(
+                access_key_id,
+                secret_access_key,
+                None,
+            )),
+            AwsAuthentication::File {
+                credentials_file,
+                profile,
+            } => {
+                warn!("Overriding the credentials file is not supported. `~/.aws/config` and `~/.aws/credentials` will be used instead of \"{}\"", credentials_file);
+                let mut file_provider = ProfileFileCredentialsProvider::builder();
+                if let Some(profile) = profile {
+                    file_provider = file_provider.profile_name(profile);
+                }
+                SharedCredentialsProvider::new(
+                    LazyCachingCredentialsProvider::builder()
+                        .load(file_provider.build())
+                        .build(),
+                )
+            }
+            AwsAuthentication::Role { assume_role } => SharedCredentialsProvider::new(
+                AssumeRoleProviderBuilder::new(assume_role)
+                    .build(default_credentials_provider().await),
+            ),
+            AwsAuthentication::Default {} => {
+                SharedCredentialsProvider::new(default_credentials_provider().await)
+            }
+        };
+        builder.credentials_provider(credentials_provider).build()
+    }
+}
+
+async fn default_credentials_provider() -> SharedCredentialsProvider {
+    SharedCredentialsProvider::new(default_provider().await)
+}
diff --git a/src/aws/aws_sdk/mod.rs b/src/aws/aws_sdk/mod.rs
new file mode 100644
index 0000000000000..d3f5c5ed83085
--- /dev/null
+++ b/src/aws/aws_sdk/mod.rs
@@ -0,0 +1,2 @@
+mod auth;
+mod region;
diff --git a/src/aws/aws_sdk/region.rs b/src/aws/aws_sdk/region.rs
new file mode 100644
index 0000000000000..95dfc58f21e50
--- /dev/null
+++ b/src/aws/aws_sdk/region.rs
@@ -0,0 +1,14 @@
+use crate::aws::region::RegionOrEndpoint;
+use aws_sdk_sqs::Endpoint;
+use http::Uri;
+use std::str::FromStr;
+
+impl RegionOrEndpoint {
+    pub fn endpoint(&self) -> crate::Result<Option<Endpoint>> {
+        if let Some(endpoint) = &self.endpoint {
+            Ok(Some(Endpoint::immutable(Uri::from_str(endpoint)?)))
+        } else {
+            Ok(None)
+        }
+    }
+}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
new file mode 100644
index 0000000000000..b13079b9be90c
--- /dev/null
+++ b/src/aws/mod.rs
@@ -0,0 +1,8 @@
+pub mod auth;
+pub mod region;
+
+#[cfg(feature = "rusoto_core")]
+pub mod rusoto;
+
+#[cfg(feature = "aws-config")]
+pub mod aws_sdk;
diff --git a/src/aws/region.rs b/src/aws/region.rs
new file mode 100644
index 0000000000000..cc180529cb94d
--- /dev/null
+++ b/src/aws/region.rs
@@ -0,0 +1,24 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
+#[serde(default)]
+pub struct RegionOrEndpoint {
+    pub region: Option<String>,
+    pub endpoint: Option<String>,
+}
+
+impl RegionOrEndpoint {
+    pub const fn with_region(region: String) -> Self {
+        Self {
+            region: Some(region),
+            endpoint: None,
+        }
+    }
+
+    pub const fn with_endpoint(endpoint: String) -> Self {
+        Self {
+            region: None,
+            endpoint: Some(endpoint),
+        }
+    }
+}
diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs
new file mode 100644
index 0000000000000..b49dcdd2d723a
--- /dev/null
+++ b/src/aws/rusoto/auth.rs
@@ -0,0 +1,59 @@
+use rusoto_core::Region;
+use crate::aws::auth::AwsAuthentication;
+use crate::aws::rusoto::AwsCredentialsProvider;
+
+const AWS_DEFAULT_PROFILE: &'static str = "default";
+
+impl AwsAuthentication {
+
+
+    pub fn build(
+        &self,
+        region: &Region,
+        old_assume_role: Option<String>,
+    ) -> crate::Result<AwsCredentialsProvider> {
+        if old_assume_role.is_some() {
+            warn!("Option `assume_role` has been renamed to `auth.assume_role`. Please use that one instead.");
+        }
+        match self {
+            Self::Static {
+                access_key_id,
+                secret_access_key,
+            } => {
+                if old_assume_role.is_some() {
+                    warn!("Ignoring option `assume_role`, instead using access options.");
+                }
+                Ok(AwsCredentialsProvider::new_minimal(
+                    access_key_id,
+                    secret_access_key,
+                ))
+            }
+            Self::File {
+                credentials_file,
+                profile,
+            } => {
+                if old_assume_role.is_some() {
+                    warn!(
+                        "Ignoring option `assume_role`, instead using AWS credentials file options."
+                    );
+                }
+                AwsCredentialsProvider::new_with_credentials_file(
+                    credentials_file,
+                    profile
+                        .as_ref()
+                        .unwrap_or(&AWS_DEFAULT_PROFILE.to_string())
+                        .as_str(),
+                )
+            }
+            Self::Role { assume_role } => {
+                if old_assume_role.is_some() {
+                    warn!(
+                        "Ignoring option `assume_role`, instead using option `auth.assume_role`."
+                    );
+                }
+                AwsCredentialsProvider::new(region, Some(assume_role.clone()))
+            }
+            Self::Default {} => AwsCredentialsProvider::new(region, old_assume_role),
+        }
+    }
+}
diff --git a/src/rusoto/mod.rs b/src/aws/rusoto/mod.rs
similarity index 97%
rename from src/rusoto/mod.rs
rename to src/aws/rusoto/mod.rs
index 4584e3e4e1387..93890bd290574 100644
--- a/src/rusoto/mod.rs
+++ b/src/aws/rusoto/mod.rs
@@ -1,3 +1,8 @@
+mod auth;
+pub mod region;
+
+//TODO: replace with direct import
+pub use super::auth::AwsAuthentication;
 use crate::config::ProxyConfig;
 use crate::{http::HttpError, tls::MaybeTlsSettings};
 use async_trait::async_trait;
@@ -8,8 +13,11 @@ use http::{
     Method, Request, Response, StatusCode,
 };
 use hyper::body::{Body, HttpBody};
+use hyper::client;
 use once_cell::sync::OnceCell;
 use regex::bytes::RegexSet;
+pub use region::{region_from_endpoint, RegionOrEndpoint};
+use rusoto_core::credential::ProfileProvider;
 use rusoto_core::{
     request::{
         DispatchSignedRequest, DispatchSignedRequestFuture, HttpDispatchError, HttpResponse,
@@ -30,19 +38,13 @@ use std::{
     time::Duration,
 };
 use tower::{Service, ServiceExt};
+// use crate::http;
 
-pub mod auth;
-pub mod region;
-pub use auth::AwsAuthentication;
-use hyper::client;
-pub use region::{region_from_endpoint, RegionOrEndpoint};
-use rusoto_core::credential::ProfileProvider;
-
-pub type Client = HttpClient>;
+pub type Client = HttpClient>;
 
 pub fn client(proxy: &ProxyConfig) -> crate::Result<Client> {
     let settings = MaybeTlsSettings::enable_client()?;
-    let client = super::http::HttpClient::new(settings, proxy)?;
+    let client = crate::http::HttpClient::new(settings, proxy)?;
     Ok(HttpClient { client })
 }
 
@@ -51,7 +53,7 @@ pub fn custom_client(
     client_builder: &mut client::Builder,
 ) -> crate::Result<Client> {
     let settings = MaybeTlsSettings::enable_client()?;
-    let client =
super::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?; + let client = crate::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?; Ok(HttpClient { client }) } diff --git a/src/rusoto/region.rs b/src/aws/rusoto/region.rs similarity index 92% rename from src/rusoto/region.rs rename to src/aws/rusoto/region.rs index dbea60d958f16..ddab04081fcb7 100644 --- a/src/rusoto/region.rs +++ b/src/aws/rusoto/region.rs @@ -5,28 +5,8 @@ use snafu::{ResultExt, Snafu}; use std::convert::TryFrom; use std::str::FromStr; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] -#[serde(default)] -pub struct RegionOrEndpoint { - region: Option, - endpoint: Option, -} - -impl RegionOrEndpoint { - pub const fn with_region(region: String) -> Self { - Self { - region: Some(region), - endpoint: None, - } - } - - pub const fn with_endpoint(endpoint: String) -> Self { - Self { - region: None, - endpoint: Some(endpoint), - } - } -} +//TODO: backwards compat - use the type directly instead of from here +pub use crate::aws::region::RegionOrEndpoint; #[derive(Debug, Snafu)] pub enum ParseError { diff --git a/src/lib.rs b/src/lib.rs index af89f05c270a0..96030b60bf8d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,8 +53,8 @@ pub mod list; pub(crate) mod pipeline; pub(crate) mod proto; pub mod providers; -#[cfg(feature = "rusoto_core")] -pub mod rusoto; +#[cfg(any(feature = "rusoto_core", feature = "aws-config"))] +pub mod aws; pub mod serde; #[cfg(windows)] pub mod service; @@ -87,7 +87,7 @@ pub mod vector_windows; pub use pipeline::Pipeline; -pub use vector_core::{event, mapping, metrics, Error, Result}; +pub use vector_core::{Error, event, mapping, metrics, Result}; pub fn vector_version() -> impl std::fmt::Display { #[cfg(feature = "nightly")] diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs index cecb695bc3408..b986067cc045c 100644 --- a/src/sinks/aws_cloudwatch_logs/mod.rs +++ b/src/sinks/aws_cloudwatch_logs/mod.rs @@ -2,22 +2,21 @@ mod request; use crate::{ config::{ - log_schema, DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription, + DataType, GenerateConfig, log_schema, ProxyConfig, SinkConfig, SinkContext, SinkDescription, }, event::{Event, LogEvent, Value}, internal_events::TemplateRenderingFailed, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ batch::{BatchConfig, BatchSettings}, - encoding::{EncodingConfig, EncodingConfiguration}, - retries::{FixedRetryPolicy, RetryLogic}, - Compression, EncodedEvent, EncodedLength, PartitionBatchSink, PartitionBuffer, - PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer, + Compression, + EncodedEvent, + EncodedLength, encoding::{EncodingConfig, EncodingConfiguration}, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer, + retries::{FixedRetryPolicy, RetryLogic}, TowerRequestConfig, TowerRequestSettings, VecBuffer, }, template::Template, }; use chrono::{Duration, Utc}; -use futures::{future::BoxFuture, ready, stream, FutureExt, SinkExt, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, FutureExt, ready, SinkExt, stream, StreamExt, TryFutureExt}; use rusoto_core::{request::BufferedHttpResponse, RusotoError}; use rusoto_logs::{ CloudWatchLogs, CloudWatchLogsClient, CreateLogGroupError, CreateLogStreamError, @@ -36,10 +35,11 @@ use tower::{ buffer::Buffer, limit::{concurrency::ConcurrencyLimit, rate::RateLimit}, retry::Retry, - timeout::Timeout, - Service, ServiceBuilder, ServiceExt, + 
Service, + ServiceBuilder, ServiceExt, timeout::Timeout, }; use vector_core::ByteSizeOf; +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; // Estimated maximum size of InputLogEvent with an empty message const EVENT_SIZE_OVERHEAD: usize = 50; @@ -681,10 +681,10 @@ mod tests { use super::*; use crate::{ event::{Event, Value}, - rusoto::RegionOrEndpoint, }; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; + use crate::aws::rusoto::RegionOrEndpoint; #[test] fn generate_config() { @@ -861,14 +861,14 @@ mod integration_tests { use super::*; use crate::{ config::{ProxyConfig, SinkConfig, SinkContext}, - rusoto::RegionOrEndpoint, test_util::{random_lines, random_lines_with_stream, random_string, trace_init}, }; - use futures::{stream, SinkExt, StreamExt}; + use futures::{SinkExt, stream, StreamExt}; use pretty_assertions::assert_eq; use rusoto_core::Region; use rusoto_logs::{CloudWatchLogs, CreateLogGroupRequest, GetLogEventsRequest}; use std::convert::TryFrom; + use crate::aws::rusoto::RegionOrEndpoint; const GROUP_NAME: &str = "vector-cw"; diff --git a/src/sinks/aws_cloudwatch_metrics.rs b/src/sinks/aws_cloudwatch_metrics.rs index f30b1b7ccb39a..cc85635c5ae68 100644 --- a/src/sinks/aws_cloudwatch_metrics.rs +++ b/src/sinks/aws_cloudwatch_metrics.rs @@ -1,10 +1,11 @@ +use crate::aws::auth::AwsAuthentication; +use crate::aws::rusoto::{self, RegionOrEndpoint}; use crate::{ config::{DataType, ProxyConfig, SinkConfig, SinkContext, SinkDescription}, event::{ metric::{Metric, MetricValue}, Event, }, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ batch::{BatchConfig, BatchSettings}, buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs index 899e924b436c4..991d7a79663b4 100644 --- a/src/sinks/aws_kinesis_firehose.rs +++ b/src/sinks/aws_kinesis_firehose.rs @@ -1,17 +1,16 @@ use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription}, event::Event, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, - retries::RetryLogic, - sink::{self, Response}, - BatchConfig, BatchSettings, Compression, EncodedEvent, EncodedLength, TowerRequestConfig, + BatchConfig, + BatchSettings, + Compression, + EncodedEvent, EncodedLength, encoding::{EncodingConfig, EncodingConfiguration}, retries::RetryLogic, sink::{self, Response}, TowerRequestConfig, VecBuffer, }, }; use bytes::Bytes; -use futures::{future::BoxFuture, stream, FutureExt, Sink, SinkExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, stream, StreamExt}; use rusoto_core::RusotoError; use rusoto_firehose::{ DescribeDeliveryStreamError, DescribeDeliveryStreamInput, KinesisFirehose, @@ -27,6 +26,7 @@ use std::{ use tower::Service; use tracing_futures::Instrument; use vector_core::ByteSizeOf; +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; // AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events // https://docs.aws.amazon.com/firehose/latest/dev/limits.html @@ -385,7 +385,7 @@ mod integration_tests { use rusoto_es::{CreateElasticsearchDomainRequest, Es, EsClient}; use rusoto_firehose::{CreateDeliveryStreamInput, ElasticsearchDestinationConfiguration}; use serde_json::{json, Value}; - use tokio::time::{sleep, Duration}; + use tokio::time::{Duration, sleep}; #[tokio::test] async fn firehose_put_records() { diff --git 
a/src/sinks/aws_kinesis_streams/config.rs b/src/sinks/aws_kinesis_streams/config.rs index 84ffcb20c16b8..49b2913e99bb4 100644 --- a/src/sinks/aws_kinesis_streams/config.rs +++ b/src/sinks/aws_kinesis_streams/config.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; use crate::config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext}; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::aws_kinesis_streams::service::KinesisService; use crate::sinks::util::encoding::{EncodingConfig, StandardEncodings}; use crate::sinks::util::{BatchConfig, BatchSettings, Compression, TowerRequestConfig}; @@ -11,7 +11,7 @@ use rusoto_kinesis::{DescribeStreamInput, Kinesis, KinesisClient, PutRecordsErro use serde::{Deserialize, Serialize}; use super::service::KinesisResponse; -use crate::rusoto; +use crate::aws::rusoto; use crate::sinks::aws_kinesis_streams::request_builder::KinesisRequestBuilder; use crate::sinks::aws_kinesis_streams::sink::KinesisSink; use crate::sinks::util::retries::RetryLogic; diff --git a/src/sinks/aws_kinesis_streams/integration_tests.rs b/src/sinks/aws_kinesis_streams/integration_tests.rs index 191b7ca914fb2..7d14b9c16b552 100644 --- a/src/sinks/aws_kinesis_streams/integration_tests.rs +++ b/src/sinks/aws_kinesis_streams/integration_tests.rs @@ -8,13 +8,13 @@ use crate::sinks::util::{BatchConfig, Compression}; use crate::test_util::components; use crate::{ config::SinkContext, - rusoto::RegionOrEndpoint, test_util::{random_lines_with_stream, random_string}, }; use rusoto_core::Region; use rusoto_kinesis::{Kinesis, KinesisClient}; use std::sync::Arc; -use tokio::time::{sleep, Duration}; +use tokio::time::{Duration, sleep}; +use crate::aws::rusoto::RegionOrEndpoint; #[tokio::test] async fn kinesis_put_records() { diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 22cf697bd9a6e..334ec5f54609a 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -4,8 +4,8 @@ use crate::sinks::util::encoding::StandardEncodings; use crate::sinks::util::BatchSettings; use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig}, - rusoto::{AwsAuthentication, RegionOrEndpoint}, sinks::{ + Healthcheck, s3_common::{ self, config::{S3Options, S3RetryLogic}, @@ -13,10 +13,9 @@ use crate::{ service::S3Service, }, util::{ - encoding::EncodingConfig, BatchConfig, Compression, ServiceBuilderExt, + BatchConfig, Compression, encoding::EncodingConfig, ServiceBuilderExt, TowerRequestConfig, }, - Healthcheck, }, }; use rusoto_s3::S3Client; @@ -24,6 +23,7 @@ use serde::{Deserialize, Serialize}; use std::convert::TryInto; use tower::ServiceBuilder; use vector_core::sink::VectorSink; +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use super::sink::S3RequestOptions; diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs index 354d4553588b9..e3609cdf9b0cd 100644 --- a/src/sinks/aws_s3/tests.rs +++ b/src/sinks/aws_s3/tests.rs @@ -2,7 +2,7 @@ #[cfg(test)] mod integration_tests { use crate::config::SinkContext; - use crate::rusoto::RegionOrEndpoint; + use crate::aws::rusoto::RegionOrEndpoint; use crate::sinks::aws_s3::S3SinkConfig; use crate::sinks::s3_common::config::S3Options; use crate::sinks::util::encoding::StandardEncodings; diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs index e9652732ecbc0..598a4a609f90b 100644 --- a/src/sinks/aws_sqs.rs +++ b/src/sinks/aws_sqs.rs @@ -1,19 +1,18 @@ use crate::{ config::{ - log_schema, DataType, 
GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription, + DataType, GenerateConfig, log_schema, ProxyConfig, SinkConfig, SinkContext, SinkDescription, }, event::Event, internal_events::{AwsSqsEventSent, TemplateRenderingFailed}, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, - retries::RetryLogic, - sink::{self, Response}, - BatchSettings, EncodedEvent, EncodedLength, TowerRequestConfig, VecBuffer, + BatchSettings, + EncodedEvent, + EncodedLength, + encoding::{EncodingConfig, EncodingConfiguration}, retries::RetryLogic, sink::{self, Response}, TowerRequestConfig, VecBuffer, }, template::{Template, TemplateParseError}, }; -use futures::{future::BoxFuture, stream, FutureExt, Sink, SinkExt, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, stream, StreamExt, TryFutureExt}; use rusoto_core::RusotoError; use rusoto_sqs::{ GetQueueAttributesError, GetQueueAttributesRequest, SendMessageError, SendMessageRequest, @@ -28,6 +27,7 @@ use std::{ use tower::Service; use tracing_futures::Instrument; use vector_core::ByteSizeOf; +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; #[derive(Debug, Snafu)] enum BuildError { @@ -375,7 +375,7 @@ mod integration_tests { use rusoto_core::Region; use rusoto_sqs::{CreateQueueRequest, GetQueueUrlRequest, ReceiveMessageRequest}; use std::collections::HashMap; - use tokio::time::{sleep, Duration}; + use tokio::time::{Duration, sleep}; #[tokio::test] async fn sqs_send_message_batch() { diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index 39e5cdc4721f0..62a580459bcd2 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -20,10 +20,11 @@ use vector_core::{ ByteSizeOf, }; +use crate::aws::auth::AwsAuthentication; +use crate::aws::rusoto::RegionOrEndpoint; use crate::{ config::GenerateConfig, config::{DataType, SinkConfig, SinkContext}, - rusoto::{AwsAuthentication, RegionOrEndpoint}, sinks::{ s3_common::{ self, diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index fab89078c3c23..167701cdccefe 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -6,7 +6,7 @@ use crate::transforms::metric_to_log::MetricToLog; use crate::sinks::util::http::RequestConfig; -use crate::rusoto::region_from_endpoint; +use crate::aws::rusoto::region_from_endpoint; use crate::sinks::util::{Compression, TowerRequestConfig, UriSerde}; use crate::tls::TlsSettings; use http::{StatusCode, Uri}; @@ -16,7 +16,7 @@ use snafu::ResultExt; use std::convert::TryFrom; use super::{InvalidHost, Request}; -use crate::rusoto; +use crate::aws::rusoto; use crate::sinks::elasticsearch::encoder::ElasticSearchEncoder; use crate::sinks::util::encoding::EncodingConfigFixed; use crate::sinks::HealthcheckError; diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 8bbfb94657634..58f52c5be24f6 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -3,7 +3,7 @@ use crate::config::{DataType, SinkConfig, SinkContext}; use crate::event::{EventRef, LogEvent, Value}; use crate::http::HttpClient; use crate::internal_events::TemplateRenderingFailed; -use crate::rusoto::RegionOrEndpoint; +use crate::aws::rusoto::RegionOrEndpoint; use crate::sinks::elasticsearch::request_builder::ElasticsearchRequestBuilder; use crate::sinks::elasticsearch::sink::ElasticSearchSink; use 
crate::sinks::elasticsearch::{BatchActionTemplate, IndexTemplate}; diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index d3167f6cd7aea..3e52753a625b4 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -20,13 +20,12 @@ pub use encoder::ElasticSearchEncoder; use crate::{ config::SinkDescription, internal_events::TemplateRenderingFailed, - rusoto::{self, AwsAuthentication}, template::{Template, TemplateParseError}, }; use http::{ header::{HeaderName, HeaderValue}, - uri::InvalidUri, Request, + uri::InvalidUri, }; use rusoto_credential::{CredentialsError, ProvideAwsCredentials}; @@ -36,6 +35,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::convert::TryFrom; +use crate::aws::rusoto::{self, AwsAuthentication}; use crate::event::{EventRef, LogEvent}; // use crate::sinks::elasticsearch::ParseError::AwsCredentialsGenerateFailed; diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index fb17a6607834f..d896c2df395f2 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -11,7 +11,7 @@ use hyper::{Body, Request}; use std::task::{Context, Poll}; use tower::ServiceExt; -use crate::rusoto::AwsCredentialsProvider; +use crate::aws::rusoto::AwsCredentialsProvider; use crate::sinks::util::{Compression, ElementCount}; use http::header::HeaderName; use hyper::header::HeaderValue; diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 5558cf7bcf72f..c8012ed3de23e 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,6 +1,6 @@ use super::BulkAction; use crate::event::{LogEvent, Metric, MetricKind, MetricValue, Value}; -use crate::rusoto::AwsAuthentication; +use crate::aws::rusoto::AwsAuthentication; use crate::sinks::elasticsearch::sink::process_log; use crate::sinks::elasticsearch::{ DataStreamConfig, ElasticSearchAuth, ElasticSearchCommon, ElasticSearchConfig, diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 43ed53a7da7d2..0b33b29c30570 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -1,7 +1,7 @@ use super::service::S3Service; use crate::config::ProxyConfig; -use crate::rusoto; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; +use crate::aws::rusoto; +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::util::retries::RetryLogic; use crate::sinks::Healthcheck; use futures::FutureExt; diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index bd5025b1e7607..41c7cdabb8d8a 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -1,8 +1,9 @@ use super::util::MultilineConfig; +use crate::aws::auth::AwsAuthentication; +use crate::aws::rusoto::{self, RegionOrEndpoint}; use crate::{ config::{DataType, ProxyConfig, SourceConfig, SourceContext, SourceDescription}, line_agg, - rusoto::{self, AwsAuthentication, RegionOrEndpoint}, }; use async_compression::tokio::bufread; use futures::{stream, stream::StreamExt}; @@ -294,10 +295,10 @@ mod test { #[cfg(test)] mod integration_tests { use super::{sqs, AwsS3Config, Compression, Strategy}; + use crate::aws::rusoto::RegionOrEndpoint; use crate::{ config::{SourceConfig, SourceContext}, line_agg, - rusoto::RegionOrEndpoint, sources::util::MultilineConfig, test_util::{ collect_n, lines_from_gzip_file, lines_from_zst_file, random_lines, trace_init, diff --git a/src/sources/aws_sqs/config.rs 
b/src/sources/aws_sqs/config.rs
new file mode 100644
index 0000000000000..934f63c4ec226
--- /dev/null
+++ b/src/sources/aws_sqs/config.rs
@@ -0,0 +1,101 @@
+use crate::aws::auth::AwsAuthentication;
+use crate::aws::rusoto::RegionOrEndpoint;
+use crate::codecs::{FramingConfig, ParserConfig};
+use crate::config::{DataType, SourceConfig, SourceContext};
+use crate::serde::{default_decoding, default_framing_message_based};
+use crate::sources::aws_sqs::source::SqsSource;
+use aws_sdk_sqs::Endpoint;
+use http::Uri;
+use serde::{Deserialize, Serialize};
+use std::cmp;
+
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct AwsSqsConfig {
+    #[serde(flatten)]
+    pub region: RegionOrEndpoint,
+    #[serde(default)]
+    pub auth: AwsAuthentication,
+
+    pub queue_url: String,
+    // restricted to u32 for safe conversion to i64 later
+    // #[serde(default = "default_poll_secs")]
+    // pub poll_secs: u32,
+    //
+    // // restricted to u32 for safe conversion to i64 later
+    // #[serde(default = "default_visibility_timeout_secs")]
+    // pub visibility_timeout_secs: u32,
+    //
+    // #[serde(default = "default_true")]
+    // pub delete_message: bool,
+    //
+    // // number of tasks spawned for running the SQS/S3 receive loop
+    // #[serde(default = "default_client_concurrency")]
+    // pub client_concurrency: u32,
+    //
+    // #[serde(default = "default_framing_message_based")]
+    // framing: Box<dyn FramingConfig>,
+    // #[serde(default = "default_decoding")]
+    // decoding: Box<dyn ParserConfig>,
+}
+
+#[async_trait::async_trait]
+#[typetag::serde(name = "aws_sqs")]
+impl SourceConfig for AwsSqsConfig {
+    async fn build(&self, cx: SourceContext) -> crate::Result {
+        let mut config_builder = aws_sdk_sqs::config::Builder::new();
+
+        if let Some(endpoint_override) = self.region.endpoint()? {
+            config_builder = config_builder.endpoint_resolver(endpoint_override);
+        }
+
+        let client = aws_sdk_sqs::Client::from_conf(config_builder.build());
+
+        Ok(Box::pin(SqsSource { client }.run(cx.out, cx.shutdown)))
+        // let multiline_config: Option = self
+        //     .multiline
+        //     .as_ref()
+        //     .map(|config| config.try_into())
+        //     .transpose()?;
+        //
+        // match self.strategy {
+        //     Strategy::Sqs => Ok(Box::pin(
+        //         self.create_sqs_ingestor(multiline_config, &cx.proxy)
+        //             .await?
+        //             .run(cx.out, cx.shutdown),
+        //     )),
+        // }
+        // todo!()
+    }
+
+    fn output_type(&self) -> DataType {
+        DataType::Log
+    }
+
+    fn source_type(&self) -> &'static str {
+        "aws_sqs"
+    }
+}
+
+const fn default_poll_secs() -> u32 {
+    15
+}
+
+const fn default_visibility_timeout_secs() -> u32 {
+    300
+}
+
+const fn default_true() -> bool {
+    true
+}
+
+fn default_client_concurrency() -> u32 {
+    cmp::max(1, num_cpus::get() as u32)
+}
+
+impl_generate_config_from_default!(AwsSqsConfig);
+
+impl Default for AwsSqsConfig {
+    fn default() -> Self {
+        todo!()
+    }
+}
diff --git a/src/sources/aws_sqs/mod.rs b/src/sources/aws_sqs/mod.rs
index 8116fddb63a23..5e92dddb84bc9 100644
--- a/src/sources/aws_sqs/mod.rs
+++ b/src/sources/aws_sqs/mod.rs
@@ -1,3 +1,9 @@
+mod config;
+mod source;
+
+use crate::config::SourceDescription;
+use config::AwsSqsConfig;
+
 inventory::submit! {
     SourceDescription::new::<AwsSqsConfig>("aws_sqs")
 }
diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs
new file mode 100644
index 0000000000000..95a1f6b7ae7f5
--- /dev/null
+++ b/src/sources/aws_sqs/source.rs
@@ -0,0 +1,44 @@
+use crate::shutdown::ShutdownSignal;
+use crate::Pipeline;
+use aws_sdk_sqs::Client as SqsClient;
+
+pub struct SqsSource {
+    pub client: SqsClient,
+    pub queue_url: String,
+}
+
+impl SqsSource {
+    pub async fn run(self, out: Pipeline, shutdown: ShutdownSignal) -> Result<(), ()> {
+        let x = self
+            .client
+            .receive_message()
+            .queue_url(&self.queue_url)
+            .max_number_of_messages(10)
+            //TODO: this will lower CPU / HTTP requests in low load scenarios
+            // .wait_time_seconds(x);
+            .send()
+            .await
+            .unwrap();
+        todo!()
+        // let mut handles = Vec::new();
+        // for _ in 0..self.state.client_concurrency {
+        //     let process =
+        //         IngestorProcess::new(Arc::clone(&self.state), out.clone(), shutdown.clone());
+        //     let fut = async move { process.run().await };
+        //     let handle = tokio::spawn(fut.in_current_span());
+        //     handles.push(handle);
+        // }
+        //
+        // // Wait for all of the processes to finish. If any one of them panics, we resume
+        // // that panic here to properly shutdown Vector.
+        // for handle in handles.drain(..) {
+        //     if let Err(e) = handle.await {
+        //         if e.is_panic() {
+        //             panic::resume_unwind(e.into_panic());
+        //         }
+        //     }
+        // }
+        //
+        // Ok(())
+    }
+}
diff --git a/src/sources/mod.rs b/src/sources/mod.rs
index e3bafadd1f2c6..add3afa420e92 100644
--- a/src/sources/mod.rs
+++ b/src/sources/mod.rs
@@ -8,6 +8,8 @@ pub mod aws_ecs_metrics;
 pub mod aws_kinesis_firehose;
 #[cfg(feature = "sources-aws_s3")]
 pub mod aws_s3;
+#[cfg(feature = "sources-aws_sqs")]
+pub mod aws_sqs;
 #[cfg(any(feature = "sources-datadog_agent"))]
 pub mod datadog;
 #[cfg(all(unix, feature = "sources-dnstap"))]

From 89779c8be10aeb929cc70376a6251fdcb3911fd7 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Fri, 5 Nov 2021 11:11:21 -0400
Subject: [PATCH 03/17] add E2E acking

Signed-off-by: Nathan Fox
---
 Cargo.toml                                    |   4 +-
 Makefile                                      |  12 ++
 src/aws/aws_sdk/auth.rs                       |  23 ++-
 src/aws/aws_sdk/region.rs                     |   5 +
 src/aws/region.rs                             |  11 +-
 src/sinks/aws_cloudwatch_logs/mod.rs          |  44 ++--
 src/sinks/aws_kinesis_firehose.rs             |  20 +-
 .../aws_kinesis_streams/integration_tests.rs  |   6 +-
 src/sinks/aws_sqs.rs                          |  18 +-
 src/sources/aws_sqs/config.rs                 |  89 ++++----
 src/sources/aws_sqs/integration_tests.rs      | 125 ++++++++++++
 src/sources/aws_sqs/mod.rs                    |   1 +
 src/sources/aws_sqs/source.rs                 | 193 +++++++++++++++---
 13 files changed, 418 insertions(+), 133 deletions(-)
 create mode 100644 src/sources/aws_sqs/integration_tests.rs

diff --git a/Cargo.toml b/Cargo.toml
index c6623e5b6aeef..c9f350a73d2b5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -460,7 +460,7 @@ sources-apache_metrics = []
 sources-aws_ecs_metrics = []
 sources-aws_kinesis_firehose = ["base64", "infer", "sources-utils-tls", "warp", "codecs"]
 sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid", "codecs", "zstd"]
-sources-aws_sqs = ["aws-config", "aws-types", "aws-sdk-sqs"]
+sources-aws_sqs = ["aws-config", "aws-types", "aws-sdk-sqs", "codecs"]
 sources-datadog_agent = ["snap", "sources-utils-tls", "warp", "sources-utils-http-error", "protobuf-build", "codecs"]
 sources-dnstap = ["base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "protobuf-build"]
 sources-docker_logs = ["docker"]
@@ -732,7 +732,7 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
 aws-kinesis-firehose-integration-tests = ["rusoto_es", "sinks-aws_kinesis_firehose", "sinks-elasticsearch"]
 aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
 aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"]
-aws-sqs-integration-tests = ["sinks-aws_sqs"]
+aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"]
 azure-blob-integration-tests = ["sinks-azure_blob"]
 clickhouse-integration-tests = ["sinks-clickhouse", "warp"]
 docker-logs-integration-tests = ["sources-docker_logs", "unix"]
diff --git a/Makefile b/Makefile
index 2726035c4e3a8..b5e415ab09db7 100644
--- a/Makefile
+++ b/Makefile
@@ -322,6 +322,18 @@ ifeq ($(AUTODESPAWN), true)
	@scripts/setup_integration_env.sh aws stop
 endif
 
+.PHONY: test-integration-aws-sqs
+test-integration-aws-sqs: ## Runs AWS SQS integration tests
+ifeq ($(AUTOSPAWN), true)
+	@scripts/setup_integration_env.sh aws stop
+	@scripts/setup_integration_env.sh aws start
+	sleep 10 # Many services are very slow... Give them a sec...
+endif
+	${MAYBE_ENVIRONMENT_EXEC} cargo test --no-fail-fast --no-default-features --features aws-sqs-integration-tests --lib ::aws_sqs
+ifeq ($(AUTODESPAWN), true)
+	@scripts/setup_integration_env.sh aws stop
+endif
+
 .PHONY: test-integration-azure
 test-integration-azure: ## Runs Azure integration tests
 ifeq ($(AUTOSPAWN), true)
diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs
index 9c7ee0bf7e255..a227b25316fd0 100644
--- a/src/aws/aws_sdk/auth.rs
+++ b/src/aws/aws_sdk/auth.rs
@@ -10,10 +10,8 @@ use aws_types::region::Region;
 use aws_types::Credentials;
 
 impl AwsAuthentication {
-    pub async fn build_config(&self, region: Region) -> Config {
-        let mut builder = Builder::default().region(region);
-
-        let credentials_provider = match self {
+    pub async fn credentials_provider(&self) -> SharedCredentialsProvider {
+        match self {
             Self::Static {
                 access_key_id,
                 secret_access_key,
@@ -44,8 +42,21 @@ impl AwsAuthentication {
             AwsAuthentication::Default {} => {
                 SharedCredentialsProvider::new(default_credentials_provider().await)
             }
-        };
-        builder.credentials_provider(credentials_provider).build()
+        }
+    }
+
+    // pub async fn build_config(&self, region: Region) -> Config {
+    //     let mut builder = Builder::default().region(region);
+    //
+    //     let credentials_provider = ;
+    //     builder.credentials_provider(credentials_provider).build()
+    // }
+
+    pub(crate) fn test_auth() -> AwsAuthentication {
+        AwsAuthentication::Static {
+            access_key_id: "dummy".to_string(),
+            secret_access_key: "dummy".to_string(),
+        }
+    }
 }
 
 async fn default_credentials_provider() -> SharedCredentialsProvider {
diff --git a/src/aws/aws_sdk/region.rs b/src/aws/aws_sdk/region.rs
index 95dfc58f21e50..5d659313b934f 100644
--- a/src/aws/aws_sdk/region.rs
+++ b/src/aws/aws_sdk/region.rs
@@ -1,5 +1,6 @@
 use crate::aws::region::RegionOrEndpoint;
 use aws_sdk_sqs::Endpoint;
+use aws_types::region::Region;
 use http::Uri;
 use std::str::FromStr;
 
@@ -11,4 +12,8 @@ impl RegionOrEndpoint {
             Ok(None)
         }
     }
+
+    pub fn region(&self) -> Option<Region> {
+        self.region.clone().map(Region::new)
+    }
 }
diff --git a/src/aws/region.rs b/src/aws/region.rs
index cc180529cb94d..35889c8c9afc9 100644
--- a/src/aws/region.rs
+++ b/src/aws/region.rs
@@ -15,10 +15,17 @@ impl RegionOrEndpoint {
         }
     }
 
-    pub const fn with_endpoint(endpoint: String) -> Self {
+    pub fn with_endpoint(endpoint: impl Into<String>) -> Self {
         Self {
             region: None,
-            endpoint: Some(endpoint),
+            endpoint: Some(endpoint.into()),
+        }
+    }
+
+    pub fn with_both(region: impl Into<String>, endpoint: impl Into<String>) -> Self {
+        Self {
+            region: Some(region.into()),
+            endpoint: Some(endpoint.into()),
         }
     }
 }
diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs
b/src/sinks/aws_cloudwatch_logs/mod.rs index b986067cc045c..030f8d57af763 100644 --- a/src/sinks/aws_cloudwatch_logs/mod.rs +++ b/src/sinks/aws_cloudwatch_logs/mod.rs @@ -1,22 +1,23 @@ mod request; +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; use crate::{ config::{ - DataType, GenerateConfig, log_schema, ProxyConfig, SinkConfig, SinkContext, SinkDescription, + log_schema, DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription, }, event::{Event, LogEvent, Value}, internal_events::TemplateRenderingFailed, sinks::util::{ batch::{BatchConfig, BatchSettings}, - Compression, - EncodedEvent, - EncodedLength, encoding::{EncodingConfig, EncodingConfiguration}, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer, - retries::{FixedRetryPolicy, RetryLogic}, TowerRequestConfig, TowerRequestSettings, VecBuffer, + encoding::{EncodingConfig, EncodingConfiguration}, + retries::{FixedRetryPolicy, RetryLogic}, + Compression, EncodedEvent, EncodedLength, PartitionBatchSink, PartitionBuffer, + PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer, }, template::Template, }; use chrono::{Duration, Utc}; -use futures::{future::BoxFuture, FutureExt, ready, SinkExt, stream, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, ready, stream, FutureExt, SinkExt, StreamExt, TryFutureExt}; use rusoto_core::{request::BufferedHttpResponse, RusotoError}; use rusoto_logs::{ CloudWatchLogs, CloudWatchLogsClient, CreateLogGroupError, CreateLogStreamError, @@ -35,11 +36,10 @@ use tower::{ buffer::Buffer, limit::{concurrency::ConcurrencyLimit, rate::RateLimit}, retry::Retry, - Service, - ServiceBuilder, ServiceExt, timeout::Timeout, + timeout::Timeout, + Service, ServiceBuilder, ServiceExt, }; use vector_core::ByteSizeOf; -use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; // Estimated maximum size of InputLogEvent with an empty message const EVENT_SIZE_OVERHEAD: usize = 50; @@ -679,12 +679,10 @@ impl From> for CloudwatchError { #[cfg(test)] mod tests { use super::*; - use crate::{ - event::{Event, Value}, - }; + use crate::aws::rusoto::RegionOrEndpoint; + use crate::event::{Event, Value}; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; - use crate::aws::rusoto::RegionOrEndpoint; #[test] fn generate_config() { @@ -787,7 +785,7 @@ mod tests { fn svc(config: CloudwatchLogsSinkConfig) -> CloudwatchLogsSvc { let config = CloudwatchLogsSinkConfig { - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), ..config }; let key = CloudwatchKey { @@ -859,16 +857,16 @@ mod tests { #[cfg(test)] mod integration_tests { use super::*; + use crate::aws::rusoto::RegionOrEndpoint; use crate::{ config::{ProxyConfig, SinkConfig, SinkContext}, test_util::{random_lines, random_lines_with_stream, random_string, trace_init}, }; - use futures::{SinkExt, stream, StreamExt}; + use futures::{stream, SinkExt, StreamExt}; use pretty_assertions::assert_eq; use rusoto_core::Region; use rusoto_logs::{CloudWatchLogs, CreateLogGroupRequest, GetLogEventsRequest}; use std::convert::TryFrom; - use crate::aws::rusoto::RegionOrEndpoint; const GROUP_NAME: &str = "vector-cw"; @@ -882,7 +880,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: 
RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -929,7 +927,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -995,7 +993,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1067,7 +1065,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1116,7 +1114,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1167,7 +1165,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { group_name: Template::try_from(GROUP_NAME).unwrap(), stream_name: Template::try_from(format!("{}-{{{{key}}}}", stream_name)).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, @@ -1253,7 +1251,7 @@ mod integration_tests { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from("test-stream").unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000"), encoding: Encoding::Text.into(), create_missing_group: None, create_missing_stream: None, diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs index 991d7a79663b4..8228911aad5ca 100644 --- a/src/sinks/aws_kinesis_firehose.rs +++ b/src/sinks/aws_kinesis_firehose.rs @@ -1,16 +1,17 @@ +use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription}, event::Event, sinks::util::{ - BatchConfig, - BatchSettings, - Compression, - EncodedEvent, EncodedLength, encoding::{EncodingConfig, EncodingConfiguration}, retries::RetryLogic, sink::{self, Response}, TowerRequestConfig, + encoding::{EncodingConfig, EncodingConfiguration}, + retries::RetryLogic, + sink::{self, Response}, + BatchConfig, BatchSettings, Compression, 
EncodedEvent, EncodedLength, TowerRequestConfig, VecBuffer, }, }; use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, stream, StreamExt}; +use futures::{future::BoxFuture, stream, FutureExt, Sink, SinkExt, StreamExt}; use rusoto_core::RusotoError; use rusoto_firehose::{ DescribeDeliveryStreamError, DescribeDeliveryStreamInput, KinesisFirehose, @@ -26,7 +27,6 @@ use std::{ use tower::Service; use tracing_futures::Instrument; use vector_core::ByteSizeOf; -use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; // AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events // https://docs.aws.amazon.com/firehose/latest/dev/limits.html @@ -295,7 +295,7 @@ mod tests { fn check_batch_size() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(Encoding::Json), compression: Compression::None, batch: BatchConfig { @@ -323,7 +323,7 @@ mod tests { fn check_batch_events() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(Encoding::Json), compression: Compression::None, batch: BatchConfig { @@ -385,7 +385,7 @@ mod integration_tests { use rusoto_es::{CreateElasticsearchDomainRequest, Es, EsClient}; use rusoto_firehose::{CreateDeliveryStreamInput, ElasticsearchDestinationConfiguration}; use serde_json::{json, Value}; - use tokio::time::{Duration, sleep}; + use tokio::time::{sleep, Duration}; #[tokio::test] async fn firehose_put_records() { @@ -403,7 +403,7 @@ mod integration_tests { let config = KinesisFirehoseSinkConfig { stream_name: stream.clone(), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(Encoding::Json), // required for ES destination w/ localstack compression: Compression::None, batch: BatchConfig { diff --git a/src/sinks/aws_kinesis_streams/integration_tests.rs b/src/sinks/aws_kinesis_streams/integration_tests.rs index 7d14b9c16b552..7fff87acd0c44 100644 --- a/src/sinks/aws_kinesis_streams/integration_tests.rs +++ b/src/sinks/aws_kinesis_streams/integration_tests.rs @@ -2,6 +2,7 @@ #![cfg(test)] use super::*; +use crate::aws::rusoto::RegionOrEndpoint; use crate::config::SinkConfig; use crate::sinks::util::encoding::StandardEncodings; use crate::sinks::util::{BatchConfig, Compression}; @@ -13,8 +14,7 @@ use crate::{ use rusoto_core::Region; use rusoto_kinesis::{Kinesis, KinesisClient}; use std::sync::Arc; -use tokio::time::{Duration, sleep}; -use crate::aws::rusoto::RegionOrEndpoint; +use tokio::time::{sleep, Duration}; #[tokio::test] async fn kinesis_put_records() { @@ -30,7 +30,7 @@ async fn kinesis_put_records() { let config = KinesisSinkConfig { stream_name: stream.clone(), partition_key_field: None, - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: StandardEncodings::Text.into(), compression: Compression::None, batch: BatchConfig { diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs index 598a4a609f90b..e72ac32302508 100644 --- a/src/sinks/aws_sqs.rs +++ b/src/sinks/aws_sqs.rs @@ -1,18 +1,19 @@ +use 
crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; use crate::{ config::{ - DataType, GenerateConfig, log_schema, ProxyConfig, SinkConfig, SinkContext, SinkDescription, + log_schema, DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext, SinkDescription, }, event::Event, internal_events::{AwsSqsEventSent, TemplateRenderingFailed}, sinks::util::{ - BatchSettings, - EncodedEvent, - EncodedLength, - encoding::{EncodingConfig, EncodingConfiguration}, retries::RetryLogic, sink::{self, Response}, TowerRequestConfig, VecBuffer, + encoding::{EncodingConfig, EncodingConfiguration}, + retries::RetryLogic, + sink::{self, Response}, + BatchSettings, EncodedEvent, EncodedLength, TowerRequestConfig, VecBuffer, }, template::{Template, TemplateParseError}, }; -use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, stream, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, stream, FutureExt, Sink, SinkExt, StreamExt, TryFutureExt}; use rusoto_core::RusotoError; use rusoto_sqs::{ GetQueueAttributesError, GetQueueAttributesRequest, SendMessageError, SendMessageRequest, @@ -27,7 +28,6 @@ use std::{ use tower::Service; use tracing_futures::Instrument; use vector_core::ByteSizeOf; -use crate::aws::rusoto::{self, AwsAuthentication, RegionOrEndpoint}; #[derive(Debug, Snafu)] enum BuildError { @@ -375,7 +375,7 @@ mod integration_tests { use rusoto_core::Region; use rusoto_sqs::{CreateQueueRequest, GetQueueUrlRequest, ReceiveMessageRequest}; use std::collections::HashMap; - use tokio::time::{Duration, sleep}; + use tokio::time::{sleep, Duration}; #[tokio::test] async fn sqs_send_message_batch() { @@ -394,7 +394,7 @@ mod integration_tests { let config = SqsSinkConfig { queue_url: queue_url.clone(), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: Encoding::Text.into(), message_group_id: None, message_deduplication_id: None, diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 934f63c4ec226..e3215601c4c86 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -1,15 +1,18 @@ use crate::aws::auth::AwsAuthentication; use crate::aws::rusoto::RegionOrEndpoint; -use crate::codecs::{FramingConfig, ParserConfig}; +use crate::codecs::{DecodingConfig, FramingConfig, ParserConfig}; use crate::config::{DataType, SourceConfig, SourceContext}; use crate::serde::{default_decoding, default_framing_message_based}; use crate::sources::aws_sqs::source::SqsSource; use aws_sdk_sqs::Endpoint; +use aws_types::region::Region; use http::Uri; use serde::{Deserialize, Serialize}; use std::cmp; -#[derive(Deserialize, Serialize, Debug, Clone)] +#[derive(Deserialize, Serialize, Derivative, Debug, Clone)] +#[derivative(Default)] +#[serde(deny_unknown_fields)] pub struct AwsSqsConfig { #[serde(flatten)] pub region: RegionOrEndpoint, @@ -17,54 +20,52 @@ pub struct AwsSqsConfig { pub auth: AwsAuthentication, pub queue_url: String, - // restricted to u32 for safe conversion to i64 later - // #[serde(default = "default_poll_secs")] - // pub poll_secs: u32, - // - // // restricted to u32 for safe conversion to i64 later - // #[serde(default = "default_visibility_timeout_secs")] - // pub visibility_timeout_secs: u32, - // - // #[serde(default = "default_true")] - // pub delete_message: bool, - // - // // number of tasks spawned for running the SQS/S3 receive loop - // #[serde(default = "default_client_concurrency")] - // pub client_concurrency: u32, - // - // 
#[serde(default = "default_framing_message_based")] - // framing: Box<dyn FramingConfig>, - // #[serde(default = "default_decoding")] - // decoding: Box<dyn ParserConfig>, + + #[serde(default = "default_poll_secs")] + #[derivative(Default(value = "default_poll_secs()"))] + pub poll_secs: u32, + + // number of concurrent tasks spawned for receiving/processing SQS messages + #[serde(default = "default_client_concurrency")] + #[derivative(Default(value = "default_client_concurrency()"))] + pub client_concurrency: u32, + + #[serde(default = "default_framing_message_based")] + #[derivative(Default(value = "default_framing_message_based()"))] + pub framing: Box<dyn FramingConfig>, + #[serde(default = "default_decoding")] + #[derivative(Default(value = "default_decoding()"))] + pub decoding: Box<dyn ParserConfig>, } #[async_trait::async_trait] #[typetag::serde(name = "aws_sqs")] impl SourceConfig for AwsSqsConfig { async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> { - let mut config_builder = aws_sdk_sqs::config::Builder::new(); + let mut config_builder = aws_sdk_sqs::config::Builder::new() + .credentials_provider(self.auth.credentials_provider().await); if let Some(endpoint_override) = self.region.endpoint()? { config_builder = config_builder.endpoint_resolver(endpoint_override); } + if let Some(region) = self.region.region() { + config_builder = config_builder.region(region); + } let client = aws_sdk_sqs::Client::from_conf(config_builder.build()); + let decoder = DecodingConfig::new(self.framing.clone(), self.decoding.clone()).build()?; - Ok(Box::pin(SqsSource { client }.run(cx.out, cx.shutdown))) - // let multiline_config: Option<line_agg::Config> = self - // .multiline - // .as_ref() - // .map(|config| config.try_into()) - // .transpose()?; - // - // match self.strategy { - // Strategy::Sqs => Ok(Box::pin( - // self.create_sqs_ingestor(multiline_config, &cx.proxy) - // .await?
- // .run(cx.out, cx.shutdown), - // )), - // } - // todo!() + Ok(Box::pin( + SqsSource { + client, + queue_url: self.queue_url.clone(), + decoder, + poll_secs: self.poll_secs, + concurrency: self.client_concurrency, + acknowledgements: cx.acknowledgements, + } + .run(cx.out, cx.shutdown), + )) } fn output_type(&self) -> DataType { @@ -80,22 +81,8 @@ const fn default_poll_secs() -> u32 { 15 } -const fn default_visibility_timeout_secs() -> u32 { - 300 -} - -const fn default_true() -> bool { - true -} - fn default_client_concurrency() -> u32 { cmp::max(1, num_cpus::get() as u32) } impl_generate_config_from_default!(AwsSqsConfig); - -impl Default for AwsSqsConfig { - fn default() -> Self { - todo!() - } -} diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs new file mode 100644 index 0000000000000..c6eeeea797155 --- /dev/null +++ b/src/sources/aws_sqs/integration_tests.rs @@ -0,0 +1,125 @@ +#![cfg(feature = "aws-sqs-integration-tests")] +#![cfg(test)] + +use crate::aws::auth::AwsAuthentication; +use crate::aws::region::RegionOrEndpoint; +use crate::config::log_schema; +use crate::config::{SourceConfig, SourceContext}; +use crate::event::Event; +use crate::http::Auth; +use crate::sources::aws_sqs::config::AwsSqsConfig; +use crate::test_util::random_string; +use crate::Pipeline; +use aws_sdk_sqs::model::SendMessageBatchRequestEntry; +use aws_sdk_sqs::output::CreateQueueOutput; +use aws_sdk_sqs::Endpoint; +use aws_types::region::Region; +use futures::{FutureExt, StreamExt}; +use http::Uri; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::time::Duration; +use tokio::time::timeout; + +// fn gen_queue_url(name: &str) -> String { +// format!( +// "https://sqs.us-east-1.amazonaws.com/123456789012/test-{}", +// name +// ) +// } + +fn gen_queue_name() -> String { + random_string(10).to_lowercase() +} + +async fn ensure_queue(queue_name: &str, client: &aws_sdk_sqs::Client) -> CreateQueueOutput { + client + .create_queue() + .queue_name(queue_name) + .send() + .await + .unwrap() +} + +async fn send_test_events(mut count: u32, queue_url: &str, client: &aws_sdk_sqs::Client) { + for i in 0..count { + let x = client + .send_message() + .message_body(calculate_message(i)) + .queue_url(queue_url) + .send() + .await + .unwrap(); + } +} + +async fn get_sqs_client() -> aws_sdk_sqs::Client { + let mut config = aws_sdk_sqs::config::Builder::new() + .credentials_provider(AwsAuthentication::test_auth().credentials_provider().await) + .endpoint_resolver(Endpoint::immutable( + Uri::from_str("http://localhost:4566").unwrap(), + )) + .region(Some(Region::new("us-east-1"))) + .build(); + + aws_sdk_sqs::Client::from_conf(config) +} + +#[tokio::test] +pub async fn test() { + let sqs_client = get_sqs_client().await; + let queue_name = gen_queue_name(); + let queue_url = ensure_queue(&queue_name, &sqs_client) + .await + .queue_url + .expect("Create queue should return the url"); + + let num_events = 3; + send_test_events(num_events, &queue_url, &sqs_client).await; + + let config = AwsSqsConfig { + region: RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"), + auth: AwsAuthentication::test_auth(), + queue_url: queue_url.clone(), + ..Default::default() + }; + + println!("Spawning source task"); + let (tx, rx) = Pipeline::new_test(); + let source_task = tokio::spawn(async move { + config + .build(SourceContext::new_test(tx)) + .await + .unwrap() + .await + .unwrap() + }); + + let mut expected_messages = HashSet::new(); + for i in 0..num_events 
{ + expected_messages.insert(calculate_message(i)); + } + + let events: Vec<Event> = timeout( + Duration::from_secs(10), + rx.take(num_events as usize).collect(), + ) + .await + .unwrap(); + + for event in events { + let message = event + .as_log() + .get(log_schema().message_key()) + .unwrap() + .to_string_lossy(); + if !expected_messages.remove(&message) { + panic!("Received unexpected message: {:?}", message); + } + } + assert!(expected_messages.is_empty()); +} + +fn calculate_message(index: u32) -> String { + format!("Test message: {}", index) +} diff --git a/src/sources/aws_sqs/mod.rs b/src/sources/aws_sqs/mod.rs index 5e92dddb84bc9..9e672b0c2b48f 100644 --- a/src/sources/aws_sqs/mod.rs +++ b/src/sources/aws_sqs/mod.rs @@ -1,4 +1,5 @@ mod config; +mod integration_tests; mod source; use crate::config::SourceDescription; diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 95a1f6b7ae7f5..9c0cbce215fce 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -1,44 +1,183 @@ +use crate::codecs::Decoder; +use crate::config::log_schema; +use crate::event::{BatchNotifier, Event}; +use crate::event::{BatchStatus, Value}; use crate::shutdown::ShutdownSignal; +use crate::sources::util::TcpError; use crate::Pipeline; +use aws_sdk_sqs::model::DeleteMessageBatchRequestEntry; +use aws_sdk_sqs::output::ReceiveMessageOutput; use aws_sdk_sqs::Client as SqsClient; +use bytes::Bytes; +use futures::future::ready; +use futures::{FutureExt, TryStreamExt}; +use futures::{SinkExt, Stream, StreamExt}; +use std::io::Cursor; +use std::panic; +use tokio::time::Duration; +use tokio::{pin, select}; +use tokio_util::codec::FramedRead; +// This is the maximum SQS supports in a single batch request +const MAX_BATCH_SIZE: i32 = 10; + +#[derive(Clone)] pub struct SqsSource { pub client: SqsClient, pub queue_url: String, + pub decoder: Decoder, + pub poll_secs: u32, + pub concurrency: u32, + pub acknowledgements: bool, } impl SqsSource { pub async fn run(self, out: Pipeline, shutdown: ShutdownSignal) -> Result<(), ()> { - let x = self + let mut task_handles = vec![]; + + for _ in 0..self.concurrency { + let source = self.clone(); + let shutdown = shutdown.clone().fuse(); + let mut out = out.clone(); + task_handles.push(tokio::spawn(async move { + pin!(shutdown); + loop { + select! { + _ = &mut shutdown => break, + _ = source.run_once(&mut out, self.acknowledgements) => {}, + } + } + })); + } + + // Wait for all of the processes to finish. If any one of them panics, we resume + // that panic here to properly shut down Vector. + for task_handle in task_handles.drain(..) { + if let Err(e) = task_handle.await { + if e.is_panic() { + panic::resume_unwind(e.into_panic()); + } + } + } + Ok(()) + } + + async fn run_once(&self, out: &mut Pipeline, acknowledgements: bool) { + let result = self .client .receive_message() .queue_url(&self.queue_url) - .max_number_of_messages(10) - //TODO: this will lower CPU / HTTP requests in low load scenarios - // .wait_time_seconds(x); + .max_number_of_messages(MAX_BATCH_SIZE) + .wait_time_seconds(self.poll_secs as i32) .send() - .await - .unwrap(); - todo!() - // let mut handles = Vec::new(); - // for _ in 0..self.state.client_concurrency { - // let process = - // IngestorProcess::new(Arc::clone(&self.state), out.clone(), shutdown.clone()); - // let fut = async move { process.run().await }; - // let handle = tokio::spawn(fut.in_current_span()); - // handles.push(handle); - // } - // - // // Wait for all of the processes to finish.
If any one of them panics, we resume - // // that panic here to properly shutdown Vector. - // for handle in handles.drain(..) { - // if let Err(e) = handle.await { - // if e.is_panic() { - // panic::resume_unwind(e.into_panic()); - // } - // } - // } - // - // Ok(()) + .await; + + let receive_message_output = match result { + Ok(output) => output, + Err(err) => { + error!("SQS receive message error: {:?}", err); + // prevent rapid errors from flooding the logs + tokio::time::sleep(Duration::from_secs(1)).await; + return; + } + }; + + if let Some(messages) = receive_message_output.messages { + let mut receipts_to_ack = vec![]; + if acknowledgements { + let (batch, receiver) = BatchNotifier::new_with_receiver(); + for message in messages { + if let Some(body) = message.body { + let mut stream = decode_message(self.decoder.clone(), &body); + + let mut stream = stream.map_ok(|event| event.with_batch_notifier(&batch)); + let send_result = out.send_all(&mut stream).await; + + match send_result { + Err(err) => error!(message = "Error sending to sink.", error = %err), + Ok(()) => { + // a receipt handle should always exist + if let Some(receipt_handle) = message.receipt_handle { + receipts_to_ack.push(receipt_handle); + } + } + } + } + } + + let client = self.client.clone(); + let queue_url = self.queue_url.clone(); + tokio::spawn(async move { + let batch_status = receiver.await; + if batch_status == BatchStatus::Delivered { + delete_messages(&client, &receipts_to_ack, &queue_url).await; + } + }); + } else { + for message in messages { + if let Some(body) = message.body { + let mut stream = decode_message(self.decoder.clone(), &body); + match out.send_all(&mut stream).await { + Err(err) => error!(message = "Error sending to sink.", error = %err), + Ok(()) => { + // a receipt handle should always exist + if let Some(receipt_handle) = message.receipt_handle { + receipts_to_ack.push(receipt_handle); + } + } + } + } + } + delete_messages(&self.client, &receipts_to_ack, &self.queue_url).await; + } + } + } +} + +async fn delete_messages(client: &SqsClient, receipts: &[String], queue_url: &str) { + if !receipts.is_empty() { + let mut batch = client.delete_message_batch().queue_url(queue_url); + + for (id, receipt) in receipts.into_iter().enumerate() { + batch = batch.entries( + DeleteMessageBatchRequestEntry::builder() + .id(id.to_string()) + .receipt_handle(receipt) + .build(), + ); + } + if let Err(err) = batch.send().await { + //TODO: emit as event? + error!("SQS Delete failed: {:?}", err); + } } } + +fn decode_message<E>(decoder: Decoder, message: &str) -> impl Stream<Item = Result<Event, E>> { + let schema = log_schema(); + + let payload = Cursor::new(Bytes::copy_from_slice(message.as_bytes())); + FramedRead::new(payload, decoder) + .map(|input| match input { + Ok((mut events, _)) => { + let mut event = events.pop().expect("event must exist"); + if let Event::Log(ref mut log) = event { + log.try_insert(schema.source_type_key(), Bytes::from("aws_sqs")); + // log.try_insert(schema.timestamp_key(), timestamp); + } + + Some(Some(Ok(event))) + } + Err(e) => { + // Error is logged by `crate::codecs::Decoder`, no further handling + // is needed here.
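+ // `can_continue` reports whether the decoder can safely keep reading frames from this payload after the error.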
+ if !e.can_continue() { + Some(None) + } else { + None + } + } + }) + .take_while(|x| ready(x.is_some())) + .filter_map(|x| ready(x.expect("should have inner value"))) +} From a78b44463bcde563e9887be9dcf6efb8fc15e55a Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 5 Nov 2021 11:14:30 -0400 Subject: [PATCH 04/17] safe Signed-off-by: Nathan Fox --- src/aws/auth.rs | 31 ------------------------------- src/aws/aws_sdk/auth.rs | 6 +++--- src/aws/rusoto/auth.rs | 34 +++++++++++++++++++++++++++++++--- src/aws/rusoto/region.rs | 2 +- src/sources/aws_sqs/config.rs | 6 +++--- src/sources/aws_sqs/source.rs | 6 +++--- 6 files changed, 41 insertions(+), 44 deletions(-) diff --git a/src/aws/auth.rs b/src/aws/auth.rs index 7f65cce035eff..47d9e5bd2a67f 100644 --- a/src/aws/auth.rs +++ b/src/aws/auth.rs @@ -1,5 +1,3 @@ -use crate::aws::rusoto::AwsCredentialsProvider; -use rusoto_core::Region; use serde::{Deserialize, Serialize}; /// Configuration for configuring authentication strategy for AWS. @@ -27,8 +25,6 @@ pub enum AwsAuthentication { Default {}, } - - #[cfg(test)] mod tests { use super::*; @@ -146,31 +142,4 @@ mod tests { _ => panic!(), } } - - #[test] - fn parsing_credentials_file() { - let tmpdir = tempfile::tempdir().unwrap(); - let tmpfile_path = tmpdir.path().join("credentials"); - let mut tmpfile = File::create(&tmpfile_path).unwrap(); - - writeln!( - tmpfile, - r#" - [default] - aws_access_key_id = default-access-key-id - aws_secret_access_key = default-secret - "# - ) - .unwrap(); - - let auth = AwsAuthentication::File { - credentials_file: tmpfile_path.to_str().unwrap().to_string(), - profile: Some("default".to_string()), - }; - let result = auth.build(&Region::AfSouth1, None).unwrap(); - assert!(matches!(result, AwsCredentialsProvider::File { .. 
})); - - drop(tmpfile); - tmpdir.close().unwrap(); - } } diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs index a227b25316fd0..2927234a0de2c 100644 --- a/src/aws/aws_sdk/auth.rs +++ b/src/aws/aws_sdk/auth.rs @@ -2,11 +2,11 @@ use crate::aws::auth::AwsAuthentication; use aws_config::default_provider::credentials::default_provider; use aws_config::meta::credentials::LazyCachingCredentialsProvider; use aws_config::profile::ProfileFileCredentialsProvider; -use aws_config::provider_config::ProviderConfig; + use aws_config::sts::AssumeRoleProviderBuilder; -use aws_types::config::{Builder, Config}; + use aws_types::credentials::SharedCredentialsProvider; -use aws_types::region::Region; + use aws_types::Credentials; impl AwsAuthentication { diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs index b49dcdd2d723a..8bc73f8956384 100644 --- a/src/aws/rusoto/auth.rs +++ b/src/aws/rusoto/auth.rs @@ -1,12 +1,10 @@ -use rusoto_core::Region; use crate::aws::auth::AwsAuthentication; use crate::aws::rusoto::AwsCredentialsProvider; +use rusoto_core::Region; const AWS_DEFAULT_PROFILE: &'static str = "default"; impl AwsAuthentication { - - pub fn build( &self, region: &Region, @@ -57,3 +55,33 @@ impl AwsAuthentication { } } } +#[cfg(test)] +mod test { + + #[test] + fn parsing_credentials_file() { + let tmpdir = tempfile::tempdir().unwrap(); + let tmpfile_path = tmpdir.path().join("credentials"); + let mut tmpfile = File::create(&tmpfile_path).unwrap(); + + writeln!( + tmpfile, + r#" + [default] + aws_access_key_id = default-access-key-id + aws_secret_access_key = default-secret + "# + ) + .unwrap(); + + let auth = AwsAuthentication::File { + credentials_file: tmpfile_path.to_str().unwrap().to_string(), + profile: Some("default".to_string()), + }; + let result = auth.build(&Region::AfSouth1, None).unwrap(); + assert!(matches!(result, AwsCredentialsProvider::File { .. 
})); + + drop(tmpfile); + tmpdir.close().unwrap(); + } +} diff --git a/src/aws/rusoto/region.rs b/src/aws/rusoto/region.rs index ddab04081fcb7..2ee757665d6a3 100644 --- a/src/aws/rusoto/region.rs +++ b/src/aws/rusoto/region.rs @@ -1,6 +1,6 @@ use http::{uri::InvalidUri, Uri}; use rusoto_core::{region::ParseRegionError, Region}; -use serde::{Deserialize, Serialize}; + use snafu::{ResultExt, Snafu}; use std::convert::TryFrom; use std::str::FromStr; diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index e3215601c4c86..0b7297ded9a93 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -4,9 +4,9 @@ use crate::codecs::{DecodingConfig, FramingConfig, ParserConfig}; use crate::config::{DataType, SourceConfig, SourceContext}; use crate::serde::{default_decoding, default_framing_message_based}; use crate::sources::aws_sqs::source::SqsSource; -use aws_sdk_sqs::Endpoint; -use aws_types::region::Region; -use http::Uri; + + + use serde::{Deserialize, Serialize}; use std::cmp; diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 9c0cbce215fce..cc61e026d4ef3 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -1,12 +1,12 @@ use crate::codecs::Decoder; use crate::config::log_schema; use crate::event::{BatchNotifier, Event}; -use crate::event::{BatchStatus, Value}; +use crate::event::{BatchStatus}; use crate::shutdown::ShutdownSignal; use crate::sources::util::TcpError; use crate::Pipeline; use aws_sdk_sqs::model::DeleteMessageBatchRequestEntry; -use aws_sdk_sqs::output::ReceiveMessageOutput; + use aws_sdk_sqs::Client as SqsClient; use bytes::Bytes; use futures::future::ready; @@ -88,7 +88,7 @@ impl SqsSource { let (batch, receiver) = BatchNotifier::new_with_receiver(); for message in messages { if let Some(body) = message.body { - let mut stream = decode_message(self.decoder.clone(), &body); + let stream = decode_message(self.decoder.clone(), &body); let mut stream = stream.map_ok(|event| event.with_batch_notifier(&batch)); let send_result = out.send_all(&mut stream).await; From 0649320e09cc4a46111da5e58ea10a66469a0e3b Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Fri, 5 Nov 2021 11:41:25 -0400 Subject: [PATCH 05/17] tests pass Signed-off-by: Nathan Fox --- src/aws/rusoto/auth.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs index 8bc73f8956384..a00fb98a80775 100644 --- a/src/aws/rusoto/auth.rs +++ b/src/aws/rusoto/auth.rs @@ -57,6 +57,11 @@ impl AwsAuthentication { } #[cfg(test)] mod test { + use super::*; + use crate::aws::auth::AwsAuthentication; + use rusoto_core::Region; + use std::fs::File; + use std::io::Write; #[test] fn parsing_credentials_file() { From 9ca3303ed0ab519f30549240160017ea3c984bc8 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Mon, 8 Nov 2021 16:59:42 -0500 Subject: [PATCH 06/17] save Signed-off-by: Nathan Fox --- .../src/internal_event/events_received.rs | 23 +++ .../core-common/src/internal_event/mod.rs | 2 + src/aws/auth.rs | 6 +- src/aws/aws_sdk/auth.rs | 10 +- src/internal_events/common.rs | 22 +-- src/sources/aws_sqs/config.rs | 4 +- src/sources/aws_sqs/events.rs | 38 ++++ src/sources/aws_sqs/integration_tests.rs | 15 +- src/sources/aws_sqs/mod.rs | 1 + src/sources/aws_sqs/source.rs | 184 +++++++++++++----- .../reference/components/sources/aws_sqs.cue | 125 ++++++++++++ 11 files changed, 332 insertions(+), 98 deletions(-) create mode 100644 lib/vector-core/core-common/src/internal_event/events_received.rs 
create mode 100644 src/sources/aws_sqs/events.rs create mode 100644 website/cue/reference/components/sources/aws_sqs.cue diff --git a/lib/vector-core/core-common/src/internal_event/events_received.rs b/lib/vector-core/core-common/src/internal_event/events_received.rs new file mode 100644 index 0000000000000..93cba2f1a4897 --- /dev/null +++ b/lib/vector-core/core-common/src/internal_event/events_received.rs @@ -0,0 +1,23 @@ +use crate::internal_event::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct EventsReceived { + pub count: usize, + pub byte_size: usize, +} + +impl InternalEvent for EventsReceived { + fn emit_logs(&self) { + trace!(message = "Events received.", count = %self.count, byte_size = %self.byte_size); + } + + fn emit_metrics(&self) { + counter!("component_received_events_total", self.count as u64); + counter!("events_in_total", self.count as u64); + counter!( + "component_received_event_bytes_total", + self.byte_size as u64 + ); + } +} diff --git a/lib/vector-core/core-common/src/internal_event/mod.rs b/lib/vector-core/core-common/src/internal_event/mod.rs index b4a7ae41aa042..40a339165475d 100644 --- a/lib/vector-core/core-common/src/internal_event/mod.rs +++ b/lib/vector-core/core-common/src/internal_event/mod.rs @@ -1,7 +1,9 @@ mod bytes_sent; +mod events_received; mod events_sent; pub use bytes_sent::BytesSent; +pub use events_received::EventsReceived; pub use events_sent::EventsSent; pub trait InternalEvent { diff --git a/src/aws/auth.rs b/src/aws/auth.rs index 47d9e5bd2a67f..844fbc1699f79 100644 --- a/src/aws/auth.rs +++ b/src/aws/auth.rs @@ -28,9 +28,9 @@ pub enum AwsAuthentication { #[cfg(test)] mod tests { use super::*; - use std::fs::File; - use std::io::Write; - use tempfile; + + + #[derive(Serialize, Deserialize, Clone, Debug)] struct ComponentConfig { diff --git a/src/aws/aws_sdk/auth.rs b/src/aws/aws_sdk/auth.rs index 2927234a0de2c..a50ab2c4a1aac 100644 --- a/src/aws/aws_sdk/auth.rs +++ b/src/aws/aws_sdk/auth.rs @@ -45,14 +45,8 @@ impl AwsAuthentication { } } - // pub async fn build_config(&self, region: Region) -> Config { - // let mut builder = Builder::default().region(region); - // - // let credentials_provider = ; - // builder.credentials_provider(credentials_provider).build() - // } - - pub(crate) fn test_auth() -> AwsAuthentication { + #[cfg(test)] + pub fn test_auth() -> AwsAuthentication { AwsAuthentication::Static { access_key_id: "dummy".to_string(), secret_access_key: "dummy".to_string(), diff --git a/src/internal_events/common.rs b/src/internal_events/common.rs index b304be7b01780..3ebbe9b73cfc7 100644 --- a/src/internal_events/common.rs +++ b/src/internal_events/common.rs @@ -1,27 +1,7 @@ use metrics::counter; +pub use vector_core::internal_event::EventsReceived; use vector_core::internal_event::InternalEvent; -#[derive(Debug)] -pub struct EventsReceived { - pub count: usize, - pub byte_size: usize, -} - -impl InternalEvent for EventsReceived { - fn emit_logs(&self) { - trace!(message = "Events received.", count = %self.count, byte_size = %self.byte_size); - } - - fn emit_metrics(&self) { - counter!("component_received_events_total", self.count as u64); - counter!("events_in_total", self.count as u64); - counter!( - "component_received_event_bytes_total", - self.byte_size as u64 - ); - } -} - #[derive(Debug)] pub struct HttpClientBytesReceived<'a> { pub byte_size: usize, diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 0b7297ded9a93..4267cc2412025 100644 --- a/src/sources/aws_sqs/config.rs +++ 
b/src/sources/aws_sqs/config.rs @@ -1,12 +1,10 @@ use crate::aws::auth::AwsAuthentication; -use crate::aws::rusoto::RegionOrEndpoint; use crate::codecs::{DecodingConfig, FramingConfig, ParserConfig}; use crate::config::{DataType, SourceConfig, SourceContext}; use crate::serde::{default_decoding, default_framing_message_based}; use crate::sources::aws_sqs::source::SqsSource; - - +use crate::aws::region::RegionOrEndpoint; use serde::{Deserialize, Serialize}; use std::cmp; diff --git a/src/sources/aws_sqs/events.rs b/src/sources/aws_sqs/events.rs new file mode 100644 index 0000000000000..c2aa32c824394 --- /dev/null +++ b/src/sources/aws_sqs/events.rs @@ -0,0 +1,38 @@ +use aws_sdk_sqs::error::DeleteMessageBatchError; +use aws_sdk_sqs::SdkError; +use metrics::counter; +use vector_core::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct AwsSqsBytesReceived { + pub byte_size: usize, +} + +impl InternalEvent for AwsSqsBytesReceived { + fn emit_logs(&self) { + trace!( + message = "Bytes received.", + byte_size = %self.byte_size, + protocol = "http", + ); + } + + fn emit_metrics(&self) { + counter!("component_received_bytes_total", self.byte_size as u64); + } +} + +#[derive(Debug)] +pub struct SqsMessageDeleteError<'a> { + pub error: &'a SdkError<DeleteMessageBatchError>, +} + +impl<'a> InternalEvent for SqsMessageDeleteError<'a> { + fn emit_logs(&self) { + warn!(message = "Failed to delete SQS events.", error = %self.error); + } + + fn emit_metrics(&self) { + counter!("sqs_delete_failed_total", 1); + } +} diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs index c6eeeea797155..a99ac717e508a 100644 --- a/src/sources/aws_sqs/integration_tests.rs +++ b/src/sources/aws_sqs/integration_tests.rs @@ -6,17 +6,15 @@ use crate::aws::region::RegionOrEndpoint; use crate::config::log_schema; use crate::config::{SourceConfig, SourceContext}; use crate::event::Event; -use crate::http::Auth; use crate::sources::aws_sqs::config::AwsSqsConfig; use crate::test_util::random_string; use crate::Pipeline; -use aws_sdk_sqs::model::SendMessageBatchRequestEntry; use aws_sdk_sqs::output::CreateQueueOutput; use aws_sdk_sqs::Endpoint; use aws_types::region::Region; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use http::Uri; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::str::FromStr; use std::time::Duration; use tokio::time::timeout; @@ -41,9 +39,9 @@ async fn ensure_queue(queue_name: &str, client: &aws_sdk_sqs::Client) -> CreateQ .unwrap() } -async fn send_test_events(mut count: u32, queue_url: &str, client: &aws_sdk_sqs::Client) { +async fn send_test_events(count: u32, queue_url: &str, client: &aws_sdk_sqs::Client) { for i in 0..count { - let x = client + client .send_message() .message_body(calculate_message(i)) .queue_url(queue_url) @@ -54,7 +52,7 @@ async fn send_test_events(mut count: u32, queue_url: &str, client: &aws_sdk_sqs: } async fn get_sqs_client() -> aws_sdk_sqs::Client { - let mut config = aws_sdk_sqs::config::Builder::new() + let config = aws_sdk_sqs::config::Builder::new() .credentials_provider(AwsAuthentication::test_auth().credentials_provider().await) .endpoint_resolver(Endpoint::immutable( Uri::from_str("http://localhost:4566").unwrap(), @@ -84,9 +82,8 @@ pub async fn test() { ..Default::default() }; - println!("Spawning source task"); let (tx, rx) = Pipeline::new_test(); - let source_task = tokio::spawn(async move { + tokio::spawn(async move { config .build(SourceContext::new_test(tx)) .await diff --git
a/src/sources/aws_sqs/mod.rs b/src/sources/aws_sqs/mod.rs index 9e672b0c2b48f..50395c14372de 100644 --- a/src/sources/aws_sqs/mod.rs +++ b/src/sources/aws_sqs/mod.rs @@ -1,4 +1,5 @@ mod config; +mod events; mod integration_tests; mod source; diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index cc61e026d4ef3..38bcba76139e7 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -1,22 +1,28 @@ use crate::codecs::Decoder; use crate::config::log_schema; +use crate::event::BatchStatus; use crate::event::{BatchNotifier, Event}; -use crate::event::{BatchStatus}; use crate::shutdown::ShutdownSignal; use crate::sources::util::TcpError; use crate::Pipeline; -use aws_sdk_sqs::model::DeleteMessageBatchRequestEntry; +use aws_sdk_sqs::model::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName}; +use std::collections::HashMap; +use super::events::*; +use crate::vector_core::ByteSizeOf; +use async_stream::stream; use aws_sdk_sqs::Client as SqsClient; use bytes::Bytes; -use futures::future::ready; +use chrono::{DateTime, TimeZone, Utc}; use futures::{FutureExt, TryStreamExt}; use futures::{SinkExt, Stream, StreamExt}; use std::io::Cursor; use std::panic; +use std::str::FromStr; use tokio::time::Duration; use tokio::{pin, select}; use tokio_util::codec::FramedRead; +use vector_core::internal_event::EventsReceived; // This is the maximum SQS supports in a single batch request const MAX_BATCH_SIZE: i32 = 10; @@ -69,6 +75,7 @@ impl SqsSource { .queue_url(&self.queue_url) .max_number_of_messages(MAX_BATCH_SIZE) .wait_time_seconds(self.poll_secs as i32) + .attribute_names(MessageSystemAttributeName::SentTimestamp.as_str()) .send() .await; @@ -84,27 +91,38 @@ impl SqsSource { if let Some(messages) = receive_message_output.messages { let mut receipts_to_ack = vec![]; - if acknowledgements { - let (batch, receiver) = BatchNotifier::new_with_receiver(); - for message in messages { - if let Some(body) = message.body { - let stream = decode_message(self.decoder.clone(), &body); + let mut batch_receiver = None; + for message in messages { + if let Some(body) = message.body { + emit!(&AwsSqsBytesReceived { + byte_size: body.len() + }); + let timestamp = get_timestamp(&message.attributes); + let stream = decode_message(self.decoder.clone(), &body, timestamp); + pin!(stream); + let send_result = if acknowledgements { + let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut stream = stream.map_ok(|event| event.with_batch_notifier(&batch)); - let send_result = out.send_all(&mut stream).await; - - match send_result { - Err(err) => error!(message = "Error sending to sink.", error = %err), - Ok(()) => { - // a receipt handle should always exist - if let Some(receipt_handle) = message.receipt_handle { - receipts_to_ack.push(receipt_handle); - } + batch_receiver = Some(receiver); + out.send_all(&mut stream).await + } else { + out.send_all(&mut stream).await + }; + + match send_result { + Err(err) => error!(message = "Error sending to sink.", error = %err), + Ok(()) => { + // a receipt handle should always exist + if let Some(receipt_handle) = message.receipt_handle { + receipts_to_ack.push(receipt_handle); } } } } + } + if let Some(receiver) = batch_receiver { let client = self.client.clone(); let queue_url = self.queue_url.clone(); tokio::spawn(async move { @@ -114,26 +132,21 @@ impl SqsSource { } }); } else { - for message in messages { - if let Some(body) = message.body { - let mut stream = decode_message(self.decoder.clone(), &body); - match 
out.send_all(&mut stream).await { - Err(err) => error!(message = "Error sending to sink.", error = %err), - Ok(()) => { - // a receipt handle should always exist - if let Some(receipt_handle) = message.receipt_handle { - receipts_to_ack.push(receipt_handle); - } - } - } - } - } delete_messages(&self.client, &receipts_to_ack, &self.queue_url).await; } } } } +fn get_timestamp( + attributes: &Option<HashMap<MessageSystemAttributeName, String>>, +) -> Option<DateTime<Utc>> { + attributes.as_ref().and_then(|attributes| { + let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?; + Some(Utc.timestamp_millis(i64::from_str(sent_time_str).ok()?)) + }) +} + async fn delete_messages(client: &SqsClient, receipts: &[String], queue_url: &str) { if !receipts.is_empty() { let mut batch = client.delete_message_batch().queue_url(queue_url); @@ -147,37 +160,100 @@ async fn delete_messages(client: &SqsClient, receipts: &[String], queue_url: &st ); } if let Err(err) = batch.send().await { - //TODO: emit as event? - error!("SQS Delete failed: {:?}", err); + emit!(&SqsMessageDeleteError { error: &err }); } } } -fn decode_message<E>(decoder: Decoder, message: &str) -> impl Stream<Item = Result<Event, E>> { +fn decode_message<E>( + decoder: Decoder, + message: &str, + sent_time: Option<DateTime<Utc>>, +) -> impl Stream<Item = Result<Event, E>> { let schema = log_schema(); let payload = Cursor::new(Bytes::copy_from_slice(message.as_bytes())); - FramedRead::new(payload, decoder) - .map(|input| match input { - Ok((mut events, _)) => { - let mut event = events.pop().expect("event must exist"); - if let Event::Log(ref mut log) = event { - log.try_insert(schema.source_type_key(), Bytes::from("aws_sqs")); - // log.try_insert(schema.timestamp_key(), timestamp); - } + let mut stream = FramedRead::new(payload, decoder); - Some(Some(Ok(event))) - } - Err(e) => { - // Error is logged by `crate::codecs::Decoder`, no further handling - // is needed here. + let stream = stream! { + loop { + match stream.next().await { + Some(Ok((events, _))) => { + let count = events.len(); + let mut total_events_size = 0; + for mut event in events { + if let Event::Log(ref mut log) = event { + log.try_insert(schema.source_type_key(), Bytes::from("aws_sqs")); + if let Some(sent_time) = sent_time { + log.try_insert(schema.timestamp_key(), sent_time); + } + } + total_events_size += event.size_of(); + yield event; + } + emit!(&EventsReceived { + byte_size: total_events_size, + count + }); + }, + Some(Err(error)) => { + // Error is logged by `crate::codecs::Decoder`, no further handling + // is needed here.
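+ // A fatal framing error ends decoding of this message, while recoverable errors just skip the bad frame and keep reading.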
+ if !error.can_continue() { + break; + } } + None => break, } - }) - .take_while(|x| ready(x.is_some())) - .filter_map(|x| ready(x.expect("should have inner value"))) + } + }; + stream.map(Ok) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::SecondsFormat; + + #[tokio::test] + async fn test_decode() { + let message = "test"; + let now = Utc::now(); + let stream = decode_message::<()>(Decoder::default(), "test", Some(now)); + let events: Vec<_> = stream.collect().await; + assert_eq!(events.len(), 1); + assert_eq!( + events[0] + .clone() + .unwrap() + .as_log() + .get(log_schema().message_key()) + .unwrap() + .to_string_lossy(), + message + ); + assert_eq!( + events[0] + .clone() + .unwrap() + .as_log() + .get(log_schema().timestamp_key()) + .unwrap() + .to_string_lossy(), + now.to_rfc3339_opts(SecondsFormat::AutoSi, true) + ); + } + + #[test] + fn test_get_timestamp() { + let attributes = HashMap::from([( + MessageSystemAttributeName::SentTimestamp, + "1636408546018".to_string(), + )]); + + assert_eq!( + get_timestamp(&Some(attributes)), + Some(Utc.timestamp_millis(1636408546018)) + ); + } } diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue new file mode 100644 index 0000000000000..1fe8836c717ab --- /dev/null +++ b/website/cue/reference/components/sources/aws_sqs.cue @@ -0,0 +1,125 @@ +package metadata + +components: sources: aws_sqs: components._aws & { + title: "AWS SQS" + +// features: { +// collect: { +// checkpoint: enabled: false +// tls: { +// enabled: true +// can_enable: true +// can_verify_certificate: false +// can_verify_hostname: false +// enabled_default: false +// } +// from: components._kafka.features.collect.from +// } +// multiline: enabled: false +// codecs: { +// enabled: true +// default_framing: "bytes" +// } +// } + + classes: { + commonly_used: true + deployment_roles: ["aggregator"] + delivery: "at_least_once" + development: "beta" + egress_method: "stream" + stateful: false + } + + support: { + targets: { + "aarch64-unknown-linux-gnu": true + "aarch64-unknown-linux-musl": true + "armv7-unknown-linux-gnueabihf": true + "armv7-unknown-linux-musleabihf": true + "x86_64-apple-darwin": true + "x86_64-pc-windows-msv": true + "x86_64-unknown-linux-gnu": true + "x86_64-unknown-linux-musl": true + } + requirements: [ + """ + The AWS SQS source requires an SQS queue. + """, + ] + warnings: [] + notices: [] + } + + installation: { + platform_name: null + } + + configuration: { + acknowledgements: configuration._acknowledgements + poll_secs: { + common: true + description: "How long to wait when polling SQS for new messages. 0-20 seconds" + required: false + warnings: [] + type: uint: { + default: 15 + unit: "seconds" + } + } + client_concurrency: { + common: true + description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput." + required: false + warnings: [] + type: uint: { + default: "1 per CPU core" + unit: "# of clients" + } + } + queue_url: { + description: "The URL of the SQS queue to receive bucket notifications from." + required: true + warnings: [] + type: string: { + examples: ["https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"] + syntax: "literal" + } + } + } + + output: logs: record: { + description: "An individual SQS record" + fields: { + message: { + description: "The raw message from the SQS record." 
+ required: true + type: string: { + examples: ["53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308"] + syntax: "literal" + } + } + } + } + + telemetry: metrics: { + events_failed_total: components.sources.internal_metrics.output.metrics.events_failed_total + events_in_total: components.sources.internal_metrics.output.metrics.events_in_total + consumer_offset_updates_failed_total: components.sources.internal_metrics.output.metrics.consumer_offset_updates_failed_total + kafka_queue_messages: components.sources.internal_metrics.output.metrics.kafka_queue_messages + kafka_queue_messages_bytes: components.sources.internal_metrics.output.metrics.kafka_queue_messages_bytes + kafka_requests_total: components.sources.internal_metrics.output.metrics.kafka_requests_total + kafka_requests_bytes_total: components.sources.internal_metrics.output.metrics.kafka_requests_bytes_total + kafka_responses_total: components.sources.internal_metrics.output.metrics.kafka_responses_total + kafka_responses_bytes_total: components.sources.internal_metrics.output.metrics.kafka_responses_bytes_total + kafka_produced_messages_total: components.sources.internal_metrics.output.metrics.kafka_produced_messages_total + kafka_produced_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_produced_messages_bytes_total + kafka_consumed_messages_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_total + kafka_consumed_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_bytes_total + processed_bytes_total: components.sources.internal_metrics.output.metrics.processed_bytes_total + processed_events_total: components.sources.internal_metrics.output.metrics.processed_events_total + component_received_events_total: components.sources.internal_metrics.output.metrics.component_received_events_total + } + + how_it_works: components._kafka.how_it_works +} From 68676b6e9b75e968c949cf40def98019c0a52ea8 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Mon, 8 Nov 2021 17:13:32 -0500 Subject: [PATCH 07/17] cleanup Signed-off-by: Nathan Fox --- src/aws/rusoto/auth.rs | 2 +- src/sources/aws_sqs/source.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aws/rusoto/auth.rs b/src/aws/rusoto/auth.rs index a00fb98a80775..5430591d9246e 100644 --- a/src/aws/rusoto/auth.rs +++ b/src/aws/rusoto/auth.rs @@ -2,7 +2,7 @@ use crate::aws::auth::AwsAuthentication; use crate::aws::rusoto::AwsCredentialsProvider; use rusoto_core::Region; -const AWS_DEFAULT_PROFILE: &'static str = "default"; +const AWS_DEFAULT_PROFILE: &str = "default"; impl AwsAuthentication { pub fn build( diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 38bcba76139e7..eea2c1a5a084c 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -151,7 +151,7 @@ async fn delete_messages(client: &SqsClient, receipts: &[String], queue_url: &st if !receipts.is_empty() { let mut batch = client.delete_message_batch().queue_url(queue_url); - for (id, receipt) in receipts.into_iter().enumerate() { + for (id, receipt) in receipts.iter().enumerate() { batch = batch.entries( DeleteMessageBatchRequestEntry::builder() .id(id.to_string()) From e3e1eb39ccbd44b4cb588001e65ab18e7bcce4f5 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 9 Nov 2021 10:23:53 -0500 Subject: [PATCH 08/17] fix tests Signed-off-by: Nathan Fox --- src/sinks/aws_kinesis_firehose/tests.rs | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/src/sinks/aws_kinesis_firehose/tests.rs b/src/sinks/aws_kinesis_firehose/tests.rs index 894fd9381a259..2fe3ce23e6fb5 100644 --- a/src/sinks/aws_kinesis_firehose/tests.rs +++ b/src/sinks/aws_kinesis_firehose/tests.rs @@ -1,8 +1,8 @@ #![cfg(test)] use super::*; +use crate::aws::RegionOrEndpoint; use crate::config::{SinkConfig, SinkContext}; -use crate::rusoto::RegionOrEndpoint; use crate::sinks::aws_kinesis_firehose::config::{ BuildError, MAX_PAYLOAD_EVENTS, MAX_PAYLOAD_SIZE, }; @@ -19,7 +19,7 @@ fn generate_config() { async fn check_batch_size() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), compression: Compression::None, batch: BatchConfig { @@ -44,7 +44,7 @@ async fn check_batch_size() { async fn check_batch_events() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), compression: Compression::None, batch: BatchConfig { From 1630b1dc118dc9fa7df241751112d290cba2fbd0 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 9 Nov 2021 11:18:46 -0500 Subject: [PATCH 09/17] cargo fmt Signed-off-by: Nathan Fox --- src/aws/auth.rs | 3 --- src/lib.rs | 6 +++--- src/sinks/aws_kinesis_streams/config.rs | 2 +- src/sinks/aws_s3/config.rs | 6 +++--- src/sinks/aws_s3/tests.rs | 2 +- src/sinks/elasticsearch/config.rs | 2 +- src/sinks/elasticsearch/mod.rs | 4 ++-- src/sinks/elasticsearch/tests.rs | 2 +- src/sinks/s3_common/config.rs | 2 +- 9 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/aws/auth.rs b/src/aws/auth.rs index 844fbc1699f79..30be6d821bedd 100644 --- a/src/aws/auth.rs +++ b/src/aws/auth.rs @@ -28,9 +28,6 @@ pub enum AwsAuthentication { #[cfg(test)] mod tests { use super::*; - - - #[derive(Serialize, Deserialize, Clone, Debug)] struct ComponentConfig { diff --git a/src/lib.rs b/src/lib.rs index 33e6b3a10b093..5ed9b04bc709a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ pub mod internal_events; pub mod api; pub mod app; pub mod async_read; +#[cfg(any(feature = "rusoto_core", feature = "aws-config"))] +pub mod aws; #[cfg(feature = "codecs")] pub mod codecs; pub mod encoding_transcode; @@ -52,8 +54,6 @@ pub mod list; pub(crate) mod pipeline; pub(crate) mod proto; pub mod providers; -#[cfg(any(feature = "rusoto_core", feature = "aws-config"))] -pub mod aws; pub mod serde; #[cfg(windows)] pub mod service; @@ -86,7 +86,7 @@ pub mod vector_windows; pub use pipeline::Pipeline; -pub use vector_core::{Error, event, mapping, metrics, Result}; +pub use vector_core::{event, mapping, metrics, Error, Result}; pub fn vector_version() -> impl std::fmt::Display { #[cfg(feature = "nightly")] diff --git a/src/sinks/aws_kinesis_streams/config.rs b/src/sinks/aws_kinesis_streams/config.rs index 49b2913e99bb4..ff4cd487b2a67 100644 --- a/src/sinks/aws_kinesis_streams/config.rs +++ b/src/sinks/aws_kinesis_streams/config.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; -use crate::config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext}; use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; +use crate::config::{DataType, GenerateConfig, ProxyConfig, SinkConfig, SinkContext}; use 
crate::sinks::aws_kinesis_streams::service::KinesisService; use crate::sinks::util::encoding::{EncodingConfig, StandardEncodings}; use crate::sinks::util::{BatchConfig, BatchSettings, Compression, TowerRequestConfig}; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 8a1edd19a5551..2fde1128ec425 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,3 +1,4 @@ +use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::config::SinkContext; use crate::sinks::s3_common::sink::S3Sink; use crate::sinks::util::encoding::StandardEncodings; @@ -5,16 +6,16 @@ use crate::sinks::util::BatchSettings; use crate::{ config::{DataType, GenerateConfig, ProxyConfig, SinkConfig}, sinks::{ - Healthcheck, s3_common::{ self, config::{S3Options, S3RetryLogic}, service::S3Service, }, util::{ - BatchConfig, Compression, encoding::EncodingConfig, ServiceBuilderExt, + encoding::EncodingConfig, BatchConfig, Compression, ServiceBuilderExt, TowerRequestConfig, }, + Healthcheck, }, }; use rusoto_s3::S3Client; @@ -22,7 +23,6 @@ use serde::{Deserialize, Serialize}; use std::convert::TryInto; use tower::ServiceBuilder; use vector_core::sink::VectorSink; -use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; use super::sink::S3RequestOptions; use crate::sinks::util::partitioner::KeyPartitioner; diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs index e3609cdf9b0cd..0804b7fcb5f1a 100644 --- a/src/sinks/aws_s3/tests.rs +++ b/src/sinks/aws_s3/tests.rs @@ -1,8 +1,8 @@ #[cfg(feature = "aws-s3-integration-tests")] #[cfg(test)] mod integration_tests { - use crate::config::SinkContext; use crate::aws::rusoto::RegionOrEndpoint; + use crate::config::SinkContext; use crate::sinks::aws_s3::S3SinkConfig; use crate::sinks::s3_common::config::S3Options; use crate::sinks::util::encoding::StandardEncodings; diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 58f52c5be24f6..e3a3184882de7 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -1,9 +1,9 @@ +use crate::aws::rusoto::RegionOrEndpoint; use crate::config::log_schema; use crate::config::{DataType, SinkConfig, SinkContext}; use crate::event::{EventRef, LogEvent, Value}; use crate::http::HttpClient; use crate::internal_events::TemplateRenderingFailed; -use crate::aws::rusoto::RegionOrEndpoint; use crate::sinks::elasticsearch::request_builder::ElasticsearchRequestBuilder; use crate::sinks::elasticsearch::sink::ElasticSearchSink; use crate::sinks::elasticsearch::{BatchActionTemplate, IndexTemplate}; diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index 3e52753a625b4..b29519430c2aa 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -24,8 +24,8 @@ use crate::{ }; use http::{ header::{HeaderName, HeaderValue}, - Request, uri::InvalidUri, + Request, }; use rusoto_credential::{CredentialsError, ProvideAwsCredentials}; @@ -34,8 +34,8 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; -use std::convert::TryFrom; use crate::aws::rusoto::{self, AwsAuthentication}; +use std::convert::TryFrom; use crate::event::{EventRef, LogEvent}; // use crate::sinks::elasticsearch::ParseError::AwsCredentialsGenerateFailed; diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index c8012ed3de23e..6b756417844e7 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,6 +1,6 @@ use super::BulkAction; -use 
crate::event::{LogEvent, Metric, MetricKind, MetricValue, Value}; use crate::aws::rusoto::AwsAuthentication; +use crate::event::{LogEvent, Metric, MetricKind, MetricValue, Value}; use crate::sinks::elasticsearch::sink::process_log; use crate::sinks::elasticsearch::{ DataStreamConfig, ElasticSearchAuth, ElasticSearchCommon, ElasticSearchConfig, diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 0b33b29c30570..909da14c20d35 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -1,7 +1,7 @@ use super::service::S3Service; -use crate::config::ProxyConfig; use crate::aws::rusoto; use crate::aws::rusoto::{AwsAuthentication, RegionOrEndpoint}; +use crate::config::ProxyConfig; use crate::sinks::util::retries::RetryLogic; use crate::sinks::Healthcheck; use futures::FutureExt; From 30ec387ec2e4d4574a2d5944cbc57c3a0d762ac9 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Tue, 9 Nov 2021 14:48:13 -0500 Subject: [PATCH 10/17] cleanup Signed-off-by: Nathan Fox --- src/aws/rusoto/region.rs | 4 +--- src/sources/aws_sqs/integration_tests.rs | 7 ------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/aws/rusoto/region.rs b/src/aws/rusoto/region.rs index 2ee757665d6a3..9a80632fa04ed 100644 --- a/src/aws/rusoto/region.rs +++ b/src/aws/rusoto/region.rs @@ -1,13 +1,11 @@ use http::{uri::InvalidUri, Uri}; use rusoto_core::{region::ParseRegionError, Region}; +pub use crate::aws::region::RegionOrEndpoint; use snafu::{ResultExt, Snafu}; use std::convert::TryFrom; use std::str::FromStr; -//TODO: backwards compat - use the type directly instead of from here -pub use crate::aws::region::RegionOrEndpoint; - #[derive(Debug, Snafu)] pub enum ParseError { #[snafu(display("Failed to parse custom endpoint as URI: {}", source))] diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs index a99ac717e508a..9e969278ce3b4 100644 --- a/src/sources/aws_sqs/integration_tests.rs +++ b/src/sources/aws_sqs/integration_tests.rs @@ -19,13 +19,6 @@ use std::str::FromStr; use std::time::Duration; use tokio::time::timeout; -// fn gen_queue_url(name: &str) -> String { -// format!( -// "https://sqs.us-east-1.amazonaws.com/123456789012/test-{}", -// name -// ) -// } - fn gen_queue_name() -> String { random_string(10).to_lowercase() } From 50440ab681904581d813d076a077a17728ef9c65 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 10 Nov 2021 16:45:08 -0500 Subject: [PATCH 11/17] update docs Signed-off-by: Nathan Fox --- .../reference/components/sources/aws_sqs.cue | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue index 1fe8836c717ab..ba3fff0c2f85f 100644 --- a/website/cue/reference/components/sources/aws_sqs.cue +++ b/website/cue/reference/components/sources/aws_sqs.cue @@ -3,24 +3,19 @@ package metadata components: sources: aws_sqs: components._aws & { title: "AWS SQS" -// features: { -// collect: { -// checkpoint: enabled: false -// tls: { -// enabled: true -// can_enable: true -// can_verify_certificate: false -// can_verify_hostname: false -// enabled_default: false -// } -// from: components._kafka.features.collect.from -// } -// multiline: enabled: false -// codecs: { -// enabled: true -// default_framing: "bytes" -// } -// } + features: { + collect: { + tls: enabled: false + checkpoint: enabled: false + proxy: enabled: true + from: service: services.aws_sqs + } + multiline: 
enabled: false + codecs: { + enabled: true + default_framing: "bytes" + } + } classes: { commonly_used: true @@ -103,23 +98,23 @@ components: sources: aws_sqs: components._aws & { } telemetry: metrics: { - events_failed_total: components.sources.internal_metrics.output.metrics.events_failed_total - events_in_total: components.sources.internal_metrics.output.metrics.events_in_total - consumer_offset_updates_failed_total: components.sources.internal_metrics.output.metrics.consumer_offset_updates_failed_total - kafka_queue_messages: components.sources.internal_metrics.output.metrics.kafka_queue_messages - kafka_queue_messages_bytes: components.sources.internal_metrics.output.metrics.kafka_queue_messages_bytes - kafka_requests_total: components.sources.internal_metrics.output.metrics.kafka_requests_total - kafka_requests_bytes_total: components.sources.internal_metrics.output.metrics.kafka_requests_bytes_total - kafka_responses_total: components.sources.internal_metrics.output.metrics.kafka_responses_total - kafka_responses_bytes_total: components.sources.internal_metrics.output.metrics.kafka_responses_bytes_total - kafka_produced_messages_total: components.sources.internal_metrics.output.metrics.kafka_produced_messages_total - kafka_produced_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_produced_messages_bytes_total - kafka_consumed_messages_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_total - kafka_consumed_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_bytes_total - processed_bytes_total: components.sources.internal_metrics.output.metrics.processed_bytes_total - processed_events_total: components.sources.internal_metrics.output.metrics.processed_events_total + component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total component_received_events_total: components.sources.internal_metrics.output.metrics.component_received_events_total + component_received_bytes_total: components.sources.internal_metrics.output.metrics.component_received_bytes_total + sqs_delete_failed_total: components.sources.internal_metrics.output.metrics.sqs_message_delete_failed_total } - how_it_works: components._kafka.how_it_works + how_it_works: { + aws_sqs: { + title: "AWS SQS" + body: """ + The `aws_sqs` source receives messages from [AWS SQS](https://aws.amazon.com/sqs/) + (Simple Queue Service). This is a highly scalable / durable queuing system with + at-least-once queuing semantics. Messages are received in batches (up to 10 at a time), + and then deleted in batches (again up to 10). Messages are either deleted immediately + after being received, or only after they have been fully processed by the sinks, depending on the + `acknowledgements` setting.
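+ + For example, a minimal configuration for this source might look like the following sketch (the component name `my_sqs` is arbitrary, and the queue URL is the placeholder from the `queue_url` example above): + + ```toml + [sources.my_sqs] + type = "aws_sqs" + region = "us-east-2" + queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue" + ```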
+ """ + } + } } From d929e52244d6720ff0a6cd90de64bdd5af692d89 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Wed, 10 Nov 2021 16:46:24 -0500 Subject: [PATCH 12/17] fix import Signed-off-by: Nathan Fox --- src/sinks/aws_kinesis_firehose/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/aws_kinesis_firehose/integration_tests.rs b/src/sinks/aws_kinesis_firehose/integration_tests.rs index 235eacb05ca4e..4026f22e24ab7 100644 --- a/src/sinks/aws_kinesis_firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis_firehose/integration_tests.rs @@ -2,8 +2,8 @@ #![cfg(test)] use super::*; +use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::config::{SinkConfig, SinkContext}; -use crate::rusoto::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::util::encoding::{EncodingConfig, StandardEncodings}; use crate::sinks::util::{BatchConfig, Compression, TowerRequestConfig}; use crate::test_util::components; From c0ac9fbfcb04bb6da5fd89673e491174c426afa0 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 11 Nov 2021 09:51:48 -0500 Subject: [PATCH 13/17] appease the clippy gods Signed-off-by: Nathan Fox --- src/sinks/aws_kinesis_firehose/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/aws_kinesis_firehose/integration_tests.rs b/src/sinks/aws_kinesis_firehose/integration_tests.rs index 4026f22e24ab7..f928f1056f9ed 100644 --- a/src/sinks/aws_kinesis_firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis_firehose/integration_tests.rs @@ -37,7 +37,7 @@ async fn firehose_put_records() { let config = KinesisFirehoseSinkConfig { stream_name: stream.clone(), - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".into()), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566"), encoding: EncodingConfig::from(StandardEncodings::Json), // required for ES destination w/ localstack compression: Compression::None, batch: BatchConfig { From 568c892f7d9044e00197d06ccfba0d8bba1b248a Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 11 Nov 2021 10:07:19 -0500 Subject: [PATCH 14/17] fix cue docs Signed-off-by: Nathan Fox --- .../reference/components/sources/aws_sqs.cue | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue index ba3fff0c2f85f..2ff41164da561 100644 --- a/website/cue/reference/components/sources/aws_sqs.cue +++ b/website/cue/reference/components/sources/aws_sqs.cue @@ -62,16 +62,16 @@ components: sources: aws_sqs: components._aws & { unit: "seconds" } } - client_concurrency: { - common: true - description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput." - required: false - warnings: [] - type: uint: { - default: "1 per CPU core" - unit: "# of clients" - } - } +// client_concurrency: { +// common: true +// description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput. Note: the default is 1 / CPU core" +// required: false +// warnings: [] +// type: uint: { +// default: 1 +// unit: "# of clients" +// } +// } queue_url: { description: "The URL of the SQS queue to receive bucket notifications from." 
From 568c892f7d9044e00197d06ccfba0d8bba1b248a Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Thu, 11 Nov 2021 10:07:19 -0500
Subject: [PATCH 14/17] fix cue docs

Signed-off-by: Nathan Fox
---
 .../reference/components/sources/aws_sqs.cue | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue
index ba3fff0c2f85f..2ff41164da561 100644
--- a/website/cue/reference/components/sources/aws_sqs.cue
+++ b/website/cue/reference/components/sources/aws_sqs.cue
@@ -62,16 +62,16 @@ components: sources: aws_sqs: components._aws & {
 				unit: "seconds"
 			}
 		}
-		client_concurrency: {
-			common: true
-			description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput."
-			required: false
-			warnings: []
-			type: uint: {
-				default: "1 per CPU core"
-				unit: "# of clients"
-			}
-		}
+//		client_concurrency: {
+//			common: true
+//			description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput. Note: the default is 1 / CPU core"
+//			required: false
+//			warnings: []
+//			type: uint: {
+//				default: 1
+//				unit: "# of clients"
+//			}
+//		}
 		queue_url: {
 			description: "The URL of the SQS queue to receive bucket notifications from."
 			required: true
@@ -101,7 +101,7 @@ components: sources: aws_sqs: components._aws & {
 		component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total
 		component_received_events_total: components.sources.internal_metrics.output.metrics.component_received_events_total
 		component_received_bytes_total: components.sources.internal_metrics.output.metrics.component_received_bytes_total
-		sqs_delete_failed_total: components.sources.internal_metrics.output.metrics.sqs_message_delete_failed_total
+		sqs_message_delete_failed_total: components.sources.internal_metrics.output.metrics.sqs_message_delete_failed_total
 	}

 	how_it_works: {

From e3c6d86412f363095ef0204eb912a67c1fb428c3 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Thu, 11 Nov 2021 10:27:42 -0500
Subject: [PATCH 15/17] actually add SQS to the docs...

Signed-off-by: Nathan Fox
---
 src/sources/aws_sqs/events.rs | 2 +-
 .../reference/configuration/sources/aws_sqs.md | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
 create mode 100644 website/content/en/docs/reference/configuration/sources/aws_sqs.md

diff --git a/src/sources/aws_sqs/events.rs b/src/sources/aws_sqs/events.rs
index c2aa32c824394..d4273fe3eed85 100644
--- a/src/sources/aws_sqs/events.rs
+++ b/src/sources/aws_sqs/events.rs
@@ -33,6 +33,6 @@ impl<'a> InternalEvent for SqsMessageDeleteError<'a> {
     }

     fn emit_metrics(&self) {
-        counter!("sqs_delete_failed_total", 1);
+        counter!("sqs_message_delete_failed_total", 1);
     }
 }
diff --git a/website/content/en/docs/reference/configuration/sources/aws_sqs.md b/website/content/en/docs/reference/configuration/sources/aws_sqs.md
new file mode 100644
index 0000000000000..2742e684b907c
--- /dev/null
+++ b/website/content/en/docs/reference/configuration/sources/aws_sqs.md
@@ -0,0 +1,14 @@
+---
+title: AWS SQS
+description: Collect logs from [AWS SQS](https://aws.amazon.com/sqs)
+kind: source
+layout: component
+tags: ["aws", "sqs", "queue", "component", "source", "logs"]
+---
+
+{{/*
+This doc is generated using:
+
+1. The template in layouts/docs/component.html
+2. The relevant CUE data in cue/reference/components/...
+*/}}

From 79e2fa078cb2e2f1151767592b95375ef0f2557d Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Thu, 11 Nov 2021 10:42:29 -0500
Subject: [PATCH 16/17] improve docs

Signed-off-by: Nathan Fox
---
 website/cue/reference/components/sources/aws_sqs.cue | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue
index 2ff41164da561..5190a8378bfe0 100644
--- a/website/cue/reference/components/sources/aws_sqs.cue
+++ b/website/cue/reference/components/sources/aws_sqs.cue
@@ -73,7 +73,7 @@ components: sources: aws_sqs: components._aws & {
 //		}
 //	}
 		queue_url: {
-			description: "The URL of the SQS queue to receive bucket notifications from."
+			description: "The URL of the SQS queue to receive events from."
 			required: true
 			warnings: []
 			type: string: {
@@ -94,6 +94,9 @@ components: sources: aws_sqs: components._aws & {
 				syntax: "literal"
 			}
 		}
+		timestamp: fields._current_timestamp & {
+			description: "The time this message was sent to SQS."
+		}
 	}
 }
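
Patch 16 above documents a `timestamp` field as the time the message was sent to SQS. SQS exposes that moment as the `SentTimestamp` system attribute, a stringified epoch-milliseconds value; a sketch of the parse, assuming the attribute is requested on the receive call and that `chrono` supplies the timestamp types:

    use aws_sdk_sqs::model::{Message, MessageSystemAttributeName};
    use chrono::{DateTime, TimeZone, Utc};

    /// Parse `SentTimestamp` off a received message, falling back to "now"
    /// when the attribute is missing or unparseable.
    fn sent_timestamp(message: &Message) -> DateTime<Utc> {
        message
            .attributes
            .as_ref()
            .and_then(|attrs| attrs.get(&MessageSystemAttributeName::SentTimestamp))
            .and_then(|millis| millis.parse::<i64>().ok())
            .map(|millis| Utc.timestamp_millis(millis))
            .unwrap_or_else(Utc::now)
    }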

From b6662b1113e11f68e92ce1ab2e3adc89e927aa21 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Thu, 11 Nov 2021 14:46:05 -0500
Subject: [PATCH 17/17] fix cue formatting

Signed-off-by: Nathan Fox
---
 .../reference/components/sources/aws_sqs.cue | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/website/cue/reference/components/sources/aws_sqs.cue b/website/cue/reference/components/sources/aws_sqs.cue
index 5190a8378bfe0..5f084f480eaf7 100644
--- a/website/cue/reference/components/sources/aws_sqs.cue
+++ b/website/cue/reference/components/sources/aws_sqs.cue
@@ -39,8 +39,8 @@ components: sources: aws_sqs: components._aws & {
 	}
 	requirements: [
 		"""
-		The AWS SQS source requires an SQS queue.
-		""",
+			The AWS SQS source requires an SQS queue.
+			""",
 	]
 	warnings: []
 	notices: []
@@ -62,16 +62,16 @@ components: sources: aws_sqs: components._aws & {
 				unit: "seconds"
 			}
 		}
-// client_concurrency: {
-// common: true
-// description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput. Note: the default is 1 / CPU core"
-// required: false
-// warnings: []
-// type: uint: {
-// default: 1
-// unit: "# of clients"
-// }
-// }
+		// client_concurrency: {
+		//  common: true
+		//  description: "How many clients are receiving / acking SQS messages. Increasing may allow higher throughput. Note: the default is 1 / CPU core"
+		//  required: false
+		//  warnings: []
+		//  type: uint: {
+		//   default: 1
+		//   unit: "# of clients"
+		//  }
+		// }
 		queue_url: {
 			description: "The URL of the SQS queue to receive events from."
 			required: true
 			warnings: []
@@ -110,7 +110,7 @@ components: sources: aws_sqs: components._aws & {
 	how_it_works: {
 		aws_sqs: {
 			title: "AWS SQS"
-				body: """
+			body: """
 				The `aws_sqs` source receives messages from [AWS SQS](https://aws.amazon.com/sqs/)
 				(Simple Queue Service). This is a highly scalable, durable queueing system with
 				at-least-once delivery semantics. Messages are received in batches (up to 10 at a time),