Skip to content

Commit

Permalink
Limit the maximum number of requests which can issued concurrently to…
Browse files Browse the repository at this point in the history
… the chain (#3350)

* Limit the maximum number of requests which can issued concurrently to the chain

Controlled via the `max_concurrency` setting in the chain configuration.

* Swap out `rsevents-extra` for `std-semaphore`
  • Loading branch information
romac authored May 23, 2023
1 parent 3aad0e8 commit 1aa4f81
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 2 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket'
# Hermes uses a large preconfigured timeout (on the order of minutes).
rpc_timeout = '10s'

# The maximum number of requests which can be issued to the chain concurrently.
# Higher number may increase throughput when relaying on multiple channels
# or between multiple chains but may also overload the node.
# Default: 50
max_concurrency = 50

# Delay until event batch is emitted if no NewBlock events have come yet
batch_delay = '500ms'

Expand Down
1 change: 1 addition & 0 deletions crates/relayer-cli/src/chain_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ where
websocket_addr: websocket_address,
grpc_addr: grpc_address,
rpc_timeout: default::rpc_timeout(),
max_concurrency: default::max_concurrency(),
batch_delay: default::batch_delay(),
genesis_restart: None,
account_prefix: chain_data.bech32_prefix,
Expand Down
1 change: 1 addition & 0 deletions crates/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ strum = { version = "0.24.1", features = ["derive"] }
tokio-stream = "0.1.14"
once_cell = "1.17.1"
parking_lot = "0.12.1"
std-semaphore = "0.1"

[dependencies.byte-unit]
version = "4.0.19"
Expand Down
82 changes: 81 additions & 1 deletion crates/relayer/src/chain/handle/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use ibc_relayer_types::{
signer::Signer,
Height,
};
use std_semaphore::Semaphore;

use crate::{
account::Balance,
Expand Down Expand Up @@ -50,6 +51,9 @@ use super::{ChainHandle, ChainImpl, HealthCheck, Subscription};
pub struct BaseChainHandle {
/// The chain implementation
chain: Arc<ChainImpl>,

/// A semaphore to limit the number of concurrent requests to the chain
semaphore: Arc<Semaphore>,
}

impl Debug for BaseChainHandle {
Expand All @@ -68,7 +72,17 @@ impl Display for BaseChainHandle {

impl BaseChainHandle {
pub fn new(chain: Arc<ChainImpl>) -> Self {
Self { chain }
// The semaphore is initialized with the maximum number of concurrent requests.
// If that number was specified as 0, then we use the maximum amount of concurrency,
// and effectively disable the limit.
let max_concurrency = Some(chain.config().max_concurrency)
.filter(|&n| n > 0)
.unwrap_or(u16::MAX);

Self {
chain,
semaphore: Arc::new(Semaphore::new(max_concurrency as isize)),
}
}
}

Expand Down Expand Up @@ -160,6 +174,8 @@ impl ChainHandle for BaseChainHandle {
key_name: Option<String>,
denom: Option<String>,
) -> Result<Balance, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_balance(key_name.as_deref(), denom.as_deref())
Expand All @@ -168,18 +184,24 @@ impl ChainHandle for BaseChainHandle {
}

fn query_all_balances(&self, key_name: Option<String>) -> Result<Vec<Balance>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_all_balances(key_name.as_deref()), // FIXME
}
}

fn query_denom_trace(&self, hash: String) -> Result<DenomTrace, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_denom_trace(hash),
}
}

fn query_application_status(&self) -> Result<ChainStatus, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_application_status(),
}
Expand All @@ -189,6 +211,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryClientStatesRequest,
) -> Result<Vec<IdentifiedAnyClientState>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_clients(request),
}
Expand All @@ -199,6 +223,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryClientStateRequest,
include_proof: IncludeProof,
) -> Result<(AnyClientState, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_client_state(request, include_proof),
}
Expand All @@ -208,6 +234,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryClientConnectionsRequest,
) -> Result<Vec<ConnectionId>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_client_connections(request),
}
Expand All @@ -217,6 +245,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConsensusStateHeightsRequest,
) -> Result<Vec<Height>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_consensus_state_heights(request),
}
Expand All @@ -227,6 +257,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryConsensusStateRequest,
include_proof: IncludeProof,
) -> Result<(AnyConsensusState, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_consensus_state(request, include_proof),
}
Expand All @@ -236,6 +268,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUpgradedClientStateRequest,
) -> Result<(AnyClientState, MerkleProof), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_upgraded_client_state(request),
}
Expand All @@ -245,18 +279,24 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUpgradedConsensusStateRequest,
) -> Result<(AnyConsensusState, MerkleProof), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_upgraded_consensus_state(request),
}
}

fn query_commitment_prefix(&self) -> Result<CommitmentPrefix, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_commitment_prefix(),
}
}

fn query_compatible_versions(&self) -> Result<Vec<Version>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_compatible_versions(),
}
Expand All @@ -267,6 +307,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryConnectionRequest,
include_proof: IncludeProof,
) -> Result<(ConnectionEnd, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connection(request, include_proof),
}
Expand All @@ -276,6 +318,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConnectionsRequest,
) -> Result<Vec<IdentifiedConnectionEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connections(request),
}
Expand All @@ -285,6 +329,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConnectionChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connection_channels(request),
}
Expand All @@ -295,6 +341,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryNextSequenceReceiveRequest,
include_proof: IncludeProof,
) -> Result<(Sequence, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_next_sequence_receive(request, include_proof)
Expand All @@ -306,6 +354,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channels(request),
}
Expand All @@ -316,6 +366,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryChannelRequest,
include_proof: IncludeProof,
) -> Result<(ChannelEnd, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channel(request, include_proof),
}
Expand All @@ -325,6 +377,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryChannelClientStateRequest,
) -> Result<Option<IdentifiedAnyClientState>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channel_client_state(request),
}
Expand Down Expand Up @@ -438,6 +492,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketCommitmentRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_commitment(request, include_proof),
}
Expand All @@ -447,6 +503,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketCommitmentsRequest,
) -> Result<(Vec<Sequence>, Height), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_commitments(request),
}
Expand All @@ -457,6 +515,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketReceiptRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_receipt(request, include_proof),
}
Expand All @@ -466,6 +526,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUnreceivedPacketsRequest,
) -> Result<Vec<Sequence>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_unreceived_packets(request),
}
Expand All @@ -476,6 +538,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketAcknowledgementRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_packet_acknowledgement(request, include_proof)
Expand All @@ -487,6 +551,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketAcknowledgementsRequest,
) -> Result<(Vec<Sequence>, Height), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_acknowledgements(request),
}
Expand All @@ -496,12 +562,16 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUnreceivedAcksRequest,
) -> Result<Vec<Sequence>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_unreceived_acknowledgements(request),
}
}

fn query_txs(&self, request: QueryTxRequest) -> Result<Vec<IbcEventWithHeight>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_txs(request),
}
Expand All @@ -511,6 +581,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketEventDataRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_events(request),
}
Expand All @@ -520,6 +592,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryHostConsensusStateRequest,
) -> Result<AnyConsensusState, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_host_consensus_state(request).map(Into::into)
Expand All @@ -533,6 +607,8 @@ impl ChainHandle for BaseChainHandle {
port_id: PortId,
counterparty_payee: Signer,
) -> Result<(), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.maybe_register_counterparty_payee(&channel_id, &port_id, &counterparty_payee)
Expand All @@ -544,6 +620,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: Vec<CrossChainQueryRequest>,
) -> Result<Vec<CrossChainQueryResponse>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.cross_chain_query(request),
}
Expand All @@ -553,6 +631,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryIncentivizedPacketRequest,
) -> Result<QueryIncentivizedPacketResponse, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_incentivized_packet(request),
}
Expand Down
Loading

0 comments on commit 1aa4f81

Please sign in to comment.