diff --git a/Cargo.lock b/Cargo.lock index cd034380f9..0dbab42f4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -836,6 +836,7 @@ dependencies = [ "serde_json", "sha2 0.10.8", "tendermint-rpc", + "thiserror", "tokio", "tracing", "typenum", diff --git a/lib/chain-utils/Cargo.toml b/lib/chain-utils/Cargo.toml index c5617bddc1..f744b62505 100644 --- a/lib/chain-utils/Cargo.toml +++ b/lib/chain-utils/Cargo.toml @@ -28,3 +28,4 @@ chrono = { version = "0.4.26", default-features = false, features = ["alloc"] } hubble.workspace = true reqwest = "0.11.20" serde_json = "1.0.107" +thiserror = "1.0.49" diff --git a/lib/chain-utils/src/evm.rs b/lib/chain-utils/src/evm.rs index 526c213b79..6b0ae3c273 100644 --- a/lib/chain-utils/src/evm.rs +++ b/lib/chain-utils/src/evm.rs @@ -16,7 +16,7 @@ use ethers::{ contract::{ContractError, EthCall, EthLogDecode}, core::k256::ecdsa, middleware::SignerMiddleware, - providers::{Middleware, Provider, Ws}, + providers::{Middleware, Provider, ProviderError, Ws, WsClientError}, signers::{LocalWallet, Wallet}, utils::secret_key_to_address, }; @@ -279,11 +279,19 @@ impl Chain for Evm { } } +#[derive(Debug, thiserror::Error)] +pub enum EvmInitError { + #[error("unable to connect to websocket")] + Ws(#[from] WsClientError), + #[error("provider error")] + Provider(#[from] ProviderError), +} + impl Evm { - pub async fn new(config: Config) -> Self { - let provider = Provider::new(Ws::connect(config.eth_rpc_api).await.unwrap()); + pub async fn new(config: Config) -> Result { + let provider = Provider::new(Ws::connect(config.eth_rpc_api).await?); - let chain_id = provider.get_chainid().await.unwrap(); + let chain_id = provider.get_chainid().await?; let signing_key: ecdsa::SigningKey = config.signer.value(); let address = secret_key_to_address(&signing_key); @@ -292,7 +300,7 @@ impl Evm { let signer_middleware = Arc::new(SignerMiddleware::new(provider.clone(), wallet.clone())); - Self { + Ok(Self { chain_id: U256(chain_id), ibc_handler: IBCHandler::new(config.ibc_handler_address, signer_middleware.clone()), provider, @@ -305,7 +313,7 @@ impl Evm { hasura_config.secret, ) }), - } + }) } // TODO: Change to take a beacon slot instead of a height diff --git a/lib/chain-utils/src/lib.rs b/lib/chain-utils/src/lib.rs index 10e019f892..ffa6887efb 100644 --- a/lib/chain-utils/src/lib.rs +++ b/lib/chain-utils/src/lib.rs @@ -103,7 +103,14 @@ pub struct ChainEvent { } pub trait ClientState { - type ChainId: Debug + Display + PartialEq + Hash + Clone + Serialize + for<'de> Deserialize<'de>; + type ChainId: Debug + + Display + + PartialEq + + Eq + + Hash + + Clone + + Serialize + + for<'de> Deserialize<'de>; type Height: IsHeight; fn height(&self) -> Self::Height; diff --git a/lib/chain-utils/src/union.rs b/lib/chain-utils/src/union.rs index e7cde2ff26..89b98e88ae 100644 --- a/lib/chain-utils/src/union.rs +++ b/lib/chain-utils/src/union.rs @@ -1,9 +1,8 @@ -use std::{fmt::Display, str::FromStr}; +use std::{fmt::Display, num::ParseIntError, str::FromStr}; use ethers::prelude::k256::ecdsa; use futures::{stream, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use prost::Message; -use protos::ibc::core::channel::v1::QueryPacketAcknowledgementRequest; use serde::{Deserialize, Serialize}; use sha2::Digest; use tendermint_rpc::{ @@ -290,27 +289,46 @@ impl Chain for Union { } } +#[derive(Debug, thiserror::Error)] +pub enum UnionInitError { + #[error("tendermint rpc error")] + Tendermint(#[from] tendermint_rpc::Error), + #[error( + "unable to parse chain id: expected format `-`, found `{found}`" + )] + // TODO: Once the `Id` trait in unionlabs is cleaned up to no longer use static id types, this error should just wrap `IdParseError` + ChainIdParse { + found: String, + #[source] + source: Option, + }, +} + impl Union { - pub async fn new(config: Config) -> Self { + pub async fn new(config: Config) -> Result { let (tm_client, driver) = WebSocketClient::builder(config.ws_url) .compat_mode(tendermint_rpc::client::CompatMode::V0_37) .build() - .await - .unwrap(); + .await?; tokio::spawn(async move { driver.run().await }); - let chain_id = tm_client - .status() - .await - .unwrap() - .node_info - .network - .to_string(); - - let chain_revision = chain_id.split('-').last().unwrap().parse().unwrap(); - - Self { + let chain_id = tm_client.status().await?.node_info.network.to_string(); + + let chain_revision = chain_id + .split('-') + .last() + .ok_or_else(|| UnionInitError::ChainIdParse { + found: chain_id.clone(), + source: None, + })? + .parse() + .map_err(|err| UnionInitError::ChainIdParse { + found: chain_id.clone(), + source: Some(err), + })?; + + Ok(Self { signer: CosmosAccountId::new(config.signer.value(), "union".to_string()), tm_client, chain_id, @@ -318,7 +336,7 @@ impl Union { prover_endpoint: config.prover_endpoint, grpc_url: config.grpc_url, fee_denom: config.fee_denom, - } + }) } pub async fn broadcast_tx_commit( diff --git a/lib/serde-utils/src/lib.rs b/lib/serde-utils/src/lib.rs index 5ed1ef828e..5eb7aec8e6 100644 --- a/lib/serde-utils/src/lib.rs +++ b/lib/serde-utils/src/lib.rs @@ -10,6 +10,7 @@ const HEX_ENCODING_PREFIX: &str = "0x"; pub enum FromHexStringError { Hex(FromHexError), MissingPrefix(String), + EmptyString, // NOTE: Contains the stringified error TryFromBytes(String), } @@ -18,6 +19,7 @@ impl std::error::Error for FromHexStringError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { FromHexStringError::Hex(hex) => Some(hex), + FromHexStringError::EmptyString => None, FromHexStringError::MissingPrefix(_) => None, FromHexStringError::TryFromBytes(_) => None, } @@ -28,6 +30,7 @@ impl core::fmt::Display for FromHexStringError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { FromHexStringError::Hex(e) => write!(f, "{e}"), + FromHexStringError::EmptyString => write!(f, "cannot parse empty string as hex"), FromHexStringError::MissingPrefix(data) => write!( f, "missing prefix `{HEX_ENCODING_PREFIX}` when deserializing hex data '{data}'", @@ -51,7 +54,13 @@ where T: TryFrom>, >>::Error: Debug + 'static, { - match string.as_ref().strip_prefix(HEX_ENCODING_PREFIX.as_bytes()) { + let s = &string.as_ref(); + + if s.is_empty() { + return Err(FromHexStringError::EmptyString); + } + + match s.strip_prefix(HEX_ENCODING_PREFIX.as_bytes()) { Some(maybe_hex) => hex::decode(maybe_hex) .map_err(FromHexStringError::Hex)? .try_into() diff --git a/lib/unionlabs/src/bounded_int.rs b/lib/unionlabs/src/bounded_int.rs index aed29f2eb1..84870a3488 100644 --- a/lib/unionlabs/src/bounded_int.rs +++ b/lib/unionlabs/src/bounded_int.rs @@ -85,6 +85,21 @@ macro_rules! bounded_int { } } + impl std::str::FromStr for $Struct { + type Err = BoundedIntParseError<$ty>; + + fn from_str(s: &str) -> Result { + s.parse::<$ty>() + .map_err(BoundedIntParseError::Parse) + .and_then(|n| n.try_into().map_err(BoundedIntParseError::Value)) + } + } + + impl std::fmt::Display for $Struct { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } $( const _: () = assert!( @@ -129,6 +144,12 @@ pub struct BoundedIntError { found: T, } +#[derive(Debug, Clone, PartialEq)] +pub enum BoundedIntParseError { + Parse(std::num::ParseIntError), + Value(BoundedIntError), +} + bounded_int! { pub BoundedI8(i8); pub BoundedI16(i16); diff --git a/lib/unionlabs/src/tendermint/types/header.rs b/lib/unionlabs/src/tendermint/types/header.rs index cce2b8e52a..d4e54f47a4 100644 --- a/lib/unionlabs/src/tendermint/types/header.rs +++ b/lib/unionlabs/src/tendermint/types/header.rs @@ -16,6 +16,7 @@ pub struct Header { /// basic block info pub version: Consensus, pub chain_id: String, + #[serde(with = "::serde_utils::string")] pub height: BoundedI64<0, { i64::MAX }>, pub time: Timestamp, /// prev block info diff --git a/lib/unionlabs/src/tendermint/version/consensus.rs b/lib/unionlabs/src/tendermint/version/consensus.rs index 8d98d686ab..fc3ec11967 100644 --- a/lib/unionlabs/src/tendermint/version/consensus.rs +++ b/lib/unionlabs/src/tendermint/version/consensus.rs @@ -4,7 +4,9 @@ use crate::{Proto, TypeUrl}; #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] pub struct Consensus { + #[serde(with = "::serde_utils::string")] pub block: u64, + #[serde(with = "::serde_utils::string", default)] pub app: u64, } diff --git a/voyager/src/chain.rs b/voyager/src/chain.rs index b717a418d5..048b4b771e 100644 --- a/voyager/src/chain.rs +++ b/voyager/src/chain.rs @@ -3,7 +3,11 @@ use std::{ str::FromStr, }; -use chain_utils::{evm::Evm, union::Union, Chain}; +use chain_utils::{ + evm::{Evm, EvmInitError}, + union::{Union, UnionInitError}, + Chain, +}; use futures::Future; use serde::{Deserialize, Serialize}; use unionlabs::{ @@ -36,12 +40,20 @@ pub enum AnyChain { EvmMinimal(Evm), } +#[derive(Debug, thiserror::Error)] +pub enum AnyChainTryFromConfigError { + #[error("error initializing a union chain")] + Union(#[from] UnionInitError), + #[error("error initializing an ethereum chain")] + Evm(#[from] EvmInitError), +} + impl AnyChain { pub async fn try_from_config( voyager_config: &config::VoyagerConfig, config: ChainConfig, - ) -> Self { - match config { + ) -> Result { + Ok(match config { ChainConfig::Evm(EvmChainConfig::Mainnet(evm)) => Self::EvmMainnet( Evm::::new(chain_utils::evm::Config { ibc_handler_address: evm.ibc_handler_address, @@ -50,7 +62,7 @@ impl AnyChain { eth_beacon_rpc_api: evm.eth_beacon_rpc_api, hasura_config: voyager_config.hasura.clone(), }) - .await, + .await?, ), ChainConfig::Evm(EvmChainConfig::Minimal(evm)) => Self::EvmMinimal( Evm::::new(chain_utils::evm::Config { @@ -60,7 +72,7 @@ impl AnyChain { eth_beacon_rpc_api: evm.eth_beacon_rpc_api, hasura_config: voyager_config.hasura.clone(), }) - .await, + .await?, ), ChainConfig::Union(union) => Self::Union( Union::new(chain_utils::union::Config { @@ -70,9 +82,9 @@ impl AnyChain { grpc_url: union.grpc_url, fee_denom: union.fee_denom, }) - .await, + .await?, ), - } + }) } } diff --git a/voyager/src/chain/union.rs b/voyager/src/chain/union.rs index a3fb2bd351..c2c359dc01 100644 --- a/voyager/src/chain/union.rs +++ b/voyager/src/chain/union.rs @@ -59,8 +59,8 @@ use crate::{ AcknowledgementPath, ChannelEndPath, ClientConsensusStatePath, ClientStatePath, CommitmentPath, ConnectionPath, IbcPath, StateProof, }, - try_from_relayer_msg, Chain, ChainOf, ClientStateOf, ConsensusStateOf, HeaderOf, HeightOf, - IbcStateRead, LightClient, QueryHeight, + try_from_relayer_msg, Chain, ClientStateOf, ConsensusStateOf, HeightOf, IbcStateRead, + LightClient, QueryHeight, }, msg::{ aggregate::{Aggregate, LightClientSpecificAggregate}, @@ -1258,3 +1258,72 @@ where ) } } + +#[test] +fn commit_json() { + let json = r#"{ + "header": { + "version": { + "block": "11" + }, + "chain_id": "union-devnet-1", + "height": "1", + "time": "2023-10-09T18:09:07.889054294Z", + "last_block_id": { + "hash": "", + "parts": { + "total": 0, + "hash": "" + } + }, + "last_commit_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "data_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "validators_hash": "F09E25471B41514B2F8B08B5F4C9093C5D6ED134E107FF491CED2374B947DF60", + "next_validators_hash": "F09E25471B41514B2F8B08B5F4C9093C5D6ED134E107FF491CED2374B947DF60", + "consensus_hash": "048091BC7DDC283F77BFBF91D73C44DA58C3DF8A9CBC867405D8B7F3DAADA22F", + "app_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "last_results_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "evidence_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "proposer_address": "196D6009588DA28CF40039C957A53B08104723F9" + }, + "commit": { + "height": "1", + "round": 0, + "block_id": { + "hash": "A991A3E42488481603B9409954F3C2B82A25052D9ABE2344EF536CF6F0EA628D", + "parts": { + "total": 1, + "hash": "9F5AEC29C585A4BFFDEE07544F0CB89B87956875F205E581AE1304308670BF02" + } + }, + "signatures": [ + { + "block_id_flag": 2, + "validator_address": "196D6009588DA28CF40039C957A53B08104723F9", + "timestamp": "2023-10-11T09:43:54.957285095Z", + "signature": "0gLS0JY7Nvu1gjZoLM+YD8l1K+Uaxubqx+Jh7+Zvr+wsaMsM1FAoo3PtYPBoa9BN5r0a/j/pxBqi6qOcuJbWeQ==" + }, + { + "block_id_flag": 2, + "validator_address": "36E1644D94064ED11521041E9138A0D1CCA9C31C", + "timestamp": "2023-10-11T09:43:54.780686805Z", + "signature": "2ZhlWMRC3YnPit8dccpgrQXKN0RGFt/t+a9njRerONormIOQ5KBNq97UP9T4AugaJe3Kmz+h5AfKtY0WCcaIgg==" + }, + { + "block_id_flag": 2, + "validator_address": "4CE57693C82B50F830731DAB14FA759327762456", + "timestamp": "2023-10-11T09:43:54.782517372Z", + "signature": "4SO9mmDDzGZjTGUPDGnPA0KREBNHLUZelTZZQVUCplQtpmG/44FEL4V3o2l6sekdlWY1f0z22efWdh4mIXZ2Ng==" + }, + { + "block_id_flag": 2, + "validator_address": "EFB1D8B3A56D97F2AB24AC5F0B04F48535F74DA9", + "timestamp": "2023-10-11T09:43:54.853391475Z", + "signature": "r9B04fLXFnuLnj09MQrRsDUBCfuA+bgF2SSlqa0WN00CSNuph+oON9Qp0ua761NvvRqA82/GyTq4vjLzdaZhrQ==" + } + ] + } + }"#; + + dbg!(serde_json::from_str::(json)); +} diff --git a/voyager/src/config.rs b/voyager/src/config.rs index 0842217a90..fcfd9f9015 100644 --- a/voyager/src/config.rs +++ b/voyager/src/config.rs @@ -7,7 +7,10 @@ use serde::{Deserialize, Serialize}; use tendermint_rpc::WebSocketClientUrl; use unionlabs::ethereum::Address; -use crate::{chain::AnyChain, queue::Queue}; +use crate::{ + chain::{AnyChain, AnyChainTryFromConfigError}, + queue::Queue, +}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(bound(serialize = "", deserialize = ""))] @@ -24,14 +27,24 @@ pub struct VoyagerConfig { } impl Config { - pub async fn get_chain(&self, name: &str) -> Option { + pub async fn get_chain(&self, name: &str) -> Result { match self.chain.get(name) { - Some(config) => Some(AnyChain::try_from_config(&self.voyager, config.clone()).await), - None => None, + Some(config) => Ok(AnyChain::try_from_config(&self.voyager, config.clone()).await?), + None => Err(GetChainError::ChainNotFound { + name: name.to_string(), + }), } } } +#[derive(Debug, thiserror::Error)] +pub enum GetChainError { + #[error("chain `{name}` not found in config")] + ChainNotFound { name: String }, + #[error("error initializing chain")] + AnyChainTryFromConfig(#[from] AnyChainTryFromConfigError), +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "chain_type")] pub enum ChainConfig { diff --git a/voyager/src/main.rs b/voyager/src/main.rs index b0136cb9cc..937cf8bc15 100644 --- a/voyager/src/main.rs +++ b/voyager/src/main.rs @@ -7,8 +7,9 @@ clippy::manual_async_fn, clippy::module_name_repetitions )] +// #![deny(clippy::unwrap_used)] -use std::fs::read_to_string; +use std::{ffi::OsString, fs::read_to_string, process::ExitCode}; use chain_utils::{evm::Evm, union::Union}; use clap::Parser; @@ -17,8 +18,8 @@ use unionlabs::ethereum_consts_traits::Mainnet; use crate::{ chain::AnyChain, cli::{AppArgs, Command, QueryCmd}, - config::Config, - queue::{AnyQueue, Voyager}, + config::{Config, GetChainError}, + queue::{AnyQueue, Voyager, VoyagerInitError}, }; pub const DELAY_PERIOD: u64 = 0; @@ -33,27 +34,67 @@ pub mod queue; pub mod chain; #[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +async fn main() -> ExitCode { tracing_subscriber::fmt::init(); let args = AppArgs::parse(); - do_main(args).await + match do_main(args).await { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("Error: {err}"); + ExitCode::FAILURE + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum VoyagerError { + #[error("unable to read the config file at {}", path.to_string_lossy())] + ConfigFileNotFound { + path: OsString, + #[source] + source: std::io::Error, + }, + #[error("unable to parse the config file at {}", path.to_string_lossy())] + ConfigFileParse { + path: OsString, + #[source] + source: serde_json::Error, + }, + #[error("error retrieving a chain from the config")] + GetChain(#[from] GetChainError), + #[error("error initializing voyager")] + Init(#[from] VoyagerInitError), } #[allow(clippy::too_many_lines)] // NOTE: This function is a mess, will be cleaned up -async fn do_main(args: cli::AppArgs) -> Result<(), anyhow::Error> { +async fn do_main(args: cli::AppArgs) -> Result<(), VoyagerError> { let voyager_config = read_to_string(&args.config_file_path) - .map(|s| serde_json::from_str::>(&s).unwrap()) - .unwrap(); + .map_err(|err| VoyagerError::ConfigFileNotFound { + path: args.config_file_path.clone(), + source: err, + }) + .and_then(|s| { + serde_json::from_str::>(&s).map_err(|err| { + VoyagerError::ConfigFileParse { + path: args.config_file_path, + source: err, + } + }) + })?; match args.command { Command::PrintConfig => { - println!("{}", serde_json::to_string_pretty(&voyager_config).unwrap()); + println!( + "{}", + serde_json::to_string_pretty(&voyager_config) + .expect("config serialization is infallible; qed;") + ); } Command::Relay => { - let queue = Voyager::new(voyager_config.clone()).await; + let queue = Voyager::new(voyager_config.clone()).await?; queue.run().await; } @@ -64,7 +105,7 @@ async fn do_main(args: cli::AppArgs) -> Result<(), anyhow::Error> { module_address, port_id, } => { - let chain = voyager_config.get_chain(&on).await.unwrap(); + let chain = voyager_config.get_chain(&on).await?; match chain { AnyChain::EvmMinimal(evm) => { @@ -83,7 +124,7 @@ async fn do_main(args: cli::AppArgs) -> Result<(), anyhow::Error> { port_id, channel_id, } => { - let chain = voyager_config.get_chain(&on).await.unwrap(); + let chain = voyager_config.get_chain(&on).await?; match chain { AnyChain::EvmMinimal(evm) => { @@ -102,7 +143,7 @@ async fn do_main(args: cli::AppArgs) -> Result<(), anyhow::Error> { _ => panic!("not supported"), }, Command::Query { on, at, cmd } => { - let on = voyager_config.get_chain(&on).await.unwrap(); + let on = voyager_config.get_chain(&on).await?; match cmd { QueryCmd::IbcPath(path) => { diff --git a/voyager/src/queue.rs b/voyager/src/queue.rs index 2664ea73d5..9cd8cfb542 100644 --- a/voyager/src/queue.rs +++ b/voyager/src/queue.rs @@ -20,7 +20,7 @@ use futures::{ use hubble::hasura::{Datastore, HasuraDataStore, InsertDemoTx}; use pg_queue::ProcessFlow; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use sqlx::{error::BoxDynError, PgPool}; +use sqlx::PgPool; use unionlabs::{ ethereum_consts_traits::{Mainnet, Minimal}, events::{ @@ -56,7 +56,7 @@ use crate::{ CommitmentPath, ConnectionPath, IbcStateRead, }, union::{EthereumMainnet, EthereumMinimal}, - AnyChain, ChainOf, HeightOf, LightClient, QueryHeight, + AnyChain, AnyChainTryFromConfigError, ChainOf, HeightOf, LightClient, QueryHeight, }, config::Config, msg::{ @@ -117,7 +117,7 @@ pub struct Voyager { pub trait Queue: Clone + Send + Sync + Sized { /// Error type returned by this queue, representing errors that are out of control of the consumer (i.e. unable to connect to database, can't insert into row, can't deserialize row, etc) - type Error: Debug + Display + Error; + type Error: Debug + Display + Error + 'static; type Config: Debug + Clone + Serialize + DeserializeOwned; fn new(cfg: Self::Config) -> impl Future>; @@ -147,10 +147,9 @@ pub enum AnyQueue { } #[derive(Debug, thiserror::Error)] +#[error(transparent)] pub enum AnyQueueError { - #[error("{0}")] InMemory(#[from] ::Error), - #[error("{0}")] PgQueue(#[from] ::Error), } @@ -267,8 +266,18 @@ impl Queue for PgQueue { } } +#[derive(Debug, thiserror::Error)] +pub enum VoyagerInitError { + #[error("multiple configured chains have the same chain id `{chain_id}`")] + DuplicateChainId { chain_id: String }, + #[error("error initializing chain")] + ChainInit(#[from] AnyChainTryFromConfigError), + #[error("error initializing queue")] + QueueInit(#[source] Q::Error), +} + impl Voyager { - pub async fn new(config: Config) -> Self { + pub async fn new(config: Config) -> Result> { if config.voyager.hasura.is_none() { tracing::warn!("no hasura config supplied, no messages will be indexed"); } @@ -277,13 +286,26 @@ impl Voyager { let mut evm_minimal = HashMap::new(); let mut evm_mainnet = HashMap::new(); + fn insert_into_chain_map( + map: &mut HashMap<<::SelfClientState as ClientState>::ChainId, C>, + chain: C, + ) -> Result<<::SelfClientState as ClientState>::ChainId, VoyagerInitError> + { + let chain_id = chain.chain_id(); + map.insert(chain_id.clone(), chain) + .map_or(Ok(chain_id), |prev| { + Err(VoyagerInitError::DuplicateChainId { + chain_id: prev.chain_id().to_string(), + }) + }) + } + for (chain_name, chain_config) in config.chain { - let chain = AnyChain::try_from_config(&config.voyager, chain_config).await; + let chain = AnyChain::try_from_config(&config.voyager, chain_config).await?; match chain { AnyChain::Union(c) => { - let chain_id = c.chain_id.clone(); - assert!(union.insert(c.chain_id.clone(), c).is_none()); + let chain_id = insert_into_chain_map(&mut union, c)?; tracing::info!( chain_name, @@ -293,8 +315,7 @@ impl Voyager { ); } AnyChain::EvmMainnet(c) => { - let chain_id = c.chain_id; - assert!(evm_mainnet.insert(c.chain_id, c).is_none()); + let chain_id = insert_into_chain_map(&mut evm_mainnet, c)?; tracing::info!( chain_name, @@ -304,8 +325,7 @@ impl Voyager { ); } AnyChain::EvmMinimal(c) => { - let chain_id = c.chain_id; - assert!(evm_minimal.insert(c.chain_id, c).is_none()); + let chain_id = insert_into_chain_map(&mut evm_minimal, c)?; tracing::info!( chain_name, @@ -317,7 +337,11 @@ impl Voyager { } } - Self { + let queue = Q::new(config.voyager.queue) + .await + .map_err(VoyagerInitError::QueueInit)?; + + Ok(Self { evm_minimal, evm_mainnet, union, @@ -326,8 +350,8 @@ impl Voyager { .voyager .hasura .map(|hc| HasuraDataStore::new(reqwest::Client::new(), hc.url, hc.secret)), - queue: Q::new(config.voyager.queue).await.unwrap(), - } + queue, + }) } pub async fn run(self) { diff --git a/voyager/src/queue/aggregate_data.rs b/voyager/src/queue/aggregate_data.rs index 24a35f48b5..800dc6a2fd 100644 --- a/voyager/src/queue/aggregate_data.rs +++ b/voyager/src/queue/aggregate_data.rs @@ -12,7 +12,7 @@ pub fn do_aggregate>( event: T, data: VecDeque, ) -> RelayerMsg { - let data_json = serde_json::to_string(&data).unwrap(); + let data_json = serde_json::to_string(&data).expect("serialization should not fail"); tracing::info!(%data_json, "aggregating data"); @@ -55,12 +55,17 @@ impl HListTryFromIterator for HCons where T: TryFrom + Into, Tail: HListTryFromIterator, + // REVIEW: Should debug be used instead? U: Serialize, { fn try_from_iter(vec: VecDeque) -> Result> { match pluck::(vec) { ControlFlow::Continue(not_found) => { - tracing::warn!(not_found = %serde_json::to_string(¬_found).unwrap(), "type didn't match"); + tracing::error!( + not_found = %serde_json::to_string(¬_found).expect("serialization should not fail"), + "type didn't match" + ); + Err(not_found) } ControlFlow::Break((vec, u)) => Ok(HCons { diff --git a/voyager/src/queue/msg_server.rs b/voyager/src/queue/msg_server.rs index be1078ce0f..ec95f43745 100644 --- a/voyager/src/queue/msg_server.rs +++ b/voyager/src/queue/msg_server.rs @@ -37,7 +37,7 @@ impl EventSource for MsgServer { Json(msg): Json, ) -> StatusCode { tracing::info!(?msg, "received msg"); - sender.send(msg).unwrap(); + sender.send(msg).expect("receiver should not close"); StatusCode::OK } @@ -49,7 +49,7 @@ impl EventSource for MsgServer { ) -> StatusCode { tracing::info!(?msgs, "received msgs"); for msg in msgs { - sender.send(msg).unwrap(); + sender.send(msg).expect("receiver should not close"); } StatusCode::OK @@ -57,7 +57,8 @@ impl EventSource for MsgServer { tokio::spawn( // TODO: Make this configurable - axum::Server::bind(&"0.0.0.0:65534".parse().unwrap()).serve(app.into_make_service()), + axum::Server::bind(&"0.0.0.0:65534".parse().expect("valid SocketAddr; qed;")) + .serve(app.into_make_service()), ); UnboundedReceiverStream::new(rx).map(Ok)