From 8e402a30930d12bc823ea0845d93504659ad9410 Mon Sep 17 00:00:00 2001 From: rvcas Date: Tue, 5 Apr 2022 23:36:27 -0400 Subject: [PATCH 01/10] chore: add gcp raw and feature flags --- Cargo.lock | 280 +++++++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 4 + 2 files changed, 278 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61ce8258..d7f0ab97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "ascii" version = "1.0.0" @@ -455,12 +464,44 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" +[[package]] +name = "bindgen" +version = "0.59.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "clap 2.34.0", + "env_logger", + "lazy_static", + "lazycell", + "log 0.4.16", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "boringssl-src" +version = "0.5.1+b9232f9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13550d246f6517024ac7f53ae2f1016bb3ed3b238f1489f8564380b635071664" +dependencies = [ + "cmake", +] + [[package]] name = "build_const" version = "0.2.2" @@ -513,6 +554,15 @@ version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -544,6 +594,32 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" +[[package]] +name = "clang-sys" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cc00842eed744b858222c4c9faf7243aafc6d33f92f96935263ef4d8a41ce21" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim 0.8.0", + "textwrap 0.11.0", + "unicode-width", + "vec_map", +] + [[package]] name = "clap" version = "3.1.11" @@ -554,9 +630,19 @@ dependencies = [ "bitflags", "clap_lex", "indexmap", - "strsim", + "os_str_bytes", + "strsim 0.10.0", "termcolor", - "textwrap", + "textwrap 0.15.0", +] + +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", ] [[package]] @@ -692,7 +778,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn", ] @@ -842,6 +928,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -849,6 +950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -857,6 +959,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.21" @@ -892,9 +1005,11 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -919,6 +1034,54 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + +[[package]] +name = "google-cloud-rust-raw" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "159e4a62540e8e00f2b918817e3033138a2671962bccffd6932450e97696abb4" +dependencies = [ + "futures", + "grpcio", + "protobuf", +] + +[[package]] +name = "grpcio" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3e636f21532a4a55b3c43d8852014686f54898dd0ea083e40a6b040bbc6d737" +dependencies = [ + "futures-executor", + "futures-util", + "grpcio-sys", + "libc", + "log 0.4.16", + "parking_lot 0.11.2", + "protobuf", +] + +[[package]] +name = "grpcio-sys" +version = "0.10.1+1.44.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925586932dbbea927e913783da0be160ee74e0b0519d7b20cec35547a0a84631" +dependencies = [ + "bindgen", + "boringssl-src", + "cc", + "cmake", + "libc", + "libz-sys", + "pkg-config", + "walkdir", +] + [[package]] name = "h2" version = "0.3.11" @@ -1155,11 +1318,39 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" -version = "0.2.117" +version = "0.2.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" + +[[package]] +name = "libloading" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c" +checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd" +dependencies = [ + "cfg-if 1.0.0", + "winapi", +] + +[[package]] +name = "libz-sys" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] [[package]] name = "lock_api" @@ -1482,12 +1673,13 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-sqs", "bech32", - "clap", + "clap 3.1.8", "config", "crossterm 0.23.2", "elasticsearch", "env_logger", "file-rotate", + "google-cloud-rust-raw", "hex", "kafka", "log 0.4.16", @@ -1634,6 +1826,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1743,6 +1941,12 @@ dependencies = [ "tiny_http", ] +[[package]] +name = "protobuf" +version = "2.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" + [[package]] name = "quote" version = "1.0.15" @@ -1892,6 +2096,12 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.2.3" @@ -1947,6 +2157,15 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -2083,6 +2302,12 @@ dependencies = [ "syn", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + [[package]] name = "signal-hook" version = "0.3.13" @@ -2158,6 +2383,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "strsim" version = "0.10.0" @@ -2217,6 +2448,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "textwrap" version = "0.15.0" @@ -2525,6 +2765,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.4" @@ -2537,6 +2783,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" @@ -2639,6 +2896,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static", + "libc", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 63429d66..aaf81f86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,9 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] } # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } +# features: gcp +google-cloud-rust-raw = { version = "0.13.0", optional = true } + [features] default = [] logs = ["file-rotate"] @@ -70,3 +73,4 @@ kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] +gcp = ["google-cloud-rust-raw"] From ac5596c1ef80e42b84007eccf8802b657a858305 Mon Sep 17 00:00:00 2001 From: rvcas Date: Wed, 6 Apr 2022 20:53:15 -0400 Subject: [PATCH 02/10] feat: implement the pubsub sink --- Cargo.lock | 2 + Cargo.toml | 4 +- src/sinks/gcp_pubsub/mod.rs | 4 ++ src/sinks/gcp_pubsub/run.rs | 57 +++++++++++++++++++++++++ src/sinks/gcp_pubsub/setup.rs | 80 +++++++++++++++++++++++++++++++++++ src/sinks/mod.rs | 3 ++ 6 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/sinks/gcp_pubsub/mod.rs create mode 100644 src/sinks/gcp_pubsub/run.rs create mode 100644 src/sinks/gcp_pubsub/setup.rs diff --git a/Cargo.lock b/Cargo.lock index d7f0ab97..7beae260 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1680,6 +1680,7 @@ dependencies = [ "env_logger", "file-rotate", "google-cloud-rust-raw", + "grpcio", "hex", "kafka", "log 0.4.16", @@ -1689,6 +1690,7 @@ dependencies = [ "openssl", "pallas", "prometheus_exporter", + "protobuf", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index aaf81f86..aad4c524 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] } # features: gcp google-cloud-rust-raw = { version = "0.13.0", optional = true } +grpcio = { version = "0.10.0", optional = true } +protobuf = { version = "2.27.1", optional = true } [features] default = [] @@ -73,4 +75,4 @@ kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] -gcp = ["google-cloud-rust-raw"] +gcp = ["google-cloud-rust-raw", "grpcio", "protobuf"] diff --git a/src/sinks/gcp_pubsub/mod.rs b/src/sinks/gcp_pubsub/mod.rs new file mode 100644 index 00000000..0a447c1d --- /dev/null +++ b/src/sinks/gcp_pubsub/mod.rs @@ -0,0 +1,4 @@ +mod run; +mod setup; + +pub use setup::*; diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs new file mode 100644 index 00000000..acf585b7 --- /dev/null +++ b/src/sinks/gcp_pubsub/run.rs @@ -0,0 +1,57 @@ +use std::{sync::Arc, time::SystemTime}; + +use google_cloud_rust_raw::pubsub::v1::{ + pubsub::{PublishRequest, PublishResponse, PubsubMessage, Topic}, + pubsub_grpc::PublisherClient, +}; +use protobuf::RepeatedField; +use serde_json::json; + +use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; + +fn send_pubsub_msg( + client: &PublisherClient, + topic: &Topic, + event: &Event, +) -> ::grpcio::Result { + let body = json!(event).to_string(); + + let timestamp_in_seconds = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let mut timestamp = protobuf::well_known_types::Timestamp::new(); + timestamp.set_seconds(timestamp_in_seconds as i64); + + let mut pubsub_msg = PubsubMessage::new(); + pubsub_msg.set_data(body.into_bytes()); + pubsub_msg.set_publish_time(timestamp); + + let mut request = PublishRequest::new(); + request.set_topic(topic.get_name().to_string()); + request.set_messages(RepeatedField::from_vec(vec![pubsub_msg])); + + client.publish(&request) +} + +pub fn writer_loop( + input: StageReceiver, + publisher: PublisherClient, + topic: Topic, + utils: Arc, +) -> Result<(), Error> { + for event in input.iter() { + // notify the pipeline where we are + utils.track_sink_progress(&event); + + let result = send_pubsub_msg(&publisher, &topic, &event); + + if let Err(err) = result { + log::error!("unrecoverable error sending message to PubSub: {:?}", err); + return Err(Box::new(err)); + } + } + + Ok(()) +} diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs new file mode 100644 index 00000000..89cdb34f --- /dev/null +++ b/src/sinks/gcp_pubsub/setup.rs @@ -0,0 +1,80 @@ +use std::{collections::HashMap, sync::Arc}; + +use google_cloud_rust_raw::pubsub::v1::{ + pubsub::{GetTopicRequest, Topic}, + pubsub_grpc::PublisherClient, +}; +use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder}; +use serde::Deserialize; + +use crate::{ + pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + utils::WithUtils, +}; + +use super::run::writer_loop; + +#[derive(Debug, Default, Deserialize)] +pub struct Config { + pub project_id: String, + pub topic_name: String, +} + +impl SinkProvider for WithUtils { + fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { + let project_id = self.inner.project_id.to_owned(); + let topic_name = self.inner.topic_name.to_owned(); + + let channel = connect("pubsub.googleapis.com"); + let publisher = PublisherClient::new(channel); + + // TODO: do we want to do this in the spawned thread instead? + let topic_full_name = format!("projects/{project_id}/topics/{topic_name}"); + let topic = find_or_create_topic(&publisher, &topic_full_name).unwrap(); + + let utils = self.utils.clone(); + let handle = std::thread::spawn(move || { + writer_loop(input, publisher, topic, utils).expect("writer loop failed") + }); + + Ok(handle) + } +} + +fn connect(endpoint: &str) -> Channel { + // Set up the gRPC environment. + let env = Arc::new(EnvBuilder::new().build()); + let creds = + ChannelCredentials::google_default_credentials().expect("No Google credentials found"); + + // Create a channel to connect to Gcloud. + ChannelBuilder::new(env) + // Set the max size to correspond to server-side limits. + .max_send_message_len(1 << 28) + .max_receive_message_len(1 << 28) + .secure_connect(endpoint, creds) +} + +fn find_or_create_topic(client: &PublisherClient, topic_name: &str) -> grpcio::Result { + // find topic + let mut request = GetTopicRequest::new(); + + request.set_topic(topic_name.to_string()); + + if let Ok(topic) = client.get_topic(&request) { + return Ok(topic); + } + + // otherwise create topic + let mut labels = HashMap::new(); + + // TODO: do we need this? + labels.insert("environment".to_string(), "test".to_string()); + + let mut topic = Topic::new(); + + topic.set_name(topic_name.to_string()); + topic.set_labels(labels); + + client.create_topic(&topic) +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 50a89817..b7f5bb21 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -22,3 +22,6 @@ pub mod aws_lambda; #[cfg(feature = "aws")] pub mod aws_s3; + +#[cfg(feature = "gcp")] +pub mod gcp_pubsub; From 3a7594f76401bada1cc07f236cc6d80e127820cb Mon Sep 17 00:00:00 2001 From: rvcas Date: Wed, 6 Apr 2022 21:40:24 -0400 Subject: [PATCH 03/10] feat: include gcp pubsub in the daemon bin --- src/bin/oura/daemon.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index fad3e2ef..3a1e875f 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig; #[cfg(feature = "aws")] use oura::sinks::aws_s3::Config as AwsS3Config; +#[cfg(feature = "gcp")] +use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig; + #[cfg(feature = "fingerprint")] use oura::filters::fingerprint::Config as FingerprintConfig; @@ -117,6 +120,9 @@ enum Sink { #[cfg(feature = "aws")] AwsS3(AwsS3Config), + + #[cfg(feature = "gcp")] + GcpPubSub(GcpPubSubConfig), } fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> BootstrapResult { @@ -145,6 +151,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> Boot #[cfg(feature = "aws")] Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input), + + #[cfg(feature = "gcp")] + Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input), } } From 57ea2a10649dff09bf235542e1d62051a30eff91 Mon Sep 17 00:00:00 2001 From: rvcas Date: Sat, 9 Apr 2022 20:30:41 -0400 Subject: [PATCH 04/10] fix: do not create or fetch the topic --- src/sinks/gcp_pubsub/setup.rs | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index 89cdb34f..9f096a88 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -1,9 +1,6 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; -use google_cloud_rust_raw::pubsub::v1::{ - pubsub::{GetTopicRequest, Topic}, - pubsub_grpc::PublisherClient, -}; +use google_cloud_rust_raw::pubsub::v1::{pubsub::Topic, pubsub_grpc::PublisherClient}; use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder}; use serde::Deserialize; @@ -28,9 +25,8 @@ impl SinkProvider for WithUtils { let channel = connect("pubsub.googleapis.com"); let publisher = PublisherClient::new(channel); - // TODO: do we want to do this in the spawned thread instead? let topic_full_name = format!("projects/{project_id}/topics/{topic_name}"); - let topic = find_or_create_topic(&publisher, &topic_full_name).unwrap(); + let topic = build_topic(&topic_full_name); let utils = self.utils.clone(); let handle = std::thread::spawn(move || { @@ -55,26 +51,17 @@ fn connect(endpoint: &str) -> Channel { .secure_connect(endpoint, creds) } -fn find_or_create_topic(client: &PublisherClient, topic_name: &str) -> grpcio::Result { - // find topic - let mut request = GetTopicRequest::new(); - - request.set_topic(topic_name.to_string()); - - if let Ok(topic) = client.get_topic(&request) { - return Ok(topic); - } +fn build_topic(topic_name: &str) -> Topic { + let mut topic = Topic::new(); - // otherwise create topic - let mut labels = HashMap::new(); + topic.set_name(topic_name.to_string()); - // TODO: do we need this? - labels.insert("environment".to_string(), "test".to_string()); + // let mut labels = HashMap::new(); - let mut topic = Topic::new(); + // // TODO: do we need this? + // labels.insert("environment".to_string(), "test".to_string()); - topic.set_name(topic_name.to_string()); - topic.set_labels(labels); + // topic.set_labels(labels); - client.create_topic(&topic) + topic } From 77844c33bce09c42b9aeed7068cc1221f2494df6 Mon Sep 17 00:00:00 2001 From: rvcas Date: Thu, 14 Apr 2022 19:40:55 -0400 Subject: [PATCH 05/10] feat: pubsub is sending --- .gitignore | 6 +- Cargo.lock | 412 ++++++++++++++++------------------ Cargo.toml | 6 +- src/sinks/gcp_pubsub/run.rs | 50 ++--- src/sinks/gcp_pubsub/setup.rs | 49 +--- 5 files changed, 217 insertions(+), 306 deletions(-) diff --git a/.gitignore b/.gitignore index 2504685c..3f5c6a6d 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,8 @@ Cargo.lock # Local testdrive environments, mainly for development testdrive/custom -testdrive/assert \ No newline at end of file +testdrive/assert + +oura.toml + +oura-*.json \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7beae260..5a0037f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,13 +27,10 @@ dependencies = [ ] [[package]] -name = "ansi_term" -version = "0.12.1" +name = "arc-swap" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" [[package]] name = "ascii" @@ -440,6 +437,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" + [[package]] name = "base58" version = "0.2.0" @@ -464,44 +467,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" -[[package]] -name = "bindgen" -version = "0.59.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "clap 2.34.0", - "env_logger", - "lazy_static", - "lazycell", - "log 0.4.16", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", - "which", -] - [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "boringssl-src" -version = "0.5.1+b9232f9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13550d246f6517024ac7f53ae2f1016bb3ed3b238f1489f8564380b635071664" -dependencies = [ - "cmake", -] - [[package]] name = "build_const" version = "0.2.2" @@ -554,15 +525,6 @@ version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" -[[package]] -name = "cexpr" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" -dependencies = [ - "nom", -] - [[package]] name = "cfg-if" version = "0.1.10" @@ -594,32 +556,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" -[[package]] -name = "clang-sys" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cc00842eed744b858222c4c9faf7243aafc6d33f92f96935263ef4d8a41ce21" -dependencies = [ - "glob", - "libc", - "libloading", -] - -[[package]] -name = "clap" -version = "2.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" -dependencies = [ - "ansi_term", - "atty", - "bitflags", - "strsim 0.8.0", - "textwrap 0.11.0", - "unicode-width", - "vec_map", -] - [[package]] name = "clap" version = "3.1.11" @@ -631,18 +567,30 @@ dependencies = [ "clap_lex", "indexmap", "os_str_bytes", - "strsim 0.10.0", + "strsim", "termcolor", - "textwrap 0.15.0", + "textwrap", ] [[package]] -name = "cmake" -version = "0.1.48" +name = "cloud-pubsub" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +checksum = "2fe98334bc3400fe5b4dc6447fbbc33fcc5bfe5d896aff2bc49f33dd9f656aa4" dependencies = [ - "cc", + "base64 0.13.0", + "bytes", + "goauth", + "hyper", + "hyper-tls", + "lazy_static", + "log 0.4.16", + "rand", + "serde", + "serde_derive", + "serde_json", + "smpl_jwt", + "tokio", ] [[package]] @@ -668,6 +616,12 @@ dependencies = [ "toml", ] +[[package]] +name = "const_fn" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" + [[package]] name = "core-foundation" version = "0.9.3" @@ -778,7 +732,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim 0.10.0", + "strsim", "syn", ] @@ -793,6 +747,12 @@ dependencies = [ "syn", ] +[[package]] +name = "discard" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" + [[package]] name = "dyn-clone" version = "1.0.4" @@ -1035,51 +995,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" [[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - -[[package]] -name = "google-cloud-rust-raw" -version = "0.13.0" +name = "goauth" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159e4a62540e8e00f2b918817e3033138a2671962bccffd6932450e97696abb4" +checksum = "d94101e84ede813c04773b0a43396c01b5a3a9376537dbce1125858ae090ae60" dependencies = [ + "arc-swap", "futures", - "grpcio", - "protobuf", -] - -[[package]] -name = "grpcio" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3e636f21532a4a55b3c43d8852014686f54898dd0ea083e40a6b040bbc6d737" -dependencies = [ - "futures-executor", - "futures-util", - "grpcio-sys", - "libc", "log 0.4.16", - "parking_lot 0.11.2", - "protobuf", -] - -[[package]] -name = "grpcio-sys" -version = "0.10.1+1.44.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "925586932dbbea927e913783da0be160ee74e0b0519d7b20cec35547a0a84631" -dependencies = [ - "bindgen", - "boringssl-src", - "cc", - "cmake", - "libc", - "libz-sys", - "pkg-config", - "walkdir", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "simpl", + "smpl_jwt", + "time 0.2.27", + "tokio", ] [[package]] @@ -1318,40 +1249,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" -[[package]] -name = "libloading" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd" -dependencies = [ - "cfg-if 1.0.0", - "winapi", -] - -[[package]] -name = "libz-sys" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "lock_api" version = "0.4.6" @@ -1673,14 +1576,13 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-sqs", "bech32", - "clap 3.1.8", + "clap", + "cloud-pubsub", "config", "crossterm 0.23.2", "elasticsearch", "env_logger", "file-rotate", - "google-cloud-rust-raw", - "grpcio", "hex", "kafka", "log 0.4.16", @@ -1690,7 +1592,6 @@ dependencies = [ "openssl", "pallas", "prometheus_exporter", - "protobuf", "reqwest", "serde", "serde_json", @@ -1828,12 +1729,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" -[[package]] -name = "peeking_take_while" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" - [[package]] name = "percent-encoding" version = "2.1.0" @@ -1908,6 +1803,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + [[package]] name = "proc-macro2" version = "1.0.36" @@ -1943,12 +1844,6 @@ dependencies = [ "tiny_http", ] -[[package]] -name = "protobuf" -version = "2.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" - [[package]] name = "quote" version = "1.0.15" @@ -2098,12 +1993,6 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "rustc_version" version = "0.2.3" @@ -2159,15 +2048,6 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.19" @@ -2305,10 +2185,19 @@ dependencies = [ ] [[package]] -name = "shlex" -version = "1.1.0" +name = "sha1" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" +dependencies = [ + "sha1_smol", +] + +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "signal-hook" @@ -2341,6 +2230,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simpl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" + [[package]] name = "slab" version = "0.4.5" @@ -2353,6 +2248,22 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "smpl_jwt" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4370044f8b20f944e05c35d77edd3518e6f21fc4de77e593919f287c6a3f428a" +dependencies = [ + "base64 0.13.0", + "log 0.4.16", + "openssl", + "serde", + "serde_derive", + "serde_json", + "simpl", + "time 0.2.27", +] + [[package]] name = "snap" version = "0.2.5" @@ -2379,6 +2290,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "standback" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff" +dependencies = [ + "version_check", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -2386,10 +2306,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] -name = "strsim" -version = "0.8.0" +name = "stdweb" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5" +dependencies = [ + "discard", + "rustc_version 0.2.3", + "stdweb-derive", + "stdweb-internal-macros", + "stdweb-internal-runtime", + "wasm-bindgen", +] + +[[package]] +name = "stdweb-derive" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" +dependencies = [ + "proc-macro2", + "quote", + "serde", + "serde_derive", + "syn", +] + +[[package]] +name = "stdweb-internal-macros" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" +dependencies = [ + "base-x", + "proc-macro2", + "quote", + "serde", + "serde_derive", + "serde_json", + "sha1", + "syn", +] + +[[package]] +name = "stdweb-internal-runtime" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" [[package]] name = "strsim" @@ -2450,15 +2413,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - [[package]] name = "textwrap" version = "0.15.0" @@ -2496,6 +2450,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4752a97f8eebd6854ff91f1c1824cd6160626ac4bd44287f7f4ea2035a02a242" +dependencies = [ + "const_fn", + "libc", + "standback", + "stdweb", + "time-macros 0.1.1", + "version_check", + "winapi", +] + [[package]] name = "time" version = "0.3.7" @@ -2505,7 +2474,17 @@ dependencies = [ "itoa", "libc", "num_threads", - "time-macros", + "time-macros 0.2.3", +] + +[[package]] +name = "time-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957e9c6e26f12cb6d0dd7fc776bb67a706312e7299aed74c8dd5b17ebb27e2f1" +dependencies = [ + "proc-macro-hack", + "time-macros-impl", ] [[package]] @@ -2514,6 +2493,19 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6" +[[package]] +name = "time-macros-impl" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "standback", + "syn", +] + [[package]] name = "tiny_http" version = "0.10.0" @@ -2767,12 +2759,6 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - [[package]] name = "version_check" version = "0.9.4" @@ -2785,17 +2771,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" -[[package]] -name = "walkdir" -version = "2.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" -dependencies = [ - "same-file", - "winapi", - "winapi-util", -] - [[package]] name = "want" version = "0.3.0" @@ -2898,17 +2873,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "which" -version = "4.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" -dependencies = [ - "either", - "lazy_static", - "libc", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index aad4c524..9ff333b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,9 +62,7 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] } openssl = { version = "0.10", optional = true, features = ["vendored"] } # features: gcp -google-cloud-rust-raw = { version = "0.13.0", optional = true } -grpcio = { version = "0.10.0", optional = true } -protobuf = { version = "2.27.1", optional = true } +cloud-pubsub = { version = "0.8.0", optional = true } [features] default = [] @@ -75,4 +73,4 @@ kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] -gcp = ["google-cloud-rust-raw", "grpcio", "protobuf"] +gcp = ["cloud-pubsub", "tokio"] diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index acf585b7..c69450b8 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,51 +1,35 @@ -use std::{sync::Arc, time::SystemTime}; +use std::sync::Arc; -use google_cloud_rust_raw::pubsub::v1::{ - pubsub::{PublishRequest, PublishResponse, PubsubMessage, Topic}, - pubsub_grpc::PublisherClient, -}; -use protobuf::RepeatedField; +use cloud_pubsub::{error::Error, topic::PublishMessageResponse, Client, Topic}; use serde_json::json; -use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; +use crate::{model::Event, pipelining::StageReceiver, utils::Utils}; -fn send_pubsub_msg( - client: &PublisherClient, - topic: &Topic, - event: &Event, -) -> ::grpcio::Result { +async fn send_pubsub_msg(client: &Topic, event: &Event) -> Result { let body = json!(event).to_string(); - let timestamp_in_seconds = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let mut timestamp = protobuf::well_known_types::Timestamp::new(); - timestamp.set_seconds(timestamp_in_seconds as i64); - - let mut pubsub_msg = PubsubMessage::new(); - pubsub_msg.set_data(body.into_bytes()); - pubsub_msg.set_publish_time(timestamp); - - let mut request = PublishRequest::new(); - request.set_topic(topic.get_name().to_string()); - request.set_messages(RepeatedField::from_vec(vec![pubsub_msg])); - - client.publish(&request) + client.publish(body).await } pub fn writer_loop( input: StageReceiver, - publisher: PublisherClient, - topic: Topic, + credentials: String, + topic_name: String, utils: Arc, -) -> Result<(), Error> { +) -> Result<(), crate::Error> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build()?; + + let publisher = rt.block_on(Client::new(credentials))?; + let topic = publisher.topic(topic_name); + for event in input.iter() { // notify the pipeline where we are utils.track_sink_progress(&event); - let result = send_pubsub_msg(&publisher, &topic, &event); + let result = rt.block_on(send_pubsub_msg(&topic, &event)); if let Err(err) = result { log::error!("unrecoverable error sending message to PubSub: {:?}", err); diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index 9f096a88..c61cee4a 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -1,7 +1,3 @@ -use std::sync::Arc; - -use google_cloud_rust_raw::pubsub::v1::{pubsub::Topic, pubsub_grpc::PublisherClient}; -use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder}; use serde::Deserialize; use crate::{ @@ -13,55 +9,20 @@ use super::run::writer_loop; #[derive(Debug, Default, Deserialize)] pub struct Config { - pub project_id: String, - pub topic_name: String, + pub topic: String, + pub credentials: String, } impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { - let project_id = self.inner.project_id.to_owned(); - let topic_name = self.inner.topic_name.to_owned(); - - let channel = connect("pubsub.googleapis.com"); - let publisher = PublisherClient::new(channel); - - let topic_full_name = format!("projects/{project_id}/topics/{topic_name}"); - let topic = build_topic(&topic_full_name); + let credentials = self.inner.credentials.to_owned(); + let topic_name = self.inner.topic.to_owned(); let utils = self.utils.clone(); let handle = std::thread::spawn(move || { - writer_loop(input, publisher, topic, utils).expect("writer loop failed") + writer_loop(input, credentials, topic_name, utils).expect("writer loop failed"); }); Ok(handle) } } - -fn connect(endpoint: &str) -> Channel { - // Set up the gRPC environment. - let env = Arc::new(EnvBuilder::new().build()); - let creds = - ChannelCredentials::google_default_credentials().expect("No Google credentials found"); - - // Create a channel to connect to Gcloud. - ChannelBuilder::new(env) - // Set the max size to correspond to server-side limits. - .max_send_message_len(1 << 28) - .max_receive_message_len(1 << 28) - .secure_connect(endpoint, creds) -} - -fn build_topic(topic_name: &str) -> Topic { - let mut topic = Topic::new(); - - topic.set_name(topic_name.to_string()); - - // let mut labels = HashMap::new(); - - // // TODO: do we need this? - // labels.insert("environment".to_string(), "test".to_string()); - - // topic.set_labels(labels); - - topic -} From a8e5a28f7e1f37b0a4a863098e16132bac2b132d Mon Sep 17 00:00:00 2001 From: rvcas Date: Thu, 14 Apr 2022 19:47:47 -0400 Subject: [PATCH 06/10] chore: add some docs for pubsub --- book/src/sinks/README.md | 3 ++- book/src/sinks/gcp_pubsub.md | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 book/src/sinks/gcp_pubsub.md diff --git a/book/src/sinks/README.md b/book/src/sinks/README.md index e62a734d..10e9efbe 100644 --- a/book/src/sinks/README.md +++ b/book/src/sinks/README.md @@ -14,5 +14,6 @@ These are the existing sinks that are included as part the main _Oura_ codebase: - [AWS SQS](aws_sqs.md): a sink that sends each event as message to an AWS SQS queue. - [AWS Lamda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event. - [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object. +- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a goolge cloud PubSub topic. -New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura). \ No newline at end of file +New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura). diff --git a/book/src/sinks/gcp_pubsub.md b/book/src/sinks/gcp_pubsub.md new file mode 100644 index 00000000..c933bdac --- /dev/null +++ b/book/src/sinks/gcp_pubsub.md @@ -0,0 +1,18 @@ +# Google Cloud PubSub + +A sink that sends each event as a message to a PubSub topic. Each event is json-encoded and sent to a configurable PubSub topic. + +## Configuration + +```toml +[sink] +type = "GcpPubSub" +credentials = "oura-test-347101-ff3f7b2d69cc.json" +topic = "test" +``` + +### Section: `sink` + +- `type`: the literal value `GcpPubSub`. +- `credentials`: the service account json file downloaded from the cloud console. +- `topic`: the short name of the topic to send message to. From cccc0bc2e8c3ddbb557336e55d22197fd6a418ac Mon Sep 17 00:00:00 2001 From: rvcas Date: Thu, 14 Apr 2022 19:49:32 -0400 Subject: [PATCH 07/10] chore: better wording for credentials --- book/src/sinks/gcp_pubsub.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/book/src/sinks/gcp_pubsub.md b/book/src/sinks/gcp_pubsub.md index c933bdac..1814860f 100644 --- a/book/src/sinks/gcp_pubsub.md +++ b/book/src/sinks/gcp_pubsub.md @@ -14,5 +14,5 @@ topic = "test" ### Section: `sink` - `type`: the literal value `GcpPubSub`. -- `credentials`: the service account json file downloaded from the cloud console. +- `credentials`: the path to the service account json file downloaded from the cloud console. - `topic`: the short name of the topic to send message to. From 427cb8ddffea68518b0d009a5c8cad132fd5180f Mon Sep 17 00:00:00 2001 From: rvcas Date: Wed, 20 Apr 2022 21:50:57 -0400 Subject: [PATCH 08/10] chore: remove .gitignore additions and fix build --- .gitignore | 4 ---- book/src/sinks/README.md | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 3f5c6a6d..fe2fd25f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,3 @@ Cargo.lock # Local testdrive environments, mainly for development testdrive/custom testdrive/assert - -oura.toml - -oura-*.json \ No newline at end of file diff --git a/book/src/sinks/README.md b/book/src/sinks/README.md index 10e9efbe..ea4a2ae1 100644 --- a/book/src/sinks/README.md +++ b/book/src/sinks/README.md @@ -16,4 +16,4 @@ These are the existing sinks that are included as part the main _Oura_ codebase: - [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object. - [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a goolge cloud PubSub topic. -New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura). +New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura). \ No newline at end of file From 1cf69ca467d2979e9d93985daffd6c18990efe56 Mon Sep 17 00:00:00 2001 From: rvcas Date: Wed, 20 Apr 2022 22:14:28 -0400 Subject: [PATCH 09/10] feat(pubsub): add retry logic with backoff delay --- Cargo.lock | 12 +++++++++ Cargo.toml | 3 ++- src/sinks/common.rs | 7 +++++ src/sinks/gcp_pubsub/run.rs | 51 +++++++++++++++++++++++++++-------- src/sinks/gcp_pubsub/setup.rs | 33 ++++++++++++++++++++++- src/sinks/mod.rs | 4 +++ src/sinks/webhook/run.rs | 4 +-- src/sinks/webhook/setup.rs | 7 +---- 8 files changed, 99 insertions(+), 22 deletions(-) create mode 100644 src/sinks/common.rs diff --git a/Cargo.lock b/Cargo.lock index 5a0037f1..d3c1099e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -1571,6 +1582,7 @@ checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" name = "oura" version = "1.3.0" dependencies = [ + "async-recursion", "aws-config", "aws-sdk-lambda", "aws-sdk-s3", diff --git a/Cargo.toml b/Cargo.toml index 9ff333b5..33a890ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] } # features: gcp cloud-pubsub = { version = "0.8.0", optional = true } +async-recursion = { version = "1.0.0", optional = true } [features] default = [] @@ -73,4 +74,4 @@ kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] -gcp = ["cloud-pubsub", "tokio"] +gcp = ["cloud-pubsub", "tokio", "async-recursion"] diff --git a/src/sinks/common.rs b/src/sinks/common.rs new file mode 100644 index 00000000..f0c8b908 --- /dev/null +++ b/src/sinks/common.rs @@ -0,0 +1,7 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize, Clone)] +pub enum ErrorPolicy { + Continue, + Exit, +} diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index c69450b8..a96d75f0 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,20 +1,48 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use cloud_pubsub::{error::Error, topic::PublishMessageResponse, Client, Topic}; +use async_recursion::async_recursion; +use cloud_pubsub::{error::Error, Client, Topic}; use serde_json::json; -use crate::{model::Event, pipelining::StageReceiver, utils::Utils}; +use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils}; -async fn send_pubsub_msg(client: &Topic, event: &Event) -> Result { +#[async_recursion] +async fn send_pubsub_msg( + client: &Topic, + event: &Event, + policy: &ErrorPolicy, + retry_quota: usize, + backoff_delay: Duration, +) -> Result<(), Error> { let body = json!(event).to_string(); - client.publish(body).await + let result = client.publish(body).await; + + match (result, policy, retry_quota) { + (Ok(_), _, _) => { + log::info!("successful pubsub publish"); + Ok(()) + } + (Err(x), ErrorPolicy::Exit, 0) => Err(x), + (Err(x), ErrorPolicy::Continue, 0) => { + log::warn!("failed to publish to pubsub: {:?}", x); + Ok(()) + } + (Err(x), _, quota) => { + log::warn!("failed attempt to execute pubsub publish: {:?}", x); + std::thread::sleep(backoff_delay); + send_pubsub_msg(client, event, policy, quota - 1, backoff_delay).await + } + } } pub fn writer_loop( input: StageReceiver, credentials: String, topic_name: String, + error_policy: &ErrorPolicy, + max_retries: usize, + backoff_delay: Duration, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -29,12 +57,13 @@ pub fn writer_loop( // notify the pipeline where we are utils.track_sink_progress(&event); - let result = rt.block_on(send_pubsub_msg(&topic, &event)); - - if let Err(err) = result { - log::error!("unrecoverable error sending message to PubSub: {:?}", err); - return Err(Box::new(err)); - } + rt.block_on(send_pubsub_msg( + &topic, + &event, + error_policy, + max_retries, + backoff_delay, + ))?; } Ok(()) diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index c61cee4a..2e6f07d2 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -1,7 +1,10 @@ +use std::time::Duration; + use serde::Deserialize; use crate::{ pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + sinks::ErrorPolicy, utils::WithUtils, }; @@ -11,16 +14,44 @@ use super::run::writer_loop; pub struct Config { pub topic: String, pub credentials: String, + pub error_policy: Option, + pub max_retries: Option, + pub backoff_delay: Option, } +const DEFAULT_MAX_RETRIES: usize = 20; +const DEFAULT_BACKOFF_DELAY: u64 = 5_000; + impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { let credentials = self.inner.credentials.to_owned(); let topic_name = self.inner.topic.to_owned(); + let error_policy = self + .inner + .error_policy + .as_ref() + .cloned() + .unwrap_or(ErrorPolicy::Exit); + + let max_retries = self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + + let backoff_delay = + Duration::from_millis(self.inner.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY)); + let utils = self.utils.clone(); + let handle = std::thread::spawn(move || { - writer_loop(input, credentials, topic_name, utils).expect("writer loop failed"); + writer_loop( + input, + credentials, + topic_name, + &error_policy, + max_retries, + backoff_delay, + utils, + ) + .expect("writer loop failed"); }); Ok(handle) diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b7f5bb21..41c00ee8 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -1,7 +1,11 @@ +mod common; + pub mod assert; pub mod stdout; pub mod terminal; +pub use common::*; + #[cfg(feature = "logs")] pub mod logs; diff --git a/src/sinks/webhook/run.rs b/src/sinks/webhook/run.rs index b771191c..20e396f4 100644 --- a/src/sinks/webhook/run.rs +++ b/src/sinks/webhook/run.rs @@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration}; use reqwest::blocking::Client; use serde::Serialize; -use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; - -use super::ErrorPolicy; +use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils, Error}; #[derive(Serialize)] struct RequestBody { diff --git a/src/sinks/webhook/setup.rs b/src/sinks/webhook/setup.rs index 72865e88..fe47f2ad 100644 --- a/src/sinks/webhook/setup.rs +++ b/src/sinks/webhook/setup.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use crate::{ pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + sinks::ErrorPolicy, utils::WithUtils, Error, }; @@ -13,12 +14,6 @@ use super::run::request_loop; static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); -#[derive(Debug, Deserialize, Clone)] -pub enum ErrorPolicy { - Continue, - Exit, -} - #[derive(Default, Debug, Deserialize)] pub struct Config { pub url: String, From dff6fe4bf0d4c24e3f4028783429b41b478d322f Mon Sep 17 00:00:00 2001 From: rvcas Date: Wed, 20 Apr 2022 22:15:51 -0400 Subject: [PATCH 10/10] chore: no newline in .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index fe2fd25f..2504685c 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,4 @@ Cargo.lock # Local testdrive environments, mainly for development testdrive/custom -testdrive/assert +testdrive/assert \ No newline at end of file