diff --git a/Cargo.lock b/Cargo.lock index 7c3fe784..27ae4d5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,6 +447,12 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +[[package]] +name = "futures-io" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" + [[package]] name = "futures-sink" version = "0.3.19" @@ -466,9 +472,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" dependencies = [ "futures-core", + "futures-io", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -928,6 +937,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.27.1" @@ -1014,6 +1033,7 @@ dependencies = [ "net2", "openssl", "pallas", + "reqwest", "serde", "serde_derive", "serde_json", @@ -1660,6 +1680,7 @@ dependencies = [ "libc", "memchr", "mio", + "num_cpus", "pin-project-lite", "winapi", ] diff --git a/Cargo.toml b/Cargo.toml index 7da6a002..33b7ae8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,10 @@ serde_json = "1.0.74" serde_derive = "1.0.130" strum = "0.23" strum_macros = "0.23" +minicbor = "0.12.1" + +# feature: webhook +reqwest = { version = "0.11", optional = true, features = ["blocking", "json"] } # feature: tuisink tui = { version = "0.16", optional = true, default-features = false, features = ["crossterm"] } @@ -46,11 +50,11 @@ murmur3 = { version = "0.5.1", optional = true } # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } -minicbor = "0.12.1" [features] default = [] -fingerprint = ["murmur3"] +webhook = ["reqwest"] +tuisink = ["tui"] kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] -tuisink = ["tui"] +fingerprint = ["murmur3"] diff --git a/README.md b/README.md index c89be87f..1a34aa18 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ If the available out-of-the-box features don't satisfiy your particular use-case - [ ] AWS SQS queue - [ ] AWS Lambda call - [ ] GCP PubSub - - [ ] webhook (http post) + - [x] webhook (http post) - [x] terminal (append-only, tail-like) - [ ] TUI - Filters diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index 9b67c0f6..80ef657d 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -11,6 +11,9 @@ use oura::sources::n2c::Config as N2CConfig; use oura::sources::n2n::Config as N2NConfig; use serde_derive::Deserialize; +#[cfg(feature = "webhook")] +use oura::sinks::webhook::Config as WebhookConfig; + #[cfg(feature = "kafkasink")] use oura::sinks::kafka::Config as KafkaConfig; @@ -68,6 +71,9 @@ impl FilterConfig for Filter { enum Sink { Terminal(TerminalConfig), + #[cfg(feature = "webhook")] + Webhook(WebhookConfig), + #[cfg(feature = "kafkasink")] Kafka(KafkaConfig), @@ -80,6 +86,9 @@ impl SinkConfig for Sink { match self { Sink::Terminal(c) => c.bootstrap(input), + #[cfg(feature = "webhook")] + Sink::Webhook(c) => c.bootstrap(input), + #[cfg(feature = "kafkasink")] Sink::Kafka(c) => c.bootstrap(input), diff --git a/src/filters/fingerpint.rs b/src/filters/fingerpint.rs index 48973034..abac4f6d 100644 --- a/src/filters/fingerpint.rs +++ b/src/filters/fingerpint.rs @@ -132,7 +132,7 @@ fn build_fingerprint(event: &Event, seed: u32) -> Result { .with_prefix("coll") .append_slice(tx_id)? .append_to_string(index)?, - EventData::NativeScript => b + EventData::NativeScript {} => b .with_slot(&event.context.slot) .with_prefix("scpt") .append_optional(&event.context.tx_hash)?, diff --git a/src/framework.rs b/src/framework.rs index 5c26ebe6..e434b3fd 100644 --- a/src/framework.rs +++ b/src/framework.rs @@ -208,7 +208,7 @@ pub enum EventData { tx_id: String, index: u64, }, - NativeScript, + NativeScript {}, PlutusScript { data: String, }, diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b0193479..3fcc0f8f 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -1,5 +1,8 @@ pub mod terminal; +#[cfg(feature = "webhook")] +pub mod webhook; + #[cfg(feature = "tuisink")] pub mod tui; diff --git a/src/sinks/terminal/format.rs b/src/sinks/terminal/format.rs index 4ea64c9b..57c656bf 100644 --- a/src/sinks/terminal/format.rs +++ b/src/sinks/terminal/format.rs @@ -106,7 +106,7 @@ impl LogLine { source, max_width, }, - EventData::NativeScript => LogLine { + EventData::NativeScript {} => LogLine { prefix: "NATIVE", color: Color::White, content: "{{ ... }}".to_string(), diff --git a/src/sinks/webhook/mod.rs b/src/sinks/webhook/mod.rs new file mode 100644 index 00000000..0a447c1d --- /dev/null +++ b/src/sinks/webhook/mod.rs @@ -0,0 +1,4 @@ +mod run; +mod setup; + +pub use setup::*; diff --git a/src/sinks/webhook/run.rs b/src/sinks/webhook/run.rs new file mode 100644 index 00000000..c6e276fe --- /dev/null +++ b/src/sinks/webhook/run.rs @@ -0,0 +1,77 @@ +use std::time::Duration; + +use reqwest::blocking::Client; +use serde::Serialize; + +use crate::framework::{Error, Event, StageReceiver}; + +use super::ErrorPolicy; + +#[derive(Serialize)] +struct RequestBody { + #[serde(flatten)] + event: Event, + variant: String, + timestamp: Option, +} + +impl From for RequestBody { + fn from(event: Event) -> Self { + let timestamp = event.context.timestamp.map(|x| x * 1000); + let variant = event.data.to_string(); + + RequestBody { + event, + timestamp, + variant, + } + } +} + +fn execute_fallible_request( + client: &Client, + url: &str, + body: &RequestBody, + policy: &ErrorPolicy, + retry_quota: usize, + backoff_delay: Duration, +) -> Result<(), Error> { + let request = client.post(url).json(body).build()?; + + let result = client + .execute(request) + .and_then(|res| res.error_for_status()); + + match (result, policy, retry_quota) { + (Ok(_), _, _) => { + log::info!("successful http request to webhook"); + Ok(()) + } + (Err(x), ErrorPolicy::Exit, 0) => Err(x.into()), + (Err(x), ErrorPolicy::Continue, 0) => { + log::warn!("failed to send webhook request: {:?}", x); + Ok(()) + } + (Err(x), _, quota) => { + log::warn!("failed attempt to execute webhook request: {:?}", x); + std::thread::sleep(backoff_delay); + execute_fallible_request(client, url, body, policy, quota - 1, backoff_delay) + } + } +} + +pub(crate) fn request_loop( + input: StageReceiver, + client: &Client, + url: &str, + error_policy: &ErrorPolicy, + max_retries: usize, + backoff_delay: Duration, +) -> Result<(), Error> { + loop { + let event = input.recv().unwrap(); + let body = RequestBody::from(event); + + execute_fallible_request(client, url, &body, error_policy, max_retries, backoff_delay)?; + } +} diff --git a/src/sinks/webhook/setup.rs b/src/sinks/webhook/setup.rs new file mode 100644 index 00000000..e97d6a98 --- /dev/null +++ b/src/sinks/webhook/setup.rs @@ -0,0 +1,91 @@ +use std::{collections::HashMap, time::Duration}; + +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use serde_derive::Deserialize; + +use crate::framework::{BootstrapResult, Error, SinkConfig, StageReceiver}; + +use super::run::request_loop; + +static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + +#[derive(Debug, Deserialize, Clone)] +pub enum ErrorPolicy { + Continue, + Exit, +} + +#[derive(Default, Debug, Deserialize)] +pub struct Config { + url: String, + authorization: Option, + headers: Option>, + timeout: Option, + error_policy: Option, + max_retries: Option, + backoff_delay: Option, +} + +fn build_headers_map(config: &Config) -> Result { + let mut headers = HeaderMap::new(); + + headers.insert( + header::CONTENT_TYPE, + HeaderValue::try_from("application/json")?, + ); + + if let Some(auth_value) = &config.authorization { + let auth_value = HeaderValue::try_from(auth_value)?; + headers.insert(header::AUTHORIZATION, auth_value); + } + + if let Some(custom) = &config.headers { + for (name, value) in custom.iter() { + let name = HeaderName::try_from(name)?; + let value = HeaderValue::try_from(value)?; + headers.insert(name, value); + } + } + + Ok(headers) +} + +const DEFAULT_MAX_RETRIES: usize = 20; +const DEFAULT_BACKOFF_DELAY: u64 = 5_000; + +impl SinkConfig for Config { + fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { + let client = reqwest::blocking::ClientBuilder::new() + .user_agent(APP_USER_AGENT) + .default_headers(build_headers_map(self)?) + .timeout(Duration::from_millis(self.timeout.unwrap_or(30000))) + .build()?; + + let url = self.url.clone(); + + let error_policy = self + .error_policy + .as_ref() + .cloned() + .unwrap_or(ErrorPolicy::Exit); + + let max_retries = self.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + + let backoff_delay = + Duration::from_millis(self.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY)); + + let handle = std::thread::spawn(move || { + request_loop( + input, + &client, + &url, + &error_policy, + max_retries, + backoff_delay, + ) + .expect("request loop failed") + }); + + Ok(handle) + } +}