Combined sink #2

Open · wants to merge 4 commits into base: lts/v1
9 changes: 9 additions & 0 deletions src/bin/oura/daemon.rs
@@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
#[cfg(feature = "aws")]
use oura::sinks::aws_s3::Config as AwsS3Config;

#[cfg(feature = "aws")]
use oura::sinks::aws_s3_sqs::Config as AwsS3SqsConfig;

#[cfg(feature = "redissink")]
use oura::sinks::redis::Config as RedisConfig;

@@ -130,6 +133,9 @@ enum Sink {
#[cfg(feature = "aws")]
AwsS3(AwsS3Config),

#[cfg(feature = "aws")]
AwsS3Sqs(AwsS3SqsConfig),

#[cfg(feature = "redissink")]
Redis(RedisConfig),

@@ -170,6 +176,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> Boot
#[cfg(feature = "aws")]
Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "aws")]
Sink::AwsS3Sqs(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "redissink")]
Sink::Redis(c) => WithUtils::new(c, utils).bootstrap(input),

241 changes: 241 additions & 0 deletions src/sinks/aws_s3_sqs/combined_client.rs
@@ -0,0 +1,241 @@
use super::Config;
use crate::model::BlockRecord;
use crate::sinks::aws_s3_sqs::{ContentType, Naming};
use crate::Error;
use aws_sdk_s3::types::ByteStream as S3ByteStream;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Region as S3Region;
use aws_sdk_s3::RetryConfig as S3RetryConfig;
use aws_sdk_sqs::Client as SqsClient;
use aws_sdk_sqs::Region as SqsRegion;
use aws_sdk_sqs::RetryConfig as SqsRetryConfig;
use serde::{Deserialize, Serialize};
use serde_json::json;

const DEFAULT_MAX_RETRIES: u32 = 5;

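/// Notification body pushed to SQS once the block object has been written to S3.
/// Serialized to JSON via `json!`; an illustrative payload (all values made up) looks like:
/// {"s3_key":"mainnet/42123456.abc1...","block_hash":"abc1...","previous_hash":"def4...","block_number":7654321,"slot":42123456,"tip":7654400}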
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SqsMessage {
s3_key: String,
block_hash: String,
previous_hash: String,
block_number: u64,
slot: u64,
tip: u64,
}

impl From<&ContentType> for String {
fn from(other: &ContentType) -> Self {
match other {
ContentType::Cbor => "application/cbor".to_string(),
ContentType::CborHex => "text/plain".to_string(),
ContentType::Json => "application/json".to_string(),
}
}
}

pub(super) struct CombinedClient {
s3: S3Client,
sqs: SqsClient,
config: Config,
naming: Naming,
content_type: ContentType,
sqs_group_id: String,
s3_prefix: String,
}

impl CombinedClient {
pub fn new(config: &Config) -> Result<CombinedClient, Error> {
let s3 = setup_s3_client(config)?;
let sqs = setup_sqs_client(config)?;
let naming = config.s3_naming.clone().unwrap_or(Naming::Hash);
let content_type = config.s3_content.clone().unwrap_or(ContentType::Cbor);
let group_id = config
.sqs_group_id
.clone()
.unwrap_or_else(|| "oura-sink".to_string());
let s3_prefix = config.s3_prefix.clone().unwrap_or_default();
Ok(CombinedClient {
s3,
sqs,
config: config.clone(),
naming,
content_type,
sqs_group_id: group_id,
s3_prefix,
})
}

pub async fn send_block(
&self,
record: &BlockRecord,
tip: u64,
) -> Result<(), Error> {
let key = self.get_s3_key(record);
self.send_s3_object(&key, record).await?;
self.send_sqs_message(&key, record, tip).await?;
Ok(())
}

async fn send_s3_object(&self, key: &str, record: &BlockRecord) -> Result<(), Error> {
let content_type: String = String::from(&self.content_type);
let content = encode_block(&self.content_type, record);
let req = self
.s3
.put_object()
.bucket(&self.config.s3_bucket)
.key(key)
.body(content)
.metadata("era", record.era.to_string())
.metadata("issuer_vkey", &record.issuer_vkey)
.metadata("tx_count", record.tx_count.to_string())
.metadata("slot", record.slot.to_string())
.metadata("hash", &record.hash)
.metadata("number", record.number.to_string())
.metadata("previous_hash", &record.previous_hash)
.content_type(content_type);

let res = req.send().await?;

log::trace!("S3 put response: {:?}", res);

Ok(())
}

async fn send_sqs_message(
&self,
key: &str,
record: &BlockRecord,
tip: u64,
) -> Result<(), Error> {
let message = SqsMessage {
s3_key: key.to_string(),
block_hash: record.hash.to_string(),
previous_hash: record.previous_hash.to_string(),
block_number: record.number,
slot: record.slot,
tip,
};

let body = json!(message).to_string();

let mut req = self
.sqs
.send_message()
.queue_url(&self.config.sqs_queue_url)
.message_body(body);

if self.config.sqs_fifo.unwrap_or_default() {
req = req
.message_group_id(&self.sqs_group_id)
.message_deduplication_id(key);
}

let res = req.send().await?;

log::trace!("SQS send response: {:?}", res);

Ok(())
}

fn get_s3_key(&self, record: &BlockRecord) -> String {
define_obj_key(&self.s3_prefix, &self.naming, record)
}
}

fn encode_block(content_type: &ContentType, record: &BlockRecord) -> S3ByteStream {
let hex = match record.cbor_hex.as_ref() {
Some(x) => x,
None => {
log::error!(
"found block record without CBOR, please enable CBOR in source mapper options"
);
panic!()
}
};

match content_type {
ContentType::Cbor => {
let cbor = hex::decode(hex).expect("valid hex value");
S3ByteStream::from(cbor)
}
ContentType::CborHex => S3ByteStream::from(hex.as_bytes().to_vec()),
ContentType::Json => {
let json = json!(record).to_string().into_bytes();
S3ByteStream::from(json)
}
}
}

fn setup_s3_client(config: &Config) -> Result<S3Client, Error> {
let explicit_region = config.s3_region.to_owned();

let aws_config = tokio::runtime::Builder::new_current_thread()
.build()?
.block_on(
aws_config::from_env()
.region(S3Region::new(explicit_region))
.load(),
);

let retry_config = S3RetryConfig::new()
.with_max_attempts(config.s3_max_retries.unwrap_or(DEFAULT_MAX_RETRIES));

let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.retry_config(retry_config)
.build();

Ok(S3Client::from_conf(s3_config))
}

fn setup_sqs_client(config: &Config) -> Result<SqsClient, Error> {
let explicit_region = config.sqs_region.to_owned();

let aws_config = tokio::runtime::Builder::new_current_thread()
.build()?
.block_on(
aws_config::from_env()
.region(SqsRegion::new(explicit_region))
.load(),
);

let retry_config = SqsRetryConfig::new()
.with_max_attempts(config.sqs_max_retries.unwrap_or(DEFAULT_MAX_RETRIES));

let sqs_config = aws_sdk_sqs::config::Builder::from(&aws_config)
.retry_config(retry_config)
.build();

Ok(SqsClient::from_conf(sqs_config))
}

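// Builds the S3 object key according to the configured naming policy.
// Illustrative keys, assuming a prefix of "mainnet/" (values are made up):
//   Hash          -> "mainnet/<hash>"
//   SlotHash      -> "mainnet/<slot>.<hash>"
//   BlockHash     -> "mainnet/<number>.<hash>"
//   EpochSlotHash -> "mainnet/<epoch>.<slot>.<hash>"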
fn define_obj_key(prefix: &str, policy: &Naming, record: &BlockRecord) -> String {
match policy {
Naming::Hash => format!("{}{}", prefix, record.hash),
Naming::SlotHash => format!("{}{}.{}", prefix, record.slot, record.hash),
Naming::BlockHash => format!("{}{}.{}", prefix, record.number, record.hash),
Naming::BlockNumber => format!("{}{}", prefix, record.number),
Naming::EpochHash => format!(
"{}{}.{}",
prefix,
record.epoch.unwrap_or_default(),
record.hash
),
Naming::EpochSlotHash => format!(
"{}{}.{}.{}",
prefix,
record.epoch.unwrap_or_default(),
record.slot,
record.hash
),
Naming::EpochBlockHash => {
format!(
"{}{}.{}.{}",
prefix,
record.epoch.unwrap_or_default(),
record.number,
record.hash
)
}
}
}
35 changes: 35 additions & 0 deletions src/sinks/aws_s3_sqs/config.rs
@@ -0,0 +1,35 @@
use serde::Deserialize;

#[derive(Deserialize, Debug, Clone)]
pub enum Naming {
Hash,
SlotHash,
BlockHash,
BlockNumber,
EpochHash,
EpochSlotHash,
EpochBlockHash,
}

#[derive(Deserialize, Debug, Clone)]
pub enum ContentType {
Cbor,
CborHex,
Json,
}

#[derive(Default, Debug, Deserialize, Clone)]
pub struct Config {
pub s3_region: String,
pub s3_bucket: String,
pub s3_prefix: Option<String>,
pub s3_naming: Option<Naming>,
pub s3_content: Option<ContentType>,
pub s3_max_retries: Option<u32>,

pub sqs_region: String,
pub sqs_queue_url: String,
pub sqs_fifo: Option<bool>,
pub sqs_group_id: Option<String>,
pub sqs_max_retries: Option<u32>,
}
7 changes: 7 additions & 0 deletions src/sinks/aws_s3_sqs/mod.rs
@@ -0,0 +1,7 @@
mod combined_client;
mod config;
mod run;
mod setup;

pub use self::config::*;
pub use setup::*;
39 changes: 39 additions & 0 deletions src/sinks/aws_s3_sqs/run.rs
@@ -0,0 +1,39 @@
use std::sync::Arc;

use crate::sinks::aws_s3_sqs::combined_client::CombinedClient;
use crate::{model::EventData, pipelining::StageReceiver, utils::Utils, Error};

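// Blocking sink loop: each Block event is forwarded to S3 and then SQS on a
// dedicated current-thread Tokio runtime; any send error aborts the loop so the
// failure surfaces to the pipeline.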
pub(super) fn writer_loop(
input: StageReceiver,
client: CombinedClient,
utils: Arc<Utils>,
) -> Result<(), Error> {
let client = Arc::new(client);

let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;

for event in input.iter() {
if let EventData::Block(record) = &event.data {
let client = client.clone();
let tip = utils.tip.load(std::sync::atomic::Ordering::SeqCst);

let result = rt.block_on(async move { client.send_block(record, tip).await });

match result {
Ok(_) => {
// notify the pipeline where we are
utils.track_sink_progress(&event);
}
Err(err) => {
log::error!("unrecoverable error sending block to S3 and SQS: {:?}", err);
return Err(err);
}
}
}
}

Ok(())
}
21 changes: 21 additions & 0 deletions src/sinks/aws_s3_sqs/setup.rs
@@ -0,0 +1,21 @@
use super::config::Config;
use crate::sinks::aws_s3_sqs::combined_client::CombinedClient;
use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
utils::WithUtils,
};

use super::run::writer_loop;

impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let client = CombinedClient::new(&self.inner)?;
let utils = self.utils.clone();

let handle = std::thread::spawn(move || {
writer_loop(input, client, utils).expect("writer loop failed")
});

Ok(handle)
}
}
3 changes: 3 additions & 0 deletions src/sinks/mod.rs
@@ -27,6 +27,9 @@ pub mod aws_lambda;
#[cfg(feature = "aws")]
pub mod aws_s3;

#[cfg(feature = "aws")]
pub mod aws_s3_sqs;

#[cfg(feature = "redissink")]
pub mod redis;

2 changes: 2 additions & 0 deletions src/sources/n2c/run.rs
@@ -70,6 +70,8 @@ impl ChainObserver {
let ready = self.chain_buffer.pop_with_depth(self.min_depth);
log::debug!("found {} points with required min depth", ready.len());

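// keep the shared tip height current so sinks (e.g. the new S3+SQS sink) can attach it to outgoing messages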
self.event_writer.utils.tip.store(tip.1, std::sync::atomic::Ordering::SeqCst);

// find confirmed block in memory and send down the pipeline
for point in ready {
let block = self