From 476da18d4eb81843d2a99e60530d082904eaff2b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 18 May 2023 15:23:50 +0200 Subject: [PATCH 1/2] Limit the maximum number of requests which can issued concurrently to the chain Controlled via the `max_concurrency` setting in the chain configuration. --- Cargo.lock | 19 +++++ config.toml | 6 ++ crates/relayer-cli/src/chain_registry.rs | 1 + crates/relayer/Cargo.toml | 1 + crates/relayer/src/chain/handle/base.rs | 82 ++++++++++++++++++- crates/relayer/src/config.rs | 12 +++ crates/relayer/src/spawn.rs | 10 ++- tools/test-framework/src/types/single/node.rs | 1 + 8 files changed, 130 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86af802429..d04c321a3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,6 +1811,7 @@ dependencies = [ "regex", "retry", "ripemd", + "rsevents-extra", "secp256k1", "semver", "serde", @@ -3180,6 +3181,24 @@ dependencies = [ "digest 0.10.6", ] +[[package]] +name = "rsevents" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8e29769fda0194d58e06d876d82337aac382415df547f073f44ffea0bb0c651" +dependencies = [ + "parking_lot_core", +] + +[[package]] +name = "rsevents-extra" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ff64e850751883803e4b4837a37fd5d349d9cea4fe665df3c16887a08e936e4" +dependencies = [ + "rsevents", +] + [[package]] name = "rustc-demangle" version = "0.1.22" diff --git a/config.toml b/config.toml index 2ee3a28947..4af2f0e3fc 100644 --- a/config.toml +++ b/config.toml @@ -132,6 +132,12 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket' # Hermes uses a large preconfigured timeout (on the order of minutes). rpc_timeout = '10s' +# The maximum number of requests which can be issued to the chain concurrently. +# Higher number may increase throughput when relaying on multiple channels +# or between multiple chains but may also overload the node. +# Default: 50 +max_concurrency = 50 + # Specify the prefix used by the chain. Required account_prefix = 'cosmos' diff --git a/crates/relayer-cli/src/chain_registry.rs b/crates/relayer-cli/src/chain_registry.rs index d7f95546bd..e2e17ecb30 100644 --- a/crates/relayer-cli/src/chain_registry.rs +++ b/crates/relayer-cli/src/chain_registry.rs @@ -121,6 +121,7 @@ where websocket_addr: websocket_address, grpc_addr: grpc_address, rpc_timeout: default::rpc_timeout(), + max_concurrency: default::max_concurrency(), genesis_restart: None, account_prefix: chain_data.bech32_prefix, key_name: String::new(), diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index ae8cadbbef..7f64b39665 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -70,6 +70,7 @@ secp256k1 = { version = "0.27.0", features = ["rand-std"] } strum = { version = "0.24.1", features = ["derive"] } once_cell = "1.17.1" parking_lot = "0.12.1" +rsevents-extra = "0.2.2" [dependencies.byte-unit] version = "4.0.19" diff --git a/crates/relayer/src/chain/handle/base.rs b/crates/relayer/src/chain/handle/base.rs index 0a480aed16..1b92dfe5f4 100644 --- a/crates/relayer/src/chain/handle/base.rs +++ b/crates/relayer/src/chain/handle/base.rs @@ -21,6 +21,7 @@ use ibc_relayer_types::{ signer::Signer, Height, }; +use rsevents_extra::Semaphore; use crate::{ account::Balance, @@ -50,6 +51,9 @@ use super::{ChainHandle, ChainImpl, HealthCheck, Subscription}; pub struct BaseChainHandle { /// The chain implementation chain: Arc, + + /// A semaphore to limit the number of concurrent requests to the chain + semaphore: Arc, } impl Debug for BaseChainHandle { @@ -68,7 +72,17 @@ impl Display for BaseChainHandle { impl BaseChainHandle { pub fn new(chain: Arc) -> Self { - Self { chain } + // The semaphore is initialized with the maximum number of concurrent requests. + // If that number was specified as 0, then we use the maximum amount of concurrency, + // and effectively disable the limit. + let max_concurrency = Some(chain.config().max_concurrency) + .filter(|&n| n > 0) + .unwrap_or(u16::MAX); + + Self { + chain, + semaphore: Arc::new(Semaphore::new(max_concurrency, max_concurrency)), + } } } @@ -160,6 +174,8 @@ impl ChainHandle for BaseChainHandle { key_name: Option, denom: Option, ) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_balance(key_name.as_deref(), denom.as_deref()) @@ -168,18 +184,24 @@ impl ChainHandle for BaseChainHandle { } fn query_all_balances(&self, key_name: Option) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_all_balances(key_name.as_deref()), // FIXME } } fn query_denom_trace(&self, hash: String) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_denom_trace(hash), } } fn query_application_status(&self) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_application_status(), } @@ -189,6 +211,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientStatesRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_clients(request), } @@ -199,6 +223,8 @@ impl ChainHandle for BaseChainHandle { request: QueryClientStateRequest, include_proof: IncludeProof, ) -> Result<(AnyClientState, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_state(request, include_proof), } @@ -208,6 +234,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientConnectionsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_connections(request), } @@ -217,6 +245,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConsensusStateHeightsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state_heights(request), } @@ -227,6 +257,8 @@ impl ChainHandle for BaseChainHandle { request: QueryConsensusStateRequest, include_proof: IncludeProof, ) -> Result<(AnyConsensusState, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state(request, include_proof), } @@ -236,6 +268,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedClientStateRequest, ) -> Result<(AnyClientState, MerkleProof), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_client_state(request), } @@ -245,18 +279,24 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedConsensusStateRequest, ) -> Result<(AnyConsensusState, MerkleProof), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_consensus_state(request), } } fn query_commitment_prefix(&self) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_commitment_prefix(), } } fn query_compatible_versions(&self) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_compatible_versions(), } @@ -267,6 +307,8 @@ impl ChainHandle for BaseChainHandle { request: QueryConnectionRequest, include_proof: IncludeProof, ) -> Result<(ConnectionEnd, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection(request, include_proof), } @@ -276,6 +318,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connections(request), } @@ -285,6 +329,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionChannelsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection_channels(request), } @@ -295,6 +341,8 @@ impl ChainHandle for BaseChainHandle { request: QueryNextSequenceReceiveRequest, include_proof: IncludeProof, ) -> Result<(Sequence, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_next_sequence_receive(request, include_proof) @@ -306,6 +354,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channels(request), } @@ -316,6 +366,8 @@ impl ChainHandle for BaseChainHandle { request: QueryChannelRequest, include_proof: IncludeProof, ) -> Result<(ChannelEnd, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel(request, include_proof), } @@ -325,6 +377,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelClientStateRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel_client_state(request), } @@ -438,6 +492,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketCommitmentRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitment(request, include_proof), } @@ -447,6 +503,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketCommitmentsRequest, ) -> Result<(Vec, Height), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitments(request), } @@ -457,6 +515,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketReceiptRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_receipt(request, include_proof), } @@ -466,6 +526,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedPacketsRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_packets(request), } @@ -476,6 +538,8 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketAcknowledgementRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_packet_acknowledgement(request, include_proof) @@ -487,6 +551,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketAcknowledgementsRequest, ) -> Result<(Vec, Height), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_acknowledgements(request), } @@ -496,12 +562,16 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedAcksRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_acknowledgements(request), } } fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_txs(request), } @@ -511,6 +581,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketEventDataRequest, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_events(request), } @@ -520,6 +592,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryHostConsensusStateRequest, ) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.query_host_consensus_state(request).map(Into::into) @@ -533,6 +607,8 @@ impl ChainHandle for BaseChainHandle { port_id: PortId, counterparty_payee: Signer, ) -> Result<(), Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { chain.maybe_register_counterparty_payee(&channel_id, &port_id, &counterparty_payee) @@ -544,6 +620,8 @@ impl ChainHandle for BaseChainHandle { &self, request: Vec, ) -> Result, Error> { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.cross_chain_query(request), } @@ -553,6 +631,8 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryIncentivizedPacketRequest, ) -> Result { + let _permit = self.semaphore.wait(); + match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_incentivized_packet(request), } diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index e67ea6cb58..e1921b1ea5 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -186,6 +186,10 @@ pub mod default { ZERO_DURATION } + pub fn max_concurrency() -> u16 { + 50 + } + pub fn auto_register_counterparty_payee() -> bool { false } @@ -447,13 +451,21 @@ pub struct GenesisRestart { #[serde(deny_unknown_fields)] pub struct ChainConfig { pub id: ChainId, + #[serde(default = "default::chain_type")] pub r#type: ChainType, + pub rpc_addr: Url, pub websocket_addr: WebSocketClientUrl, pub grpc_addr: Url, + #[serde(default = "default::rpc_timeout", with = "humantime_serde")] pub rpc_timeout: Duration, + + /// The maximum number of requests which can be issued to the chain concurrently. + #[serde(default = "default::max_concurrency")] + pub max_concurrency: u16, + pub account_prefix: String, pub key_name: String, #[serde(default)] diff --git a/crates/relayer/src/spawn.rs b/crates/relayer/src/spawn.rs index 1b074feb5f..8eb2f6fbe9 100644 --- a/crates/relayer/src/spawn.rs +++ b/crates/relayer/src/spawn.rs @@ -7,7 +7,7 @@ use ibc_relayer_types::core::ics24_host::identifier::ChainId; use crate::{ chain::{cosmos::CosmosSdkChain, endpoint::ChainEndpoint, handle::ChainHandle, ChainType}, - config::Config, + config::{ChainConfig, Config}, error::Error as RelayerError, }; @@ -44,6 +44,14 @@ pub enum ChainImpl { CosmosSdk(CosmosSdkChain), } +impl ChainImpl { + pub fn config(&self) -> &ChainConfig { + match self { + Self::CosmosSdk(chain) => chain.config(), + } + } +} + /// Spawns a chain runtime from the configuration and given a chain identifier. /// Returns the corresponding handle if successful. pub fn spawn_chain_runtime( diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index 882a31d619..efdde57a40 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -140,6 +140,7 @@ impl FullNode { websocket_addr: WebSocketClientUrl::from_str(&self.chain_driver.websocket_address())?, grpc_addr: Url::from_str(&self.chain_driver.grpc_address())?, rpc_timeout: Duration::from_secs(10), + max_concurrency: config::default::max_concurrency(), genesis_restart: None, account_prefix: self.chain_driver.account_prefix.clone(), key_name: self.wallets.relayer.id.0.clone(), From eaa44470c079c1d4d89e52ac5292c499174b31a5 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 19 May 2023 18:10:55 +0200 Subject: [PATCH 2/2] Swap out `rsevents-extra` for `std-semaphore` --- Cargo.lock | 26 +++------ crates/relayer/Cargo.toml | 2 +- crates/relayer/src/chain/handle/base.rs | 70 ++++++++++++------------- 3 files changed, 43 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d04c321a3f..3eecdbd9de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,7 +1811,6 @@ dependencies = [ "regex", "retry", "ripemd", - "rsevents-extra", "secp256k1", "semver", "serde", @@ -1820,6 +1819,7 @@ dependencies = [ "serial_test", "sha2 0.10.6", "signature", + "std-semaphore", "strum", "subtle-encoding", "tendermint", @@ -3181,24 +3181,6 @@ dependencies = [ "digest 0.10.6", ] -[[package]] -name = "rsevents" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8e29769fda0194d58e06d876d82337aac382415df547f073f44ffea0bb0c651" -dependencies = [ - "parking_lot_core", -] - -[[package]] -name = "rsevents-extra" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ff64e850751883803e4b4837a37fd5d349d9cea4fe665df3c16887a08e936e4" -dependencies = [ - "rsevents", -] - [[package]] name = "rustc-demangle" version = "0.1.22" @@ -3802,6 +3784,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "std-semaphore" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" + [[package]] name = "string_cache" version = "0.8.7" diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index 7f64b39665..5b444b9d46 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -70,7 +70,7 @@ secp256k1 = { version = "0.27.0", features = ["rand-std"] } strum = { version = "0.24.1", features = ["derive"] } once_cell = "1.17.1" parking_lot = "0.12.1" -rsevents-extra = "0.2.2" +std-semaphore = "0.1" [dependencies.byte-unit] version = "4.0.19" diff --git a/crates/relayer/src/chain/handle/base.rs b/crates/relayer/src/chain/handle/base.rs index 1b92dfe5f4..61d86c42b8 100644 --- a/crates/relayer/src/chain/handle/base.rs +++ b/crates/relayer/src/chain/handle/base.rs @@ -21,7 +21,7 @@ use ibc_relayer_types::{ signer::Signer, Height, }; -use rsevents_extra::Semaphore; +use std_semaphore::Semaphore; use crate::{ account::Balance, @@ -81,7 +81,7 @@ impl BaseChainHandle { Self { chain, - semaphore: Arc::new(Semaphore::new(max_concurrency, max_concurrency)), + semaphore: Arc::new(Semaphore::new(max_concurrency as isize)), } } } @@ -174,7 +174,7 @@ impl ChainHandle for BaseChainHandle { key_name: Option, denom: Option, ) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { @@ -184,7 +184,7 @@ impl ChainHandle for BaseChainHandle { } fn query_all_balances(&self, key_name: Option) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_all_balances(key_name.as_deref()), // FIXME @@ -192,7 +192,7 @@ impl ChainHandle for BaseChainHandle { } fn query_denom_trace(&self, hash: String) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_denom_trace(hash), @@ -200,7 +200,7 @@ impl ChainHandle for BaseChainHandle { } fn query_application_status(&self) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_application_status(), @@ -211,7 +211,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientStatesRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_clients(request), @@ -223,7 +223,7 @@ impl ChainHandle for BaseChainHandle { request: QueryClientStateRequest, include_proof: IncludeProof, ) -> Result<(AnyClientState, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_state(request, include_proof), @@ -234,7 +234,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryClientConnectionsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_client_connections(request), @@ -245,7 +245,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConsensusStateHeightsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state_heights(request), @@ -257,7 +257,7 @@ impl ChainHandle for BaseChainHandle { request: QueryConsensusStateRequest, include_proof: IncludeProof, ) -> Result<(AnyConsensusState, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_consensus_state(request, include_proof), @@ -268,7 +268,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedClientStateRequest, ) -> Result<(AnyClientState, MerkleProof), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_client_state(request), @@ -279,7 +279,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUpgradedConsensusStateRequest, ) -> Result<(AnyConsensusState, MerkleProof), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_upgraded_consensus_state(request), @@ -287,7 +287,7 @@ impl ChainHandle for BaseChainHandle { } fn query_commitment_prefix(&self) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_commitment_prefix(), @@ -295,7 +295,7 @@ impl ChainHandle for BaseChainHandle { } fn query_compatible_versions(&self) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_compatible_versions(), @@ -307,7 +307,7 @@ impl ChainHandle for BaseChainHandle { request: QueryConnectionRequest, include_proof: IncludeProof, ) -> Result<(ConnectionEnd, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection(request, include_proof), @@ -318,7 +318,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connections(request), @@ -329,7 +329,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryConnectionChannelsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_connection_channels(request), @@ -341,7 +341,7 @@ impl ChainHandle for BaseChainHandle { request: QueryNextSequenceReceiveRequest, include_proof: IncludeProof, ) -> Result<(Sequence, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { @@ -354,7 +354,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channels(request), @@ -366,7 +366,7 @@ impl ChainHandle for BaseChainHandle { request: QueryChannelRequest, include_proof: IncludeProof, ) -> Result<(ChannelEnd, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel(request, include_proof), @@ -377,7 +377,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryChannelClientStateRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_channel_client_state(request), @@ -492,7 +492,7 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketCommitmentRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitment(request, include_proof), @@ -503,7 +503,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketCommitmentsRequest, ) -> Result<(Vec, Height), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_commitments(request), @@ -515,7 +515,7 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketReceiptRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_receipt(request, include_proof), @@ -526,7 +526,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedPacketsRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_packets(request), @@ -538,7 +538,7 @@ impl ChainHandle for BaseChainHandle { request: QueryPacketAcknowledgementRequest, include_proof: IncludeProof, ) -> Result<(Vec, Option), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { @@ -551,7 +551,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketAcknowledgementsRequest, ) -> Result<(Vec, Height), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_acknowledgements(request), @@ -562,7 +562,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryUnreceivedAcksRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_unreceived_acknowledgements(request), @@ -570,7 +570,7 @@ impl ChainHandle for BaseChainHandle { } fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_txs(request), @@ -581,7 +581,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryPacketEventDataRequest, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_packet_events(request), @@ -592,7 +592,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryHostConsensusStateRequest, ) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { @@ -607,7 +607,7 @@ impl ChainHandle for BaseChainHandle { port_id: PortId, counterparty_payee: Signer, ) -> Result<(), Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => { @@ -620,7 +620,7 @@ impl ChainHandle for BaseChainHandle { &self, request: Vec, ) -> Result, Error> { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.cross_chain_query(request), @@ -631,7 +631,7 @@ impl ChainHandle for BaseChainHandle { &self, request: QueryIncentivizedPacketRequest, ) -> Result { - let _permit = self.semaphore.wait(); + let _permit = self.semaphore.access(); match self.chain.as_ref() { ChainImpl::CosmosSdk(chain) => chain.query_incentivized_packet(request),