GCP PubSub #237

Merged · 10 commits · Apr 27, 2022
252 changes: 249 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -61,6 +61,10 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] }
# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
async-recursion = { version = "1.0.0", optional = true }

[features]
default = []
logs = ["file-rotate"]
@@ -70,3 +74,4 @@ kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
gcp = ["cloud-pubsub", "tokio", "async-recursion"]
1 change: 1 addition & 0 deletions book/src/sinks/README.md
@@ -14,5 +14,6 @@ These are the existing sinks that are included as part of the main _Oura_ codebase:
- [AWS SQS](aws_sqs.md): a sink that sends each event as message to an AWS SQS queue.
- [AWS Lambda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event.
- [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object.
- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a Google Cloud PubSub topic.

New sinks are being developed; this documentation will be updated to reflect the growing list. Contributions and feature requests are welcome in our [Github Repo](https://github.com/txpipe/oura).
18 changes: 18 additions & 0 deletions book/src/sinks/gcp_pubsub.md
@@ -0,0 +1,18 @@
# Google Cloud PubSub

A sink that JSON-encodes each event and sends it as a message to a configurable Google Cloud PubSub topic.

## Configuration

```toml
[sink]
type = "GcpPubSub"
credentials = "oura-test-347101-ff3f7b2d69cc.json"
topic = "test"
```

### Section: `sink`

- `type`: the literal value `GcpPubSub`.
- `credentials`: the path to the service account JSON file downloaded from the Google Cloud console.
- `topic`: the short name of the topic to send messages to.
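
Beyond the three required fields above, the `Config` struct introduced in `src/sinks/gcp_pubsub/setup.rs` also deserializes optional retry settings: `error_policy`, `max_retries`, and `backoff_delay`. A minimal sketch of how they might be set, assuming serde's plain string encoding for the `ErrorPolicy` variants and the default values declared in that file:

```toml
[sink]
type = "GcpPubSub"
credentials = "oura-test-347101-ff3f7b2d69cc.json"
topic = "test"

# Optional retry tuning (values shown match the defaults in setup.rs)
error_policy = "Exit"   # "Continue" would log the failure and move on instead of exiting
max_retries = 20        # retries per event before the error policy applies
backoff_delay = 5000    # delay between attempts, in milliseconds
```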
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
@@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
#[cfg(feature = "aws")]
use oura::sinks::aws_s3::Config as AwsS3Config;

#[cfg(feature = "gcp")]
use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig;

#[cfg(feature = "fingerprint")]
use oura::filters::fingerprint::Config as FingerprintConfig;

@@ -117,6 +120,9 @@ enum Sink {

#[cfg(feature = "aws")]
AwsS3(AwsS3Config),

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),
}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
@@ -145,6 +151,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot

#[cfg(feature = "aws")]
Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),
}
}

7 changes: 7 additions & 0 deletions src/sinks/common.rs
@@ -0,0 +1,7 @@
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}
4 changes: 4 additions & 0 deletions src/sinks/gcp_pubsub/mod.rs
@@ -0,0 +1,4 @@
mod run;
mod setup;

pub use setup::*;
70 changes: 70 additions & 0 deletions src/sinks/gcp_pubsub/run.rs
@@ -0,0 +1,70 @@
use std::{sync::Arc, time::Duration};

use async_recursion::async_recursion;
use cloud_pubsub::{error::Error, Client, Topic};
use serde_json::json;

use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils};

#[async_recursion]
async fn send_pubsub_msg(
client: &Topic,
event: &Event,
policy: &ErrorPolicy,
retry_quota: usize,
backoff_delay: Duration,
) -> Result<(), Error> {
let body = json!(event).to_string();

let result = client.publish(body).await;

match (result, policy, retry_quota) {
(Ok(_), _, _) => {
log::info!("successful pubsub publish");
Ok(())
}
(Err(x), ErrorPolicy::Exit, 0) => Err(x),
(Err(x), ErrorPolicy::Continue, 0) => {
log::warn!("failed to publish to pubsub: {:?}", x);
Ok(())
}
(Err(x), _, quota) => {
log::warn!("failed attempt to execute pubsub publish: {:?}", x);
std::thread::sleep(backoff_delay);
send_pubsub_msg(client, event, policy, quota - 1, backoff_delay).await
}
}
}

pub fn writer_loop(
input: StageReceiver,
credentials: String,
topic_name: String,
error_policy: &ErrorPolicy,
max_retries: usize,
backoff_delay: Duration,
utils: Arc<Utils>,
) -> Result<(), crate::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;

let publisher = rt.block_on(Client::new(credentials))?;
let topic = publisher.topic(topic_name);

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

rt.block_on(send_pubsub_msg(
&topic,
&event,
error_policy,
max_retries,
backoff_delay,
))?;
}

Ok(())
}
59 changes: 59 additions & 0 deletions src/sinks/gcp_pubsub/setup.rs
@@ -0,0 +1,59 @@
use std::time::Duration;

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
};

use super::run::writer_loop;

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub topic: String,
pub credentials: String,
pub error_policy: Option<ErrorPolicy>,
pub max_retries: Option<usize>,
pub backoff_delay: Option<u64>,
}

const DEFAULT_MAX_RETRIES: usize = 20;
const DEFAULT_BACKOFF_DELAY: u64 = 5_000;

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let credentials = self.inner.credentials.to_owned();
let topic_name = self.inner.topic.to_owned();

let error_policy = self
.inner
.error_policy
.as_ref()
.cloned()
.unwrap_or(ErrorPolicy::Exit);

let max_retries = self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);

let backoff_delay =
Duration::from_millis(self.inner.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY));

let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
writer_loop(
input,
credentials,
topic_name,
&error_policy,
max_retries,
backoff_delay,
utils,
)
.expect("writer loop failed");
});

Ok(handle)
}
}
7 changes: 7 additions & 0 deletions src/sinks/mod.rs
@@ -1,7 +1,11 @@
mod common;

pub mod assert;
pub mod stdout;
pub mod terminal;

pub use common::*;

#[cfg(feature = "logs")]
pub mod logs;

@@ -22,3 +26,6 @@ pub mod aws_lambda;

#[cfg(feature = "aws")]
pub mod aws_s3;

#[cfg(feature = "gcp")]
pub mod gcp_pubsub;
4 changes: 1 addition & 3 deletions src/sinks/webhook/run.rs
@@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration};
use reqwest::blocking::Client;
use serde::Serialize;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};

use super::ErrorPolicy;
use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils, Error};

#[derive(Serialize)]
struct RequestBody {
7 changes: 1 addition & 6 deletions src/sinks/webhook/setup.rs
@@ -5,6 +5,7 @@ use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::ErrorPolicy,
utils::WithUtils,
Error,
};
@@ -13,12 +14,6 @@ use super::run::request_loop;

static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Deserialize, Clone)]
pub enum ErrorPolicy {
Continue,
Exit,
}

#[derive(Default, Debug, Deserialize)]
pub struct Config {
pub url: String,