Skip to content

Commit

Permalink
feat(pubsub): add retry logic with backoff delay
Browse files Browse the repository at this point in the history
  • Loading branch information
rvcas committed Apr 21, 2022
1 parent f5b8e7f commit c6af164
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 22 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] }

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
async-recursion = { version = "1.0.0", optional = true }

[features]
default = []
Expand All @@ -73,4 +74,4 @@ kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
gcp = ["cloud-pubsub", "tokio"]
gcp = ["cloud-pubsub", "tokio", "async-recursion"]
7 changes: 7 additions & 0 deletions src/sinks/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}
51 changes: 40 additions & 11 deletions src/sinks/gcp_pubsub/run.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,48 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use cloud_pubsub::{error::Error, topic::PublishMessageResponse, Client, Topic};
use async_recursion::async_recursion;
use cloud_pubsub::{error::Error, Client, Topic};
use serde_json::json;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils};
use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils};

async fn send_pubsub_msg(client: &Topic, event: &Event) -> Result<PublishMessageResponse, Error> {
#[async_recursion]
async fn send_pubsub_msg(
client: &Topic,
event: &Event,
policy: &ErrorPolicy,
retry_quota: usize,
backoff_delay: Duration,
) -> Result<(), Error> {
let body = json!(event).to_string();

client.publish(body).await
let result = client.publish(body).await;

match (result, policy, retry_quota) {
(Ok(_), _, _) => {
log::info!("successful pubsub publish");
Ok(())
}
(Err(x), ErrorPolicy::Exit, 0) => Err(x),
(Err(x), ErrorPolicy::Continue, 0) => {
log::warn!("failed to publish to pubsub: {:?}", x);
Ok(())
}
(Err(x), _, quota) => {
log::warn!("failed attempt to execute pubsub publish: {:?}", x);
std::thread::sleep(backoff_delay);
send_pubsub_msg(client, event, policy, quota - 1, backoff_delay).await
}
}
}

pub fn writer_loop(
input: StageReceiver,
credentials: String,
topic_name: String,
error_policy: &ErrorPolicy,
max_retries: usize,
backoff_delay: Duration,
utils: Arc<Utils>,
) -> Result<(), crate::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -29,12 +57,13 @@ pub fn writer_loop(
// notify the pipeline where we are
utils.track_sink_progress(&event);

let result = rt.block_on(send_pubsub_msg(&topic, &event));

if let Err(err) = result {
log::error!("unrecoverable error sending message to PubSub: {:?}", err);
return Err(Box::new(err));
}
rt.block_on(send_pubsub_msg(
&topic,
&event,
error_policy,
max_retries,
backoff_delay,
))?;
}

Ok(())
Expand Down
33 changes: 32 additions & 1 deletion src/sinks/gcp_pubsub/setup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::time::Duration;

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
};

Expand All @@ -11,16 +14,44 @@ use super::run::writer_loop;
pub struct Config {
pub topic: String,
pub credentials: String,
pub error_policy: Option<ErrorPolicy>,
pub max_retries: Option<usize>,
pub backoff_delay: Option<u64>,
}

const DEFAULT_MAX_RETRIES: usize = 20;
const DEFAULT_BACKOFF_DELAY: u64 = 5_000;

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let credentials = self.inner.credentials.to_owned();
let topic_name = self.inner.topic.to_owned();

let error_policy = self
.inner
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let max_retries = self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);

let backoff_delay =
Duration::from_millis(self.inner.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY));

let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
writer_loop(input, credentials, topic_name, utils).expect("writer loop failed");
writer_loop(
input,
credentials,
topic_name,
&error_policy,
max_retries,
backoff_delay,
utils,
)
.expect("writer loop failed");
});

Ok(handle)
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod common;

pub mod assert;
pub mod stdout;
pub mod terminal;

pub use common::*;

#[cfg(feature = "logs")]
pub mod logs;

Expand Down
4 changes: 1 addition & 3 deletions src/sinks/webhook/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration};
use reqwest::blocking::Client;
use serde::Serialize;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};

use super::ErrorPolicy;
use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils, Error};

#[derive(Serialize)]
struct RequestBody {
Expand Down
7 changes: 1 addition & 6 deletions src/sinks/webhook/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
Error,
};
Expand All @@ -13,12 +14,6 @@ use super::run::request_loop;

static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

#[derive(Default, Debug, Deserialize)]
pub struct Config {
pub url: String,
Expand Down

0 comments on commit c6af164

Please sign in to comment.