From 1aa4f8168c4daaf1d046cfdafe5bc63bddbb8416 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 23 May 2023 15:56:56 +0200 Subject: [PATCH] Limit the maximum number of requests which can issued concurrently to the chain (#3350) * Limit the maximum number of requests which can issued concurrently to the chain Controlled via the `max_concurrency` setting in the chain configuration. * Swap out `rsevents-extra` for `std-semaphore` --- Cargo.lock | 7 ++ config.toml | 6 ++ crates/relayer-cli/src/chain_registry.rs | 1 + crates/relayer/Cargo.toml | 1 + crates/relayer/src/chain/handle/base.rs | 82 ++++++++++++++++++- crates/relayer/src/config.rs | 13 +++ crates/relayer/src/spawn.rs | 10 ++- tools/test-framework/src/types/single/node.rs | 1 + 8 files changed, 119 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9aa4eaad58..14d47ae63b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1799,6 +1799,7 @@ dependencies = [ "serial_test", "sha2 0.10.6", "signature 1.6.4", + "std-semaphore", "strum", "subtle-encoding", "tendermint", @@ -3804,6 +3805,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "std-semaphore" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" + [[package]] name = "string_cache" version = "0.8.7" diff --git a/config.toml b/config.toml index e41c439737..970d2e016b 100644 --- a/config.toml +++ b/config.toml @@ -132,6 +132,12 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket' # Hermes uses a large preconfigured timeout (on the order of minutes). rpc_timeout = '10s' +# The maximum number of requests which can be issued to the chain concurrently. +# Higher number may increase throughput when relaying on multiple channels +# or between multiple chains but may also overload the node. +# Default: 50 +max_concurrency = 50 + # Delay until event batch is emitted if no NewBlock events have come yet batch_delay = '500ms' diff --git a/crates/relayer-cli/src/chain_registry.rs b/crates/relayer-cli/src/chain_registry.rs index 36bb2964d8..6971f860c4 100644 --- a/crates/relayer-cli/src/chain_registry.rs +++ b/crates/relayer-cli/src/chain_registry.rs @@ -121,6 +121,7 @@ where websocket_addr: websocket_address, grpc_addr: grpc_address, rpc_timeout: default::rpc_timeout(), + max_concurrency: default::max_concurrency(), batch_delay: default::batch_delay(), genesis_restart: None, account_prefix: chain_data.bech32_prefix, diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index 180f9c1808..d72262d996 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -71,6 +71,7 @@ strum = { version = "0.24.1", features = ["derive"] } tokio-stream = "0.1.14" once_cell = "1.17.1" parking_lot = "0.12.1" +std-semaphore = "0.1" [dependencies.byte-unit] version = "4.0.19" diff --git a/crates/relayer/src/chain/handle/base.rs b/crates/relayer/src/chain/handle/base.rs index 0a480aed16..61d86c42b8 100644 --- a/crates/relayer/src/chain/handle/base.rs +++ b/crates/relayer/src/chain/handle/base.rs @@ -21,6 +21,7 @@ use ibc_relayer_types::{ signer::Signer, Height, }; +use std_semaphore::Semaphore; use crate::{ account::Balance, @@ -50,6 +51,9 @@ use super::{ChainHandle, ChainImpl, HealthCheck, Subscription}; pub struct BaseChainHandle { /// The chain implementation chain: Arc, + + /// A semaphore to limit the number of concurrent requests to the chain + semaphore: Arc, } impl Debug for BaseChainHandle { @@ -68,7 +72,17 @@ impl Display for BaseChainHandle { impl BaseChainHandle { pub fn new(chain: Arc) -> Self { - Self { chain } + // The semaphore is initialized with the maximum number of concurrent requests. + // If that number was specified as 0, then we use the maximum amount of concurrency, + // and effectively disable the limit. + let max_concurrency = Some(chain.config().max_concurrency) + .filter(|&n| n > 0) + .unwrap_or(u16::MAX); + + Self { + chain, + semaphore: Arc::new(Semaphore::new(max_concurrency as isize)), + } } } @@ -160,6 +174,8 @@ impl ChainHandle for BaseChainHandle { key_name: Option, denom: Option, ) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_balance(key_name.as_deref(), denom.as_deref()) @@ -168,18 +184,24 @@ impl ChainHandle for BaseChainHandle { } fn query_all_balances(&self, key_name: Option) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_all_balances(key_name.as_deref()), // FIXME } } fn query_denom_trace(&self, hash: String) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_denom_trace(hash), } } fn query_application_status(&self) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_application_status(), } @@ -189,6 +211,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientStatesRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_clients(request), } @@ -199,6 +223,8 @@ impl ChainHandle for BaseChainHandle { request: QueryClientStateRequest, include_proof: IncludeProof, ) -> Result<(AnyClientState, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_state(request, include_proof), } @@ -208,6 +234,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientConnectionsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_connections(request), } @@ -217,6 +245,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConsensusStateHeightsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state_heights(request), } @@ -227,6 +257,8 @@ impl ChainHandle for BaseChainHandle { request: QueryConsensusStateRequest, include_proof: IncludeProof, ) -> Result<(AnyConsensusState, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state(request, include_proof), } @@ -236,6 +268,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedClientStateRequest, ) -> Result<(AnyClientState, MerkleProof), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_client_state(request), } @@ -245,18 +279,24 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedConsensusStateRequest, ) -> Result<(AnyConsensusState, MerkleProof), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_consensus_state(request), } } fn query_commitment_prefix(&self) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_commitment_prefix(), } } fn query_compatible_versions(&self) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_compatible_versions(), } @@ -267,6 +307,8 @@ impl ChainHandle for BaseChainHandle { request: QueryConnectionRequest, include_proof: IncludeProof, ) -> Result<(ConnectionEnd, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection(request, include_proof), } @@ -276,6 +318,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connections(request), } @@ -285,6 +329,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionChannelsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection_channels(request), } @@ -295,6 +341,8 @@ impl ChainHandle for BaseChainHandle { request: QueryNextSequenceReceiveRequest, include_proof: IncludeProof, ) -> Result<(Sequence, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_next_sequence_receive(request, include_proof) @@ -306,6 +354,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channels(request), } @@ -316,6 +366,8 @@ impl ChainHandle for BaseChainHandle { request: QueryChannelRequest, include_proof: IncludeProof, ) -> Result<(ChannelEnd, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel(request, include_proof), } @@ -325,6 +377,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelClientStateRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel_client_state(request), } @@ -438,6 +492,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketCommitmentRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitment(request, include_proof), } @@ -447,6 +503,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketCommitmentsRequest, ) -> Result<(Vec, Height), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitments(request), } @@ -457,6 +515,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketReceiptRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_receipt(request, include_proof), } @@ -466,6 +526,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedPacketsRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_packets(request), } @@ -476,6 +538,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketAcknowledgementRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_packet_acknowledgement(request, include_proof) @@ -487,6 +551,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketAcknowledgementsRequest, ) -> Result<(Vec, Height), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_acknowledgements(request), } @@ -496,12 +562,16 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedAcksRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_acknowledgements(request), } } fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_txs(request), } @@ -511,6 +581,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketEventDataRequest, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_events(request), } @@ -520,6 +592,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryHostConsensusStateRequest, ) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_host_consensus_state(request).map(Into::into) @@ -533,6 +607,8 @@ impl ChainHandle for BaseChainHandle { port_id: PortId, counterparty_payee: Signer, ) -> Result<(), Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.maybe_register_counterparty_payee(&channel_id, &port_id, &counterparty_payee) @@ -544,6 +620,8 @@ impl ChainHandle for BaseChainHandle { &self, request: Vec, ) -> Result, Error> { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.cross_chain_query(request), } @@ -553,6 +631,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryIncentivizedPacketRequest, ) -> Result { + let _permit = self.semaphore.access(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_incentivized_packet(request), } diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index f8a32014b0..9810e1245d 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -189,6 +189,10 @@ pub mod default { ZERO_DURATION } + pub fn max_concurrency() -> u16 { + 50 + } + pub fn auto_register_counterparty_payee() -> bool { false } @@ -450,15 +454,24 @@ pub struct GenesisRestart { #[serde(deny_unknown_fields)] pub struct ChainConfig { pub id: ChainId, + #[serde(default = "default::chain_type")] pub r#type: ChainType, + pub rpc_addr: Url, pub websocket_addr: WebSocketClientUrl, pub grpc_addr: Url, + #[serde(default = "default::rpc_timeout", with = "humantime_serde")] pub rpc_timeout: Duration, + #[serde(default = "default::batch_delay", with = "humantime_serde")] pub batch_delay: Duration, + + /// The maximum number of requests which can be issued to the chain concurrently. + #[serde(default = "default::max_concurrency")] + pub max_concurrency: u16, + pub account_prefix: String, pub key_name: String, #[serde(default)] diff --git a/crates/relayer/src/spawn.rs b/crates/relayer/src/spawn.rs index 1b074feb5f..8eb2f6fbe9 100644 --- a/crates/relayer/src/spawn.rs +++ b/crates/relayer/src/spawn.rs @@ -7,7 +7,7 @@ use ibc_relayer_types::core::ics24_host::identifier::ChainId; use crate::{ chain::{cosmos::CosmosSdkChain, endpoint::ChainEndpoint, handle::ChainHandle, ChainType}, - config::Config, + config::{ChainConfig, Config}, error::Error as RelayerError, }; @@ -44,6 +44,14 @@ pub enum ChainImpl { CosmosSdk(CosmosSdkChain), } +impl ChainImpl { + pub fn config(&self) -> &ChainConfig { + match self { + Self::CosmosSdk(chain) => chain.config(), + } + } +} + /// Spawns a chain runtime from the configuration and given a chain identifier. /// Returns the corresponding handle if successful. pub fn spawn_chain_runtime( diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index aa730d25ce..667f2b3193 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -142,6 +142,7 @@ impl FullNode { grpc_addr: Url::from_str(&self.chain_driver.grpc_address())?, rpc_timeout: ibc_relayer::config::default::rpc_timeout(), batch_delay: ibc_relayer::config::default::batch_delay(), + max_concurrency: config::default::max_concurrency(), genesis_restart: None, account_prefix: self.chain_driver.account_prefix.clone(), key_name: self.wallets.relayer.id.0.clone(),