chore: retry refactor #302

Merged 3 commits on Jun 16, 2022
12 changes: 0 additions & 12 deletions Cargo.lock


3 changes: 1 addition & 2 deletions Cargo.toml
@@ -61,7 +61,6 @@ redis = { version ="0.21.5", optional = true, features = ["tokio-comp"]}

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
-async-recursion = { version = "1.0.0", optional = true }

[features]
default = []
@@ -72,4 +71,4 @@ elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
-gcp = ["cloud-pubsub", "tokio", "async-recursion"]
+gcp = ["cloud-pubsub", "tokio", "reqwest"]
26 changes: 26 additions & 0 deletions book/src/advanced/retry_policy.md
@@ -0,0 +1,26 @@
# Retry Policy

Advanced options for instructing Oura on how to handle failed delivery attempts in certain sinks.

## Supported Sinks

- [GCP CloudFunction](../sinks/gcp_cloudfunction.md)
- [GCP PubSub](../sinks/gcp_pubsub.md)
- [Webhook](../sinks/webhook.md)

## Configuration

To override the default behaviour used by the sink, add a section named `[sink.retry_policy]` to the `daemon.toml` file.

```toml
[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

- `max_retries`: the maximum number of retries before failing the whole pipeline. Default value is `20`.
- `backoff_unit`: the base delay, expressed in milliseconds, between retries. Default value is `5000`.
- `backoff_factor`: the multiplier applied to the backoff delay after each failed attempt. Default value is `2`.
- `max_backoff`: the longest possible delay, in milliseconds, between retries. Default value is `100000`.
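Read together, these values suggest an exponential backoff where each delay is `backoff_unit * backoff_factor^attempt`, capped at `max_backoff`. The sketch below is an interpretation of the documented knobs, not the authoritative formula, which lives in Oura's `utils::retry` module and may differ in detail:

```rust
use std::time::Duration;

// Hypothetical helper illustrating the documented values above; the real
// implementation lives in Oura's `utils::retry` module and may differ.
fn backoff_delay(backoff_unit_ms: u64, backoff_factor: u32, max_backoff_ms: u64, attempt: u32) -> Duration {
    // exponential backoff: unit * factor^attempt, capped at max_backoff
    let exp = (backoff_factor as u64).saturating_pow(attempt);
    Duration::from_millis(backoff_unit_ms.saturating_mul(exp).min(max_backoff_ms))
}

fn main() {
    // with the example config above: 5000 * 2^3 = 40000 ms before the 4th retry
    assert_eq!(backoff_delay(5000, 2, 100_000, 3), Duration::from_millis(40_000));
    // 5000 * 2^5 = 160000 ms, clamped to max_backoff = 100000 ms
    assert_eq!(backoff_delay(5000, 2, 100_000, 5), Duration::from_millis(100_000));
}
```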
3 changes: 2 additions & 1 deletion book/src/sinks/README.md
@@ -14,6 +14,7 @@ These are the existing sinks that are included as part the main _Oura_ codebase:
- [AWS SQS](aws_sqs.md): a sink that sends each event as a message to an AWS SQS queue.
- [AWS Lambda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event.
- [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object.
-- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a goolge cloud PubSub topic.
+- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a google cloud PubSub topic.
+- [GCP CloudFunction](gcp_cloudfunction.md): a sink that sends each event as JSON to a Cloud Function via HTTP.

New sinks are being developed; information will be added to this documentation to reflect the updated list. Contributions and feature requests are welcome in our [Github Repo](https://github.com/txpipe/oura).
38 changes: 38 additions & 0 deletions book/src/sinks/gcp_cloudfunction.md
@@ -0,0 +1,38 @@
# Google Cloud Functions

A sink that sends each event to a Google Cloud Function. Each event is JSON-encoded and sent as the body of an HTTP POST request.

## Configuration

```toml
[sink]
type = "GcpCloudFunction"
name = "oura"
project_id = "XXX"
region = "us-west-2"
timeout = 30000
error_policy = "Continue"
authorization = "user:pass"

[sink.headers]
extra_header_1 = "abc"
extra_header_2 = "123"

[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

### Section: `sink`

- `type`: the literal value `GcpCloudFunction`.
- `name`: the name of the cloud function.
- `project_id`: the Google Cloud project id that the function belongs to.
- `region`: the region where the function was created.
- `timeout` (optional): the timeout value for the HTTP response in milliseconds. Default value is `30000`.
- `authorization` (optional): value to add as the 'Authorization' HTTP header.
- `headers` (optional): key-value map of extra headers to pass in each HTTP call.
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)
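The sink derives the target URL from `region`, `project_id` and `name`; this mirrors the `format!` call added in `src/sinks/gcp_cloudfunction/setup.rs`:

```rust
// URL construction, as done in src/sinks/gcp_cloudfunction/setup.rs
fn cloud_function_url(region: &str, project_id: &str, name: &str) -> String {
    format!("https://{}-{}.cloudfunctions.net/{}", region, project_id, name)
}

fn main() {
    // with the example config above
    let url = cloud_function_url("us-west-2", "XXX", "oura");
    assert_eq!(url, "https://us-west-2-XXX.cloudfunctions.net/oura");
}
```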
8 changes: 8 additions & 0 deletions book/src/sinks/gcp_pubsub.md
@@ -9,10 +9,18 @@ A sink that sends each event as a message to a PubSub topic. Each event is json-
type = "GcpPubSub"
credentials = "oura-test-347101-ff3f7b2d69cc.json"
topic = "test"

[sink.retry_policy]
max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000
```

### Section: `sink`

- `type`: the literal value `GcpPubSub`.
- `credentials`: the path to the service account json file downloaded from the cloud console.
- `topic`: the short name of the topic to send messages to.
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)
13 changes: 8 additions & 5 deletions book/src/sinks/webhook.md
@@ -12,9 +12,13 @@ type = "Webhook"
url = "https://endpoint:5000/events"
authorization = "user:pass"
timeout = 30000
-error_policy = "Retry"
+error_policy = "Continue"

+[sink.retry_policy]
+max_retries = 30
-backoff_delay = 5000
+backoff_unit = 5000
+backoff_factor = 2
+max_backoff = 100000

[sink.headers]
extra_header_1 = "abc"
@@ -28,6 +32,5 @@ extra_header_2 = "123"
- `authorization` (optional): value to add as the 'Authorization' HTTP header
- `headers` (optional): key-value map of extra headers to pass in each HTTP call
- `timeout` (optional): the timeout value for the HTTP response in milliseconds. Default value is `30000`.
-- `error_policy` (optional): either `Continue` or `Retry`. Default value is `Retry`.
-- `max_retries` (optional): the max number of retries before failing the whole pipeline. Default value is `30`
-- `backoff_delay` (optional): the delay expressed in milliseconds between each retry. Default value is `5000`.
+- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
+- [retry_policy](../advanced/retry_policy.md)
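The two `error_policy` values behave as in the `match` inside `request_loop` (`src/sinks/common.rs`): `Exit` fails the whole pipeline on a delivery error that survived all retries, while `Continue` logs the error and moves on. A condensed sketch (the helper name and return shape are illustrative, not Oura's API):

```rust
#[derive(Debug)]
enum ErrorPolicy { Continue, Exit }

// Returns Ok(true) if the event was delivered, Ok(false) if it was dropped,
// and Err(_) if the whole pipeline should stop.
fn handle_delivery(result: Result<(), String>, policy: &ErrorPolicy) -> Result<bool, String> {
    match result {
        Ok(()) => Ok(true),
        Err(err) => match policy {
            ErrorPolicy::Exit => Err(err),
            ErrorPolicy::Continue => {
                eprintln!("failed to send webhook request: {:?}", err);
                Ok(false)
            }
        },
    }
}

fn main() {
    assert_eq!(handle_delivery(Err("timeout".into()), &ErrorPolicy::Continue), Ok(false));
    assert!(handle_delivery(Err("timeout".into()), &ErrorPolicy::Exit).is_err());
}
```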
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
@@ -50,6 +50,9 @@ use oura::sinks::redis::Config as RedisConfig;
#[cfg(feature = "gcp")]
use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig;

#[cfg(feature = "gcp")]
use oura::sinks::gcp_cloudfunction::Config as GcpCloudFunctionConfig;

#[cfg(feature = "fingerprint")]
use oura::filters::fingerprint::Config as FingerprintConfig;

@@ -129,6 +132,9 @@ enum Sink {

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),

#[cfg(feature = "gcp")]
GcpCloudFunction(GcpCloudFunctionConfig),
}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
@@ -163,6 +169,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "gcp")]
Sink::GcpCloudFunction(c) => WithUtils::new(c, utils).bootstrap(input),
}
}

108 changes: 107 additions & 1 deletion src/sinks/common.rs
@@ -1,7 +1,113 @@
-use serde::Deserialize;
use std::{collections::HashMap, sync::Arc};

use reqwest::{
blocking::Client,
header::{self, HeaderMap, HeaderName, HeaderValue},
};
+use serde::{Deserialize, Serialize};

use crate::{
model::Event,
pipelining::StageReceiver,
utils::{retry, Utils},
Error,
};

pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

pub fn build_headers_map(
authorization: Option<&String>,
extra: Option<&HashMap<String, String>>,
) -> Result<HeaderMap, Error> {
let mut headers = HeaderMap::new();

headers.insert(
header::CONTENT_TYPE,
HeaderValue::try_from("application/json")?,
);

if let Some(auth_value) = &authorization {
let auth_value = HeaderValue::try_from(*auth_value)?;
headers.insert(header::AUTHORIZATION, auth_value);
}

if let Some(custom) = &extra {
for (name, value) in custom.iter() {
let name = HeaderName::try_from(name)?;
let value = HeaderValue::try_from(value)?;
headers.insert(name, value);
}
}

Ok(headers)
}

#[derive(Serialize)]
struct RequestBody {
#[serde(flatten)]
event: Event,
variant: String,
timestamp: Option<u64>,
}

impl From<Event> for RequestBody {
fn from(event: Event) -> Self {
let timestamp = event.context.timestamp.map(|x| x * 1000);
let variant = event.data.to_string();

RequestBody {
event,
timestamp,
variant,
}
}
}

fn execute_fallible_request(client: &Client, url: &str, body: &RequestBody) -> Result<(), Error> {
let request = client.post(url).json(body).build()?;

client
.execute(request)
.and_then(|res| res.error_for_status())?;

Ok(())
}

pub(crate) fn request_loop(
input: StageReceiver,
client: &Client,
url: &str,
error_policy: &ErrorPolicy,
retry_policy: &retry::Policy,
utils: Arc<Utils>,
) -> Result<(), Error> {
for event in input.iter() {
// notify progress to the pipeline
utils.track_sink_progress(&event);

let body = RequestBody::from(event);

let result = retry::retry_operation(
|| execute_fallible_request(client, url, &body),
retry_policy,
);

match result {
Ok(()) => (),
Err(err) => match error_policy {
ErrorPolicy::Exit => return Err(err),
ErrorPolicy::Continue => {
log::warn!("failed to send webhook request: {:?}", err);
}
},
}
}

Ok(())
}
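For reference, `retry::retry_operation` presumably wraps the fallible request in a bounded, exponentially backed-off loop. A hedged sketch follows: the `Policy` field names mirror the documented config keys, but the real code in Oura's `utils::retry` module may differ in detail:

```rust
use std::{thread, time::Duration};

// Hypothetical Policy mirroring the documented retry_policy keys.
struct Policy {
    max_retries: u32,
    backoff_unit: Duration,
    backoff_factor: u32,
    max_backoff: Duration,
}

// Retry `op` up to `max_retries` times, sleeping a capped exponential
// backoff between attempts; return the last error if all attempts fail.
fn retry_operation<E>(mut op: impl FnMut() -> Result<(), E>, policy: &Policy) -> Result<(), E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(()) => return Ok(()),
            Err(err) if attempt >= policy.max_retries => return Err(err),
            Err(_) => {
                let backoff = policy
                    .backoff_unit
                    .saturating_mul(policy.backoff_factor.saturating_pow(attempt))
                    .min(policy.max_backoff);
                thread::sleep(backoff);
                attempt += 1;
            }
        }
    }
}

fn main() {
    let policy = Policy {
        max_retries: 3,
        backoff_unit: Duration::from_millis(1),
        backoff_factor: 2,
        max_backoff: Duration::from_millis(4),
    };
    // succeeds on the third attempt, well within max_retries
    let mut calls = 0;
    let result = retry_operation(
        || { calls += 1; if calls < 3 { Err("boom") } else { Ok(()) } },
        &policy,
    );
    assert!(result.is_ok());
    assert_eq!(calls, 3);
}
```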
3 changes: 3 additions & 0 deletions src/sinks/gcp_cloudfunction/mod.rs
@@ -0,0 +1,3 @@
mod setup;

pub use setup::*;
53 changes: 53 additions & 0 deletions src/sinks/gcp_cloudfunction/setup.rs
@@ -0,0 +1,53 @@
use std::time::Duration;

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::{build_headers_map, request_loop, ErrorPolicy, APP_USER_AGENT},
utils::{retry, WithUtils},
};

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub name: String,
pub project_id: String,
pub region: String,
pub timeout: Option<u64>,
pub authorization: Option<String>,
pub error_policy: Option<ErrorPolicy>,
pub retry_policy: Option<retry::Policy>,
}

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let client = reqwest::blocking::ClientBuilder::new()
.user_agent(APP_USER_AGENT)
.default_headers(build_headers_map(self.inner.authorization.as_ref(), None)?)
.timeout(Duration::from_millis(self.inner.timeout.unwrap_or(30000)))
.build()?;

let url = format!(
"https://{}-{}.cloudfunctions.net/{}",
self.inner.region, self.inner.project_id, self.inner.name,
);

let error_policy = self
.inner
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let retry_policy = self.inner.retry_policy.unwrap_or_default();

let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
request_loop(input, &client, &url, &error_policy, &retry_policy, utils)
.expect("request loop failed")
});

Ok(handle)
}
}