Skip to content

Commit

Permalink
feat: Introduce "Webhook" sink (#51)
Browse files Browse the repository at this point in the history
Introduces a new sink that outputs events to a remote endpoint using HTTP calls.

Co-authored-by: Mark Stopka <mark.stopka@perlur.cloud>
  • Loading branch information
scarmuega and mark-stopka authored Jan 12, 2022
1 parent dfe7ce6 commit e47e021
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 7 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ serde_json = "1.0.74"
serde_derive = "1.0.130"
strum = "0.23"
strum_macros = "0.23"
minicbor = "0.12.1"

# feature: webhook
reqwest = { version = "0.11", optional = true, features = ["blocking", "json"] }

# feature: tuisink
tui = { version = "0.16", optional = true, default-features = false, features = ["crossterm"] }
Expand All @@ -46,11 +50,11 @@ murmur3 = { version = "0.5.1", optional = true }

# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }
minicbor = "0.12.1"

[features]
default = []
fingerprint = ["murmur3"]
webhook = ["reqwest"]
tuisink = ["tui"]
kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
tuisink = ["tui"]
fingerprint = ["murmur3"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ If the available out-of-the-box features don't satisfiy your particular use-case
- [ ] AWS SQS queue
- [ ] AWS Lambda call
- [ ] GCP PubSub
- [ ] webhook (http post)
- [x] webhook (http post)
- [x] terminal (append-only, tail-like)
- [ ] TUI
- Filters
Expand Down
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use oura::sources::n2c::Config as N2CConfig;
use oura::sources::n2n::Config as N2NConfig;
use serde_derive::Deserialize;

#[cfg(feature = "webhook")]
use oura::sinks::webhook::Config as WebhookConfig;

#[cfg(feature = "kafkasink")]
use oura::sinks::kafka::Config as KafkaConfig;

Expand Down Expand Up @@ -68,6 +71,9 @@ impl FilterConfig for Filter {
enum Sink {
Terminal(TerminalConfig),

#[cfg(feature = "webhook")]
Webhook(WebhookConfig),

#[cfg(feature = "kafkasink")]
Kafka(KafkaConfig),

Expand All @@ -80,6 +86,9 @@ impl SinkConfig for Sink {
match self {
Sink::Terminal(c) => c.bootstrap(input),

#[cfg(feature = "webhook")]
Sink::Webhook(c) => c.bootstrap(input),

#[cfg(feature = "kafkasink")]
Sink::Kafka(c) => c.bootstrap(input),

Expand Down
2 changes: 1 addition & 1 deletion src/filters/fingerpint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fn build_fingerprint(event: &Event, seed: u32) -> Result<String, Error> {
.with_prefix("coll")
.append_slice(tx_id)?
.append_to_string(index)?,
EventData::NativeScript => b
EventData::NativeScript {} => b
.with_slot(&event.context.slot)
.with_prefix("scpt")
.append_optional(&event.context.tx_hash)?,
Expand Down
2 changes: 1 addition & 1 deletion src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub enum EventData {
tx_id: String,
index: u64,
},
NativeScript,
NativeScript {},
PlutusScript {
data: String,
},
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub mod terminal;

#[cfg(feature = "webhook")]
pub mod webhook;

#[cfg(feature = "tuisink")]
pub mod tui;

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/terminal/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl LogLine {
source,
max_width,
},
EventData::NativeScript => LogLine {
EventData::NativeScript {} => LogLine {
prefix: "NATIVE",
color: Color::White,
content: "{{ ... }}".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/webhook/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod run;
mod setup;

pub use setup::*;
77 changes: 77 additions & 0 deletions src/sinks/webhook/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::time::Duration;

use reqwest::blocking::Client;
use serde::Serialize;

use crate::framework::{Error, Event, StageReceiver};

use super::ErrorPolicy;

#[derive(Serialize)]
struct RequestBody {
#[serde(flatten)]
event: Event,
variant: String,
timestamp: Option<u64>,
}

impl From<Event> for RequestBody {
fn from(event: Event) -> Self {
let timestamp = event.context.timestamp.map(|x| x * 1000);
let variant = event.data.to_string();

RequestBody {
event,
timestamp,
variant,
}
}
}

fn execute_fallible_request(
client: &Client,
url: &str,
body: &RequestBody,
policy: &ErrorPolicy,
retry_quota: usize,
backoff_delay: Duration,
) -> Result<(), Error> {
let request = client.post(url).json(body).build()?;

let result = client
.execute(request)
.and_then(|res| res.error_for_status());

match (result, policy, retry_quota) {
(Ok(_), _, _) => {
log::info!("successful http request to webhook");
Ok(())
}
(Err(x), ErrorPolicy::Exit, 0) => Err(x.into()),
(Err(x), ErrorPolicy::Continue, 0) => {
log::warn!("failed to send webhook request: {:?}", x);
Ok(())
}
(Err(x), _, quota) => {
log::warn!("failed attempt to execute webhook request: {:?}", x);
std::thread::sleep(backoff_delay);
execute_fallible_request(client, url, body, policy, quota - 1, backoff_delay)
}
}
}

pub(crate) fn request_loop(
input: StageReceiver,
client: &Client,
url: &str,
error_policy: &ErrorPolicy,
max_retries: usize,
backoff_delay: Duration,
) -> Result<(), Error> {
loop {
let event = input.recv().unwrap();
let body = RequestBody::from(event);

execute_fallible_request(client, url, &body, error_policy, max_retries, backoff_delay)?;
}
}
91 changes: 91 additions & 0 deletions src/sinks/webhook/setup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::{collections::HashMap, time::Duration};

use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use serde_derive::Deserialize;

use crate::framework::{BootstrapResult, Error, SinkConfig, StageReceiver};

use super::run::request_loop;

static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

#[derive(Default, Debug, Deserialize)]
pub struct Config {
url: String,
authorization: Option<String>,
headers: Option<HashMap<String, String>>,
timeout: Option<u64>,
error_policy: Option<ErrorPolicy>,
max_retries: Option<usize>,
backoff_delay: Option<u64>,
}

fn build_headers_map(config: &Config) -> Result<HeaderMap, Error> {
let mut headers = HeaderMap::new();

headers.insert(
header::CONTENT_TYPE,
HeaderValue::try_from("application/json")?,
);

if let Some(auth_value) = &config.authorization {
let auth_value = HeaderValue::try_from(auth_value)?;
headers.insert(header::AUTHORIZATION, auth_value);
}

if let Some(custom) = &config.headers {
for (name, value) in custom.iter() {
let name = HeaderName::try_from(name)?;
let value = HeaderValue::try_from(value)?;
headers.insert(name, value);
}
}

Ok(headers)
}

const DEFAULT_MAX_RETRIES: usize = 20;
const DEFAULT_BACKOFF_DELAY: u64 = 5_000;

impl SinkConfig for Config {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let client = reqwest::blocking::ClientBuilder::new()
.user_agent(APP_USER_AGENT)
.default_headers(build_headers_map(self)?)
.timeout(Duration::from_millis(self.timeout.unwrap_or(30000)))
.build()?;

let url = self.url.clone();

let error_policy = self
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let max_retries = self.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);

let backoff_delay =
Duration::from_millis(self.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY));

let handle = std::thread::spawn(move || {
request_loop(
input,
&client,
&url,
&error_policy,
max_retries,
backoff_delay,
)
.expect("request loop failed")
});

Ok(handle)
}
}

0 comments on commit e47e021

Please sign in to comment.