Skip to content

Commit

Permalink
fix: implement in-memory queue properly (#787)
Browse files Browse the repository at this point in the history
closes #785
  • Loading branch information
benluelo authored Oct 10, 2023
2 parents 817fe88 + 56ecd93 commit d8b4dc3
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions voyager-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
"fee_denom": "stake",
"ws_url": "ws://localhost:26657/websocket",
"prover_endpoint": "https://galois-devnet.cryptware.io:443",
"dump_path": "dump",
"grpc_url": "http://localhost:9090"
}
},
"voyager": {
"hasura": null
},
"queue": {
"database_url": "postgres://postgres:postgrespassword@localhost:5432/default"
"hasura": null,
"queue": {
"type": "pg-queue",
"database_url": "postgres://postgres:postgrespassword@localhost:5432/default"
}
}
}
1 change: 1 addition & 0 deletions voyager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ displaydoc = { version = "0.2.4", default-features = false }
frame-support-procedural = "18.0.0"
hubble.workspace = true
sqlx = { version = "0.7.2", features = ["postgres"] }
thiserror = "1.0.49"

[features]
eth-mainnet = [ "unionlabs/eth-mainnet" ]
5 changes: 3 additions & 2 deletions voyager/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
msg::Msg,
DoAggregate, RelayerMsg,
},
queue::Queue,
};

pub mod evm;
Expand All @@ -36,8 +37,8 @@ pub enum AnyChain {
}

impl AnyChain {
pub async fn try_from_config(
voyager_config: &config::VoyagerConfig,
pub async fn try_from_config<Q: Queue>(
voyager_config: &config::VoyagerConfig<Q>,
config: ChainConfig,
) -> Self {
match config {
Expand Down
5 changes: 1 addition & 4 deletions voyager/src/chain/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,7 @@ impl LightClient for EthereumMainnet {
}

fn from_chain(chain: Self::HostChain) -> Self {
Self {
// dumper: Dumper::new(chain.dump_path.clone()),
chain,
}
Self { chain }
}

fn query_client_state(
Expand Down
10 changes: 4 additions & 6 deletions voyager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@ use std::collections::BTreeMap;

use chain_utils::private_key::PrivateKey;
use ethers::prelude::k256::ecdsa;
use frame_support_procedural::{CloneNoBound, DebugNoBound, DefaultNoBound};
use hubble::hasura::HasuraConfig;
use serde::{Deserialize, Serialize};
use tendermint_rpc::WebSocketClientUrl;
use unionlabs::ethereum::Address;

use crate::{chain::AnyChain, queue::Queue};

#[derive(DebugNoBound, CloneNoBound, DefaultNoBound, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(serialize = "", deserialize = ""))]
pub struct Config<Q: Queue> {
/// Map of chain name to it's respective config.
pub chain: BTreeMap<String, ChainConfig>,
pub voyager: VoyagerConfig,
pub queue: Q::Config,
pub voyager: VoyagerConfig<Q>,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VoyagerConfig {
pub struct VoyagerConfig<Q: Queue> {
pub hasura: Option<HasuraConfig>,
pub queue: Q::Config,
}

impl<Q: Queue> Config<Q> {
Expand Down Expand Up @@ -68,7 +67,6 @@ pub struct UnionChainConfig {
pub fee_denom: String,
pub ws_url: WebSocketClientUrl,
pub prover_endpoint: String,
pub dump_path: String,
pub grpc_url: String,
}

Expand Down
8 changes: 4 additions & 4 deletions voyager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
chain::AnyChain,
cli::{AppArgs, Command, IbcCmd, IbcQueryCmd},
config::Config,
queue::{InMemoryQueue, PgQueue, Voyager},
queue::{AnyQueue, InMemoryQueue, PgQueue, Voyager},
};

pub const DELAY_PERIOD: u64 = 0;
Expand All @@ -44,9 +44,9 @@ async fn main() -> Result<(), anyhow::Error> {
#[allow(clippy::too_many_lines)]
// NOTE: This function is a mess, will be cleaned up
async fn do_main(args: cli::AppArgs) -> Result<(), anyhow::Error> {
let voyager_config = read_to_string(&args.config_file_path).map_or(Config::default(), |s| {
serde_json::from_str::<Config<PgQueue>>(&s).unwrap()
});
let voyager_config = read_to_string(&args.config_file_path)
.map(|s| serde_json::from_str::<Config<AnyQueue>>(&s).unwrap())
.unwrap();

match args.command {
Command::PrintConfig => {
Expand Down
82 changes: 72 additions & 10 deletions voyager/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
fmt::{Debug, Display},
marker::PhantomData,
ops::Add,
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};

Expand All @@ -13,11 +14,13 @@ use chain_utils::{
Chain, ClientState, EventSource,
};
use frunk::{hlist_pat, HList};
use futures::{future::BoxFuture, stream, Future, FutureExt, StreamExt, TryStreamExt};
use futures::{
future::BoxFuture, stream, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
use hubble::hasura::{Datastore, HasuraDataStore, InsertDemoTx};
use pg_queue::ProcessFlow;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::PgPool;
use sqlx::{error::BoxDynError, PgPool};
use unionlabs::{
ethereum_consts_traits::{Mainnet, Minimal},
events::{
Expand Down Expand Up @@ -115,7 +118,7 @@ pub struct Voyager<Q> {
pub trait Queue: Clone + Send + Sync + Sized {
/// Error type returned by this queue, representing errors that are out of control of the consumer (i.e. unable to connect to database, can't insert into row, can't deserialize row, etc)
type Error: Debug + Display + Error;
type Config: Debug + Clone + PartialEq + Default + Serialize + DeserializeOwned;
type Config: Debug + Clone + Serialize + DeserializeOwned;

fn new(cfg: Self::Config) -> impl Future<Output = Result<Self, Self::Error>>;

Expand All @@ -130,19 +133,76 @@ pub trait Queue: Clone + Send + Sync + Sized {
Fut: Future<Output = ProcessFlow<RelayerMsg>> + 'a;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum AnyQueueConfig {
InMemory,
PgQueue(<PgQueue as Queue>::Config),
}

#[derive(Debug, Clone)]
pub enum AnyQueue {
InMemory(InMemoryQueue),
PgQueue(PgQueue),
}

#[derive(Debug, thiserror::Error)]
pub enum AnyQueueError {
#[error("{0}")]
InMemory(#[from] <InMemoryQueue as Queue>::Error),
#[error("{0}")]
PgQueue(#[from] <PgQueue as Queue>::Error),
}

impl Queue for AnyQueue {
type Error = AnyQueueError;
type Config = AnyQueueConfig;

fn new(cfg: Self::Config) -> impl Future<Output = Result<Self, Self::Error>> {
async move {
Ok(match cfg {
AnyQueueConfig::InMemory => Self::InMemory(InMemoryQueue::new(()).await?),
AnyQueueConfig::PgQueue(cfg) => Self::PgQueue(PgQueue::new(cfg).await?),
})
}
}

fn enqueue(&mut self, item: RelayerMsg) -> impl Future<Output = Result<(), Self::Error>> + '_ {
async move {
Ok(match self {
AnyQueue::InMemory(queue) => queue.enqueue(item).await?,
AnyQueue::PgQueue(queue) => queue.enqueue(item).await?,
})
}
}

fn process<'a, F, Fut>(&'a mut self, f: F) -> impl Future<Output = Result<(), Self::Error>> + '_
where
F: (FnOnce(RelayerMsg) -> Fut) + 'a,
Fut: Future<Output = ProcessFlow<RelayerMsg>> + 'a,
{
async move {
Ok(match self {
AnyQueue::InMemory(queue) => queue.process(f).await?,
AnyQueue::PgQueue(queue) => queue.process(f).await?,
})
}
}
}

#[derive(Debug, Clone)]
pub struct InMemoryQueue(VecDeque<RelayerMsg>);
pub struct InMemoryQueue(Arc<Mutex<VecDeque<RelayerMsg>>>);

impl Queue for InMemoryQueue {
type Error = std::convert::Infallible;
type Config = ();

fn new(_cfg: Self::Config) -> impl Future<Output = Result<Self, Self::Error>> {
futures::future::ok(Self(VecDeque::default()))
futures::future::ok(Self(Arc::new(Mutex::new(VecDeque::default()))))
}

fn enqueue(&mut self, item: RelayerMsg) -> impl Future<Output = Result<(), Self::Error>> + '_ {
self.0.push_back(item);
self.0.lock().expect("mutex is poisoned").push_back(item);
futures::future::ok(())
}

Expand All @@ -152,14 +212,16 @@ impl Queue for InMemoryQueue {
Fut: Future<Output = ProcessFlow<RelayerMsg>> + 'a,
{
async move {
match self.0.pop_front() {
let queue = &mut self.0.lock().expect("mutex is poisoned");

match queue.pop_front() {
Some(msg) => match f(msg.clone()).await {
ProcessFlow::Success(new_msgs) => {
self.0.extend(new_msgs);
queue.extend(new_msgs);
Ok(())
}
ProcessFlow::Requeue => {
self.0.push_front(msg);
queue.push_front(msg);
Ok(())
}
ProcessFlow::Fail(why) => panic!("{why}"),
Expand Down Expand Up @@ -264,7 +326,7 @@ impl<Q: Queue> Voyager<Q> {
.voyager
.hasura
.map(|hc| HasuraDataStore::new(reqwest::Client::new(), hc.url, hc.secret)),
queue: Q::new(config.queue).await.unwrap(),
queue: Q::new(config.voyager.queue).await.unwrap(),
}
}

Expand Down

0 comments on commit d8b4dc3

Please sign in to comment.