Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove chain runtime #3341

Closed
wants to merge 12 commits into from
3 changes: 3 additions & 0 deletions .changelog/unreleased/improvements/3341-chain-runtime.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Emit (g)RPC queries concurrently for increased throughput when
relaying on multiple chains/channels with a single Hermes instance
([\#3341](https://github.com/informalsystems/hermes/issues/3341))
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ generic-array = "0.14.7"
secp256k1 = { version = "0.27.0", features = ["rand-std"] }
strum = { version = "0.24.1", features = ["derive"] }
once_cell = "1.17.1"
parking_lot = "0.12.1"

[dependencies.byte-unit]
version = "4.0.19"
Expand Down
1 change: 0 additions & 1 deletion crates/relayer/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod counterparty;
pub mod endpoint;
pub mod handle;
pub mod requests;
pub mod runtime;
pub mod tracking;

use serde::{de::Error, Deserialize, Serialize};
Expand Down
138 changes: 94 additions & 44 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use futures::future::join_all;
use num_bigint::BigInt;
use std::{cmp::Ordering, thread};

use parking_lot::{Mutex, RwLock};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::codegen::http::Uri;
use tonic::metadata::AsciiMetadataValue;
use tokio::sync::RwLock as AsyncRwLock;
use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue};
use tracing::{error, instrument, trace, warn};

use ibc_proto::cosmos::{
Expand Down Expand Up @@ -144,12 +145,12 @@ pub struct CosmosSdkChain {
grpc_addr: Uri,
light_client: TmLightClient,
rt: Arc<TokioRuntime>,
keybase: KeyRing<Secp256k1KeyPair>,
keybase: RwLock<KeyRing<Secp256k1KeyPair>>,
romac marked this conversation as resolved.
Show resolved Hide resolved

/// A cached copy of the account information
account: Option<Account>,
account: AsyncRwLock<Option<Account>>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the lock for the account


tx_monitor_cmd: Option<TxMonitorCmd>,
tx_monitor_cmd: Mutex<Option<TxMonitorCmd>>,
}

impl CosmosSdkChain {
Expand All @@ -164,7 +165,8 @@ impl CosmosSdkChain {
}

fn key(&self) -> Result<Secp256k1KeyPair, Error> {
self.keybase()
self.keybase
.read()
.get_key(&self.config.key_name)
.map_err(Error::key_base)
}
Expand Down Expand Up @@ -286,7 +288,7 @@ impl CosmosSdkChain {
Ok(())
}

fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
fn init_event_monitor(&self) -> Result<TxMonitorCmd, Error> {
crate::time!(
"init_event_monitor",
{
Expand Down Expand Up @@ -641,7 +643,7 @@ impl CosmosSdkChain {
),
)]
async fn do_send_messages_and_wait_commit(
&mut self,
&self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEventWithHeight>, Error> {
crate::time!(
Expand All @@ -656,8 +658,10 @@ impl CosmosSdkChain {
let key_pair = self.key()?;
let key_account = key_pair.account();

let account =
get_or_fetch_account(&self.grpc_addr, &key_account, &mut self.account).await?;
// Take a write lock on the current account,
// excluding anybody from sending txs until we are done.
let mut opt_account = self.account.write().await;
let account = get_or_fetch_account(&self.grpc_addr, &key_account, &mut opt_account).await?;

if self.config.sequential_batch_tx {
sequential_send_batched_messages_and_wait_commit(
Expand Down Expand Up @@ -692,7 +696,7 @@ impl CosmosSdkChain {
),
)]
async fn do_send_messages_and_wait_check_tx(
&mut self,
&self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<Response>, Error> {
crate::time!(
Expand All @@ -707,8 +711,10 @@ impl CosmosSdkChain {
let key_pair = self.key()?;
let key_account = key_pair.account();

let account =
get_or_fetch_account(&self.grpc_addr, &key_account, &mut self.account).await?;
// Take a write lock on the current account,
// excluding anybody from sending txs until we are done.
let mut opt_account = self.account.write().await;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We take the lock here for the remaining duration of the function, ie. until the tx has been submitted.

let account = get_or_fetch_account(&self.grpc_addr, &key_account, &mut opt_account).await?;

send_batched_messages_and_wait_check_tx(
&self.rpc_client,
Expand All @@ -721,6 +727,33 @@ impl CosmosSdkChain {
.await
}

async fn do_maybe_register_counterparty_payee(
&self,
channel_id: &ChannelId,
port_id: &PortId,
counterparty_payee: &Signer,
) -> Result<(), Error> {
let address = self.get_signer()?;
let key_pair = self.key()?;

// Take a write lock on the current account,
// excluding anybody from sending txs until we are done.
let mut opt_account = self.account.write().await;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We take the lock here for the remaining of the function, ie. until the counterparty payee has been registered.


maybe_register_counterparty_payee(
&self.rpc_client,
&self.tx_config,
&key_pair,
&mut opt_account,
&self.config.memo_prefix,
channel_id,
port_id,
&address,
counterparty_payee,
)
.await
}

fn query_packet_from_block(
&self,
request: &QueryPacketEventDataRequest,
Expand Down Expand Up @@ -875,38 +908,54 @@ impl ChainEndpoint for CosmosSdkChain {
grpc_addr,
light_client,
rt,
keybase,
tx_config,
account: None,
tx_monitor_cmd: None,
keybase: RwLock::new(keybase),
account: AsyncRwLock::new(None),
tx_monitor_cmd: Mutex::new(None),
};

Ok(chain)
}

fn shutdown(self) -> Result<(), Error> {
if let Some(monitor_tx) = self.tx_monitor_cmd {
fn shutdown(&self) -> Result<(), Error> {
let cmd = self.tx_monitor_cmd.lock();

if let Some(monitor_tx) = cmd.as_ref() {
monitor_tx.shutdown().map_err(Error::event_monitor)?;
}

Ok(())
}

fn keybase(&self) -> &KeyRing<Self::SigningKeyPair> {
&self.keybase
fn get_key(&self) -> Result<Self::SigningKeyPair, Error> {
// Get the key from key seed file
let key_pair = self
.keybase
.read()
.get_key(&self.config().key_name)
.map_err(|e| Error::key_not_found(self.config().key_name.clone(), e))?;

Ok(key_pair)
}

fn keybase_mut(&mut self) -> &mut KeyRing<Self::SigningKeyPair> {
&mut self.keybase
fn add_key(&self, key_name: &str, key_pair: Self::SigningKeyPair) -> Result<(), Error> {
self.keybase
.write()
.add_key(key_name, key_pair)
.map_err(Error::key_base)?;

Ok(())
}

fn subscribe(&mut self) -> Result<Subscription, Error> {
let tx_monitor_cmd = match &self.tx_monitor_cmd {
fn subscribe(&self) -> Result<Subscription, Error> {
let mut cmd = self.tx_monitor_cmd.lock();

let tx_monitor_cmd = match cmd.as_ref() {
Some(tx_monitor_cmd) => tx_monitor_cmd,
None => {
let tx_monitor_cmd = self.init_event_monitor()?;
self.tx_monitor_cmd = Some(tx_monitor_cmd);
self.tx_monitor_cmd.as_ref().unwrap()
*cmd = Some(tx_monitor_cmd);
cmd.as_ref().unwrap()
}
};

Expand Down Expand Up @@ -947,7 +996,7 @@ impl ChainEndpoint for CosmosSdkChain {

/// Fetch a header from the chain at the given height and verify it.
fn verify_header(
&mut self,
&self,
trusted: ICSHeight,
target: ICSHeight,
client_state: &AnyClientState,
Expand All @@ -967,7 +1016,7 @@ impl ChainEndpoint for CosmosSdkChain {
/// Given a client update event that includes the header used in a client update,
/// look for misbehaviour by fetching a header at same or latest height.
fn check_misbehaviour(
&mut self,
&self,
update: &UpdateClient,
client_state: &AnyClientState,
) -> Result<Option<MisbehaviourEvidence>, Error> {
Expand All @@ -992,7 +1041,7 @@ impl ChainEndpoint for CosmosSdkChain {
/// TODO - more work is required here for a smarter split maybe iteratively accumulating/ evaluating
/// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded.
fn send_messages_and_wait_commit(
&mut self,
&self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let runtime = self.rt.clone();
Expand All @@ -1001,7 +1050,7 @@ impl ChainEndpoint for CosmosSdkChain {
}

fn send_messages_and_wait_check_tx(
&mut self,
&self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<Response>, Error> {
let runtime = self.rt.clone();
Expand Down Expand Up @@ -1033,9 +1082,14 @@ impl ChainEndpoint for CosmosSdkChain {
// If a key_name is given, extract the account hash.
// Else retrieve the account from the configuration file.
let key = match key_name {
Some(key_name) => self.keybase().get_key(key_name).map_err(Error::key_base)?,
None => self.key()?,
Some(key_name) => self
.keybase
.read()
.get_key(key_name)
.map_err(Error::key_base)?,
};

let account = key.account();

let denom = denom.unwrap_or(&self.config.gas_price.denom);
Expand All @@ -1048,9 +1102,14 @@ impl ChainEndpoint for CosmosSdkChain {
// If a key_name is given, extract the account hash.
// Else retrieve the account from the configuration file.
let key = match key_name {
Some(key_name) => self.keybase().get_key(key_name).map_err(Error::key_base)?,
None => self.key()?,
Some(key_name) => self
.keybase
.read()
.get_key(key_name)
.map_err(Error::key_base)?,
};

let account = key.account();

let balance = self.block_on(query_all_balances(&self.grpc_addr, &account))?;
Expand Down Expand Up @@ -2122,7 +2181,7 @@ impl ChainEndpoint for CosmosSdkChain {
}

fn build_header(
&mut self,
&self,
trusted_height: ICSHeight,
target_height: ICSHeight,
client_state: &AnyClientState,
Expand All @@ -2145,23 +2204,14 @@ impl ChainEndpoint for CosmosSdkChain {
}

fn maybe_register_counterparty_payee(
&mut self,
&self,
channel_id: &ChannelId,
port_id: &PortId,
counterparty_payee: &Signer,
) -> Result<(), Error> {
let address = self.get_signer()?;
let key_pair = self.key()?;

self.rt.block_on(maybe_register_counterparty_payee(
&self.rpc_client,
&self.tx_config,
&key_pair,
&mut self.account,
&self.config.memo_prefix,
self.rt.block_on(self.do_maybe_register_counterparty_payee(
channel_id,
port_id,
&address,
counterparty_payee,
))
}
Expand Down
7 changes: 4 additions & 3 deletions crates/relayer/src/chain/cosmos/query/consensus_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,18 @@ pub async fn query_consensus_state_heights(
}

pub async fn query_consensus_states(
chain_id: &ChainId,
_chain_id: &ChainId,
grpc_addr: &Uri,
request: QueryConsensusStatesRequest,
) -> Result<Vec<AnyConsensusStateWithHeight>, Error> {
crate::telemetry!(query, _chain_id, "query_consensus_states");

crate::time!(
"query_consensus_states",
{
"src_chain": chain_id,
"src_chain": _chain_id,
}
);
crate::telemetry!(query, chain_id, "query_consensus_states");

let mut client =
ibc_proto::ibc::core::client::v1::query_client::QueryClient::connect(grpc_addr.clone())
Expand Down
Loading