feat(sink/webhook): use utils retry_operation
feat(sink/gcp_pubsub): use utils retry_operation

fix: remove async_recursion

feat: make retry_policy part of the configs

fix: max_backoff to 20x

feat: deserialize duration from milliseconds as a u64 in toml

feat: start gcp cloud functions

feat: add docs for cloud function and the retry policy

chore: update cargo toml feature flags

chore: more docs
rvcas committed May 31, 2022
1 parent 6a0f5f2 commit 7bd2274
Showing 18 changed files with 326 additions and 206 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock


3 changes: 1 addition & 2 deletions Cargo.toml
@@ -58,7 +58,6 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] }

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
async-recursion = { version = "1.0.0", optional = true }

[features]
default = []
@@ -68,4 +67,4 @@ kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
gcp = ["cloud-pubsub", "tokio", "async-recursion"]
gcp = ["cloud-pubsub", "tokio", "reqwest"]
26 changes: 26 additions & 0 deletions book/src/advanced/retry_policy.md
@@ -0,0 +1,26 @@
# Retry Policy

Advanced options for instructing Oura how to deal with failed attempts in certain sinks.

## Supported Sinks

- [GCP CloudFunction](../sinks/gcp_cloudfunction.md)
- [GCP PubSub](../sinks/gcp_pubsub.md)
- [Webhook](../sinks/webhook.md)

## Configuration

To modify the default behaviour used by the sink, a section named `[sink.retry_policy]` needs to be added in the `daemon.toml` file.

```toml
[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

- `max_retries`: the max number of retries before failing the whole pipeline. Default value is `20`
- `backoff_unit`: the delay expressed in milliseconds between each retry. Default value is `5000`.
- `backoff_factor`: the amount to increase the backoff delay after each attempt. Default value is `2`.
- `max_backoff`: the longest possible delay in milliseconds. Default value is `100000`
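
Taken together, these values describe an exponential backoff: each failed attempt waits roughly `backoff_unit * backoff_factor^attempt`, capped at `max_backoff`. The helper below is a hypothetical sketch of that arithmetic only, not Oura's actual implementation (which lives in its utils module):

```rust
use std::time::Duration;

/// Hypothetical sketch: delay before retry number `attempt` (starting at 0),
/// assuming exponential backoff capped at `max_backoff`.
fn backoff_delay(
    backoff_unit: Duration,
    backoff_factor: u32,
    max_backoff: Duration,
    attempt: u32,
) -> Duration {
    let factor = backoff_factor.checked_pow(attempt).unwrap_or(u32::MAX);
    let delay = backoff_unit.checked_mul(factor).unwrap_or(max_backoff);
    delay.min(max_backoff)
}

fn main() {
    // With the example values above (5000 ms unit, factor 2, 100000 ms cap):
    // 5s, 10s, 20s, 40s, 80s, then every further retry waits the full 100s cap.
    for attempt in 0..6 {
        let delay = backoff_delay(
            Duration::from_millis(5000),
            2,
            Duration::from_millis(100_000),
            attempt,
        );
        println!("attempt {attempt}: wait {delay:?}");
    }
}
```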
5 changes: 3 additions & 2 deletions book/src/sinks/README.md
@@ -14,6 +14,7 @@ These are the existing sinks that are included as part the main _Oura_ codebase:
- [AWS SQS](aws_sqs.md): a sink that sends each event as message to an AWS SQS queue.
- [AWS Lamda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event.
- [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object.
- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a goolge cloud PubSub topic.
- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a google cloud PubSub topic.
- [GCP CloudFunction](gcp_cloudfunction.md): a sink that sends each event as JSON to a Cloud Function via HTTP.

New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura).
38 changes: 38 additions & 0 deletions book/src/sinks/gcp_cloudfunction.md
@@ -0,0 +1,38 @@
# Google Cloud Functions

A sink that sends each event to a cloud function. Each event is json-encoded and sent as a POST request.

## Configuration

```toml
[sink]
type = "GcpCloudFunction"
name = "oura"
project_id = "XXX"
region = "us-west-2"
timeout = 30000
error_policy = "Continue"
authorization = "user:pass"

[sink.headers]
extra_header_1 = "abc"
extra_header_2 = "123"

[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

### Section: `sink`

- `type`: the literal value `GcpCloudFunction`
- `name`: the name of the cloud function
- `project_id`: the google cloud project id that the function exists in
- `region`: the region that the function was created in
- `timeout` (optional): the timeout value for the HTTP response in milliseconds. Default value is `30000`.
- `authorization` (optional): value to add as the 'Authorization' HTTP header
- `headers` (optional): key-value map of extra headers to pass in each HTTP call
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)
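
The `region`, `project_id`, and `name` values are combined into the HTTPS endpoint of the function. The snippet below mirrors the URL construction in `src/sinks/gcp_cloudfunction/setup.rs` further down in this commit; the helper function and its example arguments are illustrative only:

```rust
/// Builds the Cloud Functions endpoint the sink posts to, mirroring the
/// format! call in src/sinks/gcp_cloudfunction/setup.rs.
fn cloudfunction_url(region: &str, project_id: &str, name: &str) -> String {
    format!(
        "https://{}-{}.cloudfunctions.net/{}",
        region, project_id, name,
    )
}

fn main() {
    // Using the example configuration above:
    println!("{}", cloudfunction_url("us-west-2", "XXX", "oura"));
    // prints: https://us-west-2-XXX.cloudfunctions.net/oura
}
```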
8 changes: 8 additions & 0 deletions book/src/sinks/gcp_pubsub.md
@@ -9,10 +9,18 @@ A sink that sends each event as a message to a PubSub topic. Each event is json-
type = "GcpPubSub"
credentials = "oura-test-347101-ff3f7b2d69cc.json"
topic = "test"

[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

### Section: `sink`

- `type`: the literal value `GcpPubSub`.
- `credentials`: the path to the service account json file downloaded from the cloud console.
- `topic`: the short name of the topic to send message to.
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)
13 changes: 8 additions & 5 deletions book/src/sinks/webhook.md
@@ -12,9 +12,13 @@ type = "Webhook"
url = "https://endpoint:5000/events"
authorization = "user:pass"
timeout = 30000
error_policy = "Retry"
error_policy = "Continue"

[sink.retry_policy]
max_retries = 30
backoff_delay = 5000
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000

[sink.headers]
extra_header_1 = "abc"
@@ -28,6 +32,5 @@ extra_header_2 = "123"
- `authorization` (optional): value to add as the 'Authorization' HTTP header
- `headers` (optional): key-value map of extra headers to pass in each HTTP call
- `timeout` (optional): the timeout value for the HTTP response in milliseconds. Default value is `30000`.
- `error_policy` (optional): either `Continue` or `Retry`. Default value is `Retry`.
- `max_retries` (optional): the max number of retries before failing the whole pipeline. Default value is `30`
- `backoff_delay` (optional): the delay expressed in milliseconds between each retry. Default value is `5000`.
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
@@ -47,6 +47,9 @@ use oura::sinks::aws_s3::Config as AwsS3Config;
#[cfg(feature = "gcp")]
use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig;

#[cfg(feature = "gcp")]
use oura::sinks::gcp_cloudfunction::Config as GcpCloudFunctionConfig;

#[cfg(feature = "fingerprint")]
use oura::filters::fingerprint::Config as FingerprintConfig;

@@ -123,6 +126,9 @@ enum Sink {

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),

#[cfg(feature = "gcp")]
GcpCloudFunction(GcpCloudFunctionConfig),
}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
@@ -154,6 +160,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "gcp")]
Sink::GcpCloudFunction(c) => WithUtils::new(c, utils).bootstrap(input),
}
}

108 changes: 107 additions & 1 deletion src/sinks/common.rs
@@ -1,7 +1,113 @@
use serde::Deserialize;
use std::{collections::HashMap, sync::Arc};

use reqwest::{
blocking::Client,
header::{self, HeaderMap, HeaderName, HeaderValue},
};
use serde::{Deserialize, Serialize};

use crate::{
model::Event,
pipelining::StageReceiver,
utils::{retry, Utils},
Error,
};

pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

pub fn build_headers_map(
authorization: Option<&String>,
extra: Option<&HashMap<String, String>>,
) -> Result<HeaderMap, Error> {
let mut headers = HeaderMap::new();

headers.insert(
header::CONTENT_TYPE,
HeaderValue::try_from("application/json")?,
);

if let Some(auth_value) = &authorization {
let auth_value = HeaderValue::try_from(*auth_value)?;
headers.insert(header::AUTHORIZATION, auth_value);
}

if let Some(custom) = &extra {
for (name, value) in custom.iter() {
let name = HeaderName::try_from(name)?;
let value = HeaderValue::try_from(value)?;
headers.insert(name, value);
}
}

Ok(headers)
}

#[derive(Serialize)]
struct RequestBody {
#[serde(flatten)]
event: Event,
variant: String,
timestamp: Option<u64>,
}

impl From<Event> for RequestBody {
fn from(event: Event) -> Self {
let timestamp = event.context.timestamp.map(|x| x * 1000);
let variant = event.data.to_string();

RequestBody {
event,
timestamp,
variant,
}
}
}

fn execute_fallible_request(client: &Client, url: &str, body: &RequestBody) -> Result<(), Error> {
let request = client.post(url).json(body).build()?;

client
.execute(request)
.and_then(|res| res.error_for_status())?;

Ok(())
}

pub(crate) fn request_loop(
input: StageReceiver,
client: &Client,
url: &str,
error_policy: &ErrorPolicy,
retry_policy: &retry::Policy,
utils: Arc<Utils>,
) -> Result<(), Error> {
for event in input.iter() {
// notify progress to the pipeline
utils.track_sink_progress(&event);

let body = RequestBody::from(event);

let result = retry::retry_operation(
|| execute_fallible_request(client, url, &body),
retry_policy,
);

match result {
Ok(()) => (),
Err(err) => match error_policy {
ErrorPolicy::Exit => return Err(err),
ErrorPolicy::Continue => {
log::warn!("failed to send webhook request: {:?}", err);
}
},
}
}

Ok(())
}
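
`request_loop` delegates all retry handling to `retry::retry_operation` from the utils module, which is not part of this diff. The sketch below is a hypothetical stand-in showing the general shape of such a helper, assuming a `Policy` carrying the four documented fields and a geometrically growing delay capped at `max_backoff`; the real implementation may differ:

```rust
use std::time::Duration;

// Hypothetical error alias standing in for crate::Error.
type Error = Box<dyn std::error::Error>;

/// Hypothetical stand-in for utils::retry::Policy (the real type is outside this diff).
pub struct Policy {
    pub max_retries: u32,
    pub backoff_unit: Duration,
    pub backoff_factor: u32,
    pub max_backoff: Duration,
}

/// Hypothetical stand-in for utils::retry::retry_operation: runs `op`, retrying
/// failed attempts with capped exponential backoff until it succeeds or
/// `max_retries` is exhausted.
pub fn retry_operation<T>(
    mut op: impl FnMut() -> Result<T, Error>,
    policy: &Policy,
) -> Result<T, Error> {
    let mut attempt = 0;

    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if attempt < policy.max_retries => {
                let factor = policy.backoff_factor.checked_pow(attempt).unwrap_or(u32::MAX);
                let delay = policy
                    .backoff_unit
                    .checked_mul(factor)
                    .unwrap_or(policy.max_backoff)
                    .min(policy.max_backoff);

                eprintln!("attempt {} failed ({:?}), retrying in {:?}", attempt, err, delay);
                std::thread::sleep(delay);
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    let policy = Policy {
        max_retries: 3,
        backoff_unit: Duration::from_millis(100),
        backoff_factor: 2,
        max_backoff: Duration::from_millis(1_000),
    };

    let mut calls = 0;
    let result = retry_operation(
        || {
            calls += 1;
            if calls < 3 {
                Err("transient failure".into())
            } else {
                Ok(())
            }
        },
        &policy,
    );

    println!("finished after {} calls: {:?}", calls, result);
}
```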
3 changes: 3 additions & 0 deletions src/sinks/gcp_cloudfunction/mod.rs
@@ -0,0 +1,3 @@
mod setup;

pub use setup::*;
53 changes: 53 additions & 0 deletions src/sinks/gcp_cloudfunction/setup.rs
@@ -0,0 +1,53 @@
use std::time::Duration;

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::{build_headers_map, request_loop, ErrorPolicy, APP_USER_AGENT},
utils::{retry, WithUtils},
};

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub name: String,
pub project_id: String,
pub region: String,
pub timeout: Option<u64>,
pub authorization: Option<String>,
pub error_policy: Option<ErrorPolicy>,
pub retry_policy: Option<retry::Policy>,
}

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let client = reqwest::blocking::ClientBuilder::new()
.user_agent(APP_USER_AGENT)
.default_headers(build_headers_map(self.inner.authorization.as_ref(), None)?)
.timeout(Duration::from_millis(self.inner.timeout.unwrap_or(30000)))
.build()?;

let url = format!(
"https://{}-{}.cloudfunctions.net/{}",
self.inner.region, self.inner.project_id, self.inner.name,
);

let error_policy = self
.inner
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let retry_policy = self.inner.retry_policy.unwrap_or_default();

let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
request_loop(input, &client, &url, &error_policy, &retry_policy, utils)
.expect("request loop failed")
});

Ok(handle)
}
}
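
The commit message mentions deserializing durations from millisecond integers in TOML. The `retry::Policy` type itself lives in the utils module outside this diff, but a hypothetical serde helper implementing that idea — so that `backoff_unit = 5000` in `daemon.toml` becomes a five-second `Duration` — could look like this:

```rust
use std::time::Duration;

use serde::{Deserialize, Deserializer};

/// Hypothetical helper: reads a plain u64 from the config and interprets it
/// as a number of milliseconds, matching the documented retry_policy keys.
fn duration_from_millis<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
    D: Deserializer<'de>,
{
    let millis = u64::deserialize(deserializer)?;
    Ok(Duration::from_millis(millis))
}

/// Hypothetical mirror of the documented [sink.retry_policy] table.
#[derive(Debug, Deserialize)]
struct RetryPolicy {
    max_retries: u32,
    #[serde(deserialize_with = "duration_from_millis")]
    backoff_unit: Duration,
    backoff_factor: u32,
    #[serde(deserialize_with = "duration_from_millis")]
    max_backoff: Duration,
}

fn main() {
    let toml_src = r#"
        max_retries = 30
        backoff_unit = 5000
        backoff_factor = 2
        max_backoff = 100000
    "#;

    let policy: RetryPolicy = toml::from_str(toml_src).expect("valid retry policy");
    println!("{:?}", policy);
}
```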