diff --git a/Cargo.lock b/Cargo.lock index 56640ba0..c06129cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,6 +73,286 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2d5596c84d0f2e569ec211af64cc219492d56d4aec2d16a7a4d0622d61ec82d" +dependencies = [ + "aws-http", + "aws-sdk-sso", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "hex", + "http", + "hyper", + "ring", + "tokio", + "tower", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-endpoint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0990fe9d60185efea41850b10a205f4a9abe71499ec70298b11d2d830130167" +dependencies = [ + "aws-smithy-http", + "aws-types", + "http", + "regex", + "tracing", +] + +[[package]] +name = "aws-http" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6794b0b27fb74ef2696c41e1be08e916993ef043bbeda7ec554c4f50c3b81506" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "http", + "lazy_static", + "percent-encoding", + "tracing", +] + +[[package]] +name = "aws-sdk-sqs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "551504c2dd114701d6a7702308c097982f678146eb4a56a8a94c3c5b2420413f" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "tokio-stream", + "tower", +] + +[[package]] +name = "aws-sdk-sso" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4ce01b97e0ae8a2abd82b4aa13780fb1ddf7b6134e2da719ee1a16c2da3540" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "tokio-stream", + "tower", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9c36263a8813bd5791fdba2f817178032cccd63d4c834ab52192e7020d9c371" +dependencies = [ + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "tower", +] + +[[package]] +name = "aws-sig-auth" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fa501148ae6b5e0de5eeb8c4cf87fa3403d9a00077e543ad64011da781f73a6" +dependencies = [ + "aws-sigv4", + "aws-smithy-http", + "aws-types", + "http", + "thiserror", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d371fb688d909e5b866ff1f297bbec4621eed4f9fcdac566fcc33541f0c6a6" +dependencies = [ + "aws-smithy-http", + "form_urlencoded", + "hex", + "http", + "once_cell", + "percent-encoding", + "regex", + "ring", + "time 0.3.7", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ec4efb4a27ced592009787f4f03925f348a5b4a55e6a617e6819788d6cd5ed8" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + +[[package]] +name = "aws-smithy-client" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad1857eb59d562e82f05c02fbcb9f46c1089301c86770a9798c9e64e5a4677a" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "lazy_static", + "pin-project", + "pin-project-lite", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-http" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12c787e24b757634453a60ff05948aa1b450f5b3a7a2094f22acff8a5022635b" +dependencies = [ + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "percent-encoding", + "pin-project", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "aws-smithy-http-tower" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8" +dependencies = [ + "aws-smithy-http", + "bytes", + "http", + "http-body", + "pin-project", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b59d67d8baecb7485eeb75eb7f262777d5727cd368b16757207c9c1bdf506bd8" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b804b3302b20ec701104fbd59058ab09e5d4a03387b37c9a1fb990615f6c81e" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-types" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfed653678d1059bed597054c65ce44892aa79cd94444e386d7611843db9f0a2" +dependencies = [ + "itoa", + "num-integer", + "ryu", + "time 0.3.7", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aa6c9de6c3f875faabcaaad1fb1f4ef241683bfc22795f731719e3568c3ca9f" +dependencies = [ + "thiserror", + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b111a0d144e1c570675358d2fae7eb5ddf9010d9db63142fe3bb80353ff65f38" +dependencies = [ + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-types", + "rustc_version 0.4.0", + "tracing", + "zeroize", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -148,6 +428,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "bytes-utils" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e314712951c43123e5920a446464929adc667a5eade7f8fb3997776c9df6e54" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cassowary" version = "0.3.0" @@ -301,6 +591,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "129eabb7b0b78644a3a7e7cf220714aba47463bb281f69fa7a71ca5d12564cca" +[[package]] +name = "ct-logs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" +dependencies = [ + "sct", +] + [[package]] name = "darling" version = "0.13.1" @@ -360,7 +659,7 @@ dependencies = [ "lazy_static", "percent-encoding", "reqwest", - "rustc_version", + "rustc_version 0.2.3", "serde", "serde_json", "serde_with", @@ -492,6 +791,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -512,6 +822,7 @@ checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ "futures-core", "futures-io", + "futures-macro", "futures-task", "memchr", "pin-project-lite", @@ -652,6 +963,23 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" +dependencies = [ + "ct-logs", + "futures-util", + "hyper", + "log 0.4.14", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", + "webpki", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1074,6 +1402,8 @@ dependencies = [ name = "oura" version = "1.2.2" dependencies = [ + "aws-config", + "aws-sdk-sqs", "bech32", "clap", "config", @@ -1232,6 +1562,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -1443,6 +1793,21 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.21" @@ -1455,7 +1820,41 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" dependencies = [ - "semver", + "semver 0.9.0", +] + +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver 1.0.6", +] + +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64 0.13.0", + "log 0.4.14", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls", + "schannel", + "security-framework", ] [[package]] @@ -1486,6 +1885,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.6.1" @@ -1518,6 +1927,12 @@ dependencies = [ "semver-parser", ] +[[package]] +name = "semver" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" + [[package]] name = "semver-parser" version = "0.7.0" @@ -1652,6 +2067,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1826,6 +2247,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -1849,6 +2292,28 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -1862,10 +2327,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" dependencies = [ "cfg-if 1.0.0", + "log 0.4.14", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.22" @@ -1938,6 +2416,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.2.2" @@ -1950,6 +2434,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1f0175e03a0973cf4afd476bef05c26e228520400eb1fd473ad417b1c00ffb" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2060,6 +2550,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2142,3 +2642,15 @@ checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ "winapi", ] + +[[package]] +name = "xmlparser" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "114ba2b24d2167ef6d67d7d04c8cc86522b87f490025f39f0303b7db5bf5e3d8" + +[[package]] +name = "zeroize" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50344758e2f40e3a1fcfc8f6f91aa57b5f8ebd8d27919fe6451f15aaaf9ee608" diff --git a/Cargo.toml b/Cargo.toml index 58a53569..d18802f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,11 +45,17 @@ kafka = { version = "0.8.0", optional = true } # feature: elasticsink elasticsearch = { version = "7.14.0-alpha.1", optional = true } -tokio = { version = "1.17.0", optional = true, features = ["rt"] } # feature: fingerprint murmur3 = { version = "0.5.1", optional = true } +# feature: aws +aws-config = { version = "0.8.0", optional = true } +aws-sdk-sqs = { version = "0.8.0", optional = true } + +# features: elasticsearch || aws +tokio = { version = "1.17.0", optional = true, features = ["rt"] } + # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } @@ -61,3 +67,4 @@ tuisink = ["tui"] kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] +aws = ["aws-config", "aws-sdk-sqs", "tokio"] diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index d1d84a71..3666c329 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -35,6 +35,9 @@ use oura::sinks::kafka::Config as KafkaConfig; #[cfg(feature = "elasticsink")] use oura::sinks::elastic::Config as ElasticConfig; +#[cfg(feature = "aws")] +use oura::sinks::aws_sqs::Config as AwsSqsConfig; + #[cfg(feature = "fingerprint")] use oura::filters::fingerprint::Config as FingerprintConfig; @@ -99,6 +102,9 @@ enum Sink { #[cfg(feature = "elasticsink")] Elastic(ElasticConfig), + + #[cfg(feature = "aws")] + AwsSqs(AwsSqsConfig), } fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> BootstrapResult { @@ -118,6 +124,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> Boot #[cfg(feature = "elasticsink")] Sink::Elastic(c) => WithUtils::new(c, utils).bootstrap(input), + + #[cfg(feature = "aws")] + Sink::AwsSqs(c) => WithUtils::new(c, utils).bootstrap(input), } } diff --git a/src/sinks/aws_sqs/mod.rs b/src/sinks/aws_sqs/mod.rs new file mode 100644 index 00000000..0a447c1d --- /dev/null +++ b/src/sinks/aws_sqs/mod.rs @@ -0,0 +1,4 @@ +mod run; +mod setup; + +pub use setup::*; diff --git a/src/sinks/aws_sqs/run.rs b/src/sinks/aws_sqs/run.rs new file mode 100644 index 00000000..3c993b4e --- /dev/null +++ b/src/sinks/aws_sqs/run.rs @@ -0,0 +1,66 @@ +use aws_sdk_sqs::Client; +use serde_json::json; +use std::sync::Arc; + +use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; + +async fn send_sqs_msg( + client: Arc, + queue_url: &str, + group_id: &str, + fifo: bool, + event: &Event, +) -> Result<(), Error> { + let body = json!(event).to_string(); + + let mut req = client + .send_message() + .queue_url(queue_url) + .message_body(body); + + if fifo { + req = req.message_group_id(group_id); + + if let Some(id) = &event.fingerprint { + req = req.message_deduplication_id(id) + } + } + + let res = req.send().await?; + + log::trace!("SQS send response: {:?}", res); + + Ok(()) +} + +pub fn writer_loop( + input: StageReceiver, + client: Client, + queue_url: &str, + fifo: bool, + group_id: &str, + utils: Arc, +) -> Result<(), Error> { + let client = Arc::new(client); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build()?; + + loop { + let event = input.recv().unwrap(); + + // notify the pipeline where we are + utils.track_sink_progress(&event); + + let client = client.clone(); + + let result = rt.block_on(send_sqs_msg(client, queue_url, group_id, fifo, &event)); + + if let Err(err) = result { + log::error!("unrecoverable error sending message to SQS: {:?}", err); + break Err(err); + } + } +} diff --git a/src/sinks/aws_sqs/setup.rs b/src/sinks/aws_sqs/setup.rs new file mode 100644 index 00000000..9cae80eb --- /dev/null +++ b/src/sinks/aws_sqs/setup.rs @@ -0,0 +1,59 @@ +use std::borrow::Cow; + +use aws_config::{self, meta::region::RegionProviderChain, RetryConfig}; +use aws_sdk_sqs::{Client, Region}; +use serde::Deserialize; + +use crate::{ + pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + utils::WithUtils, +}; + +use super::run::writer_loop; + +const DEFAULT_MAX_RETRIES: u32 = 5; + +#[derive(Default, Debug, Deserialize)] +pub struct Config { + pub region: String, + pub queue_url: String, + pub fifo: Option, + pub group_id: Option, + pub max_retries: Option, +} + +impl SinkProvider for WithUtils { + fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { + let explicit_region = Cow::Owned(self.inner.region.to_owned()); + + let region_provider = + RegionProviderChain::first_try(Region::new(explicit_region)).or_default_provider(); + + let aws_config = tokio::runtime::Runtime::new()? + .block_on(aws_config::from_env().region(region_provider).load()); + + let retry_config = RetryConfig::new() + .with_max_attempts(self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES)); + + let sqs_config = aws_sdk_sqs::config::Builder::from(&aws_config) + .retry_config(retry_config) + .build(); + + let client = Client::from_conf(sqs_config); + let queue_url = self.inner.queue_url.clone(); + let fifo = self.inner.fifo.unwrap_or_default(); + let group_id = self + .inner + .group_id + .clone() + .unwrap_or_else(|| "oura-sink".to_string()); + + let utils = self.utils.clone(); + let handle = std::thread::spawn(move || { + writer_loop(input, client, &queue_url, fifo, &group_id, utils) + .expect("writer loop failed") + }); + + Ok(handle) + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 21e3dba0..d499269d 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -13,3 +13,6 @@ pub mod kafka; #[cfg(feature = "elasticsink")] pub mod elastic; + +#[cfg(feature = "aws")] +pub mod aws_sqs;