Skip to content

Commit

Permalink
feat: Introduce GCP PubSub sink (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
rvcas authored Apr 27, 2022
1 parent 6b0745a commit e456a1e
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 12 deletions.
252 changes: 249 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] }
# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
async-recursion = { version = "1.0.0", optional = true }

[features]
default = []
logs = ["file-rotate"]
Expand All @@ -70,3 +74,4 @@ kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
gcp = ["cloud-pubsub", "tokio", "async-recursion"]
1 change: 1 addition & 0 deletions book/src/sinks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ These are the existing sinks that are included as part the main _Oura_ codebase:
- [AWS SQS](aws_sqs.md): a sink that sends each event as message to an AWS SQS queue.
- [AWS Lamda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event.
- [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object.
- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a goolge cloud PubSub topic.

New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura).
18 changes: 18 additions & 0 deletions book/src/sinks/gcp_pubsub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Google Cloud PubSub

A sink that sends each event as a message to a PubSub topic. Each event is json-encoded and sent to a configurable PubSub topic.

## Configuration

```toml
[sink]
type = "GcpPubSub"
credentials = "oura-test-347101-ff3f7b2d69cc.json"
topic = "test"
```

### Section: `sink`

- `type`: the literal value `GcpPubSub`.
- `credentials`: the path to the service account json file downloaded from the cloud console.
- `topic`: the short name of the topic to send message to.
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
#[cfg(feature = "aws")]
use oura::sinks::aws_s3::Config as AwsS3Config;

#[cfg(feature = "gcp")]
use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig;

#[cfg(feature = "fingerprint")]
use oura::filters::fingerprint::Config as FingerprintConfig;

Expand Down Expand Up @@ -117,6 +120,9 @@ enum Sink {

#[cfg(feature = "aws")]
AwsS3(AwsS3Config),

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),
}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
Expand Down Expand Up @@ -145,6 +151,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot

#[cfg(feature = "aws")]
Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/sinks/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}
4 changes: 4 additions & 0 deletions src/sinks/gcp_pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod run;
mod setup;

pub use setup::*;
70 changes: 70 additions & 0 deletions src/sinks/gcp_pubsub/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::{sync::Arc, time::Duration};

use async_recursion::async_recursion;
use cloud_pubsub::{error::Error, Client, Topic};
use serde_json::json;

use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils};

#[async_recursion]
async fn send_pubsub_msg(
client: &Topic,
event: &Event,
policy: &ErrorPolicy,
retry_quota: usize,
backoff_delay: Duration,
) -> Result<(), Error> {
let body = json!(event).to_string();

let result = client.publish(body).await;

match (result, policy, retry_quota) {
(Ok(_), _, _) => {
log::info!("successful pubsub publish");
Ok(())
}
(Err(x), ErrorPolicy::Exit, 0) => Err(x),
(Err(x), ErrorPolicy::Continue, 0) => {
log::warn!("failed to publish to pubsub: {:?}", x);
Ok(())
}
(Err(x), _, quota) => {
log::warn!("failed attempt to execute pubsub publish: {:?}", x);
std::thread::sleep(backoff_delay);
send_pubsub_msg(client, event, policy, quota - 1, backoff_delay).await
}
}
}

pub fn writer_loop(
input: StageReceiver,
credentials: String,
topic_name: String,
error_policy: &ErrorPolicy,
max_retries: usize,
backoff_delay: Duration,
utils: Arc<Utils>,
) -> Result<(), crate::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;

let publisher = rt.block_on(Client::new(credentials))?;
let topic = publisher.topic(topic_name);

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

rt.block_on(send_pubsub_msg(
&topic,
&event,
error_policy,
max_retries,
backoff_delay,
))?;
}

Ok(())
}
59 changes: 59 additions & 0 deletions src/sinks/gcp_pubsub/setup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::time::Duration;

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
};

use super::run::writer_loop;

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub topic: String,
pub credentials: String,
pub error_policy: Option<ErrorPolicy>,
pub max_retries: Option<usize>,
pub backoff_delay: Option<u64>,
}

const DEFAULT_MAX_RETRIES: usize = 20;
const DEFAULT_BACKOFF_DELAY: u64 = 5_000;

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let credentials = self.inner.credentials.to_owned();
let topic_name = self.inner.topic.to_owned();

let error_policy = self
.inner
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let max_retries = self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);

let backoff_delay =
Duration::from_millis(self.inner.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY));

let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
writer_loop(
input,
credentials,
topic_name,
&error_policy,
max_retries,
backoff_delay,
utils,
)
.expect("writer loop failed");
});

Ok(handle)
}
}
7 changes: 7 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod common;

pub mod assert;
pub mod stdout;
pub mod terminal;

pub use common::*;

#[cfg(feature = "logs")]
pub mod logs;

Expand All @@ -22,3 +26,6 @@ pub mod aws_lambda;

#[cfg(feature = "aws")]
pub mod aws_s3;

#[cfg(feature = "gcp")]
pub mod gcp_pubsub;
4 changes: 1 addition & 3 deletions src/sinks/webhook/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration};
use reqwest::blocking::Client;
use serde::Serialize;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};

use super::ErrorPolicy;
use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils, Error};

#[derive(Serialize)]
struct RequestBody {
Expand Down
7 changes: 1 addition & 6 deletions src/sinks/webhook/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
Error,
};
Expand All @@ -13,12 +14,6 @@ use super::run::request_loop;

static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

#[derive(Default, Debug, Deserialize)]
pub struct Config {
pub url: String,
Expand Down

0 comments on commit e456a1e

Please sign in to comment.