Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the maximum number of requests which can issued concurrently to the chain #3350

Merged
merged 3 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket'
# Hermes uses a large preconfigured timeout (on the order of minutes).
rpc_timeout = '10s'

# The maximum number of requests which can be issued to the chain concurrently.
# Higher number may increase throughput when relaying on multiple channels
# or between multiple chains but may also overload the node.
# Default: 50
max_concurrency = 50

# Delay until event batch is emitted if no NewBlock events have come yet
batch_delay = '500ms'

Expand Down
1 change: 1 addition & 0 deletions crates/relayer-cli/src/chain_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ where
websocket_addr: websocket_address,
grpc_addr: grpc_address,
rpc_timeout: default::rpc_timeout(),
max_concurrency: default::max_concurrency(),
batch_delay: default::batch_delay(),
genesis_restart: None,
account_prefix: chain_data.bech32_prefix,
Expand Down
1 change: 1 addition & 0 deletions crates/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ strum = { version = "0.24.1", features = ["derive"] }
tokio-stream = "0.1.14"
once_cell = "1.17.1"
parking_lot = "0.12.1"
std-semaphore = "0.1"

[dependencies.byte-unit]
version = "4.0.19"
Expand Down
82 changes: 81 additions & 1 deletion crates/relayer/src/chain/handle/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use ibc_relayer_types::{
signer::Signer,
Height,
};
use std_semaphore::Semaphore;

use crate::{
account::Balance,
Expand Down Expand Up @@ -50,6 +51,9 @@ use super::{ChainHandle, ChainImpl, HealthCheck, Subscription};
pub struct BaseChainHandle {
/// The chain implementation
chain: Arc<ChainImpl>,

/// A semaphore to limit the number of concurrent requests to the chain
semaphore: Arc<Semaphore>,
}

impl Debug for BaseChainHandle {
Expand All @@ -68,7 +72,17 @@ impl Display for BaseChainHandle {

impl BaseChainHandle {
pub fn new(chain: Arc<ChainImpl>) -> Self {
Self { chain }
// The semaphore is initialized with the maximum number of concurrent requests.
// If that number was specified as 0, then we use the maximum amount of concurrency,
// and effectively disable the limit.
Comment on lines +76 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this fact be mentioned in the config.toml for the max_concurrency parameter?

Copy link
Member Author

@romac romac May 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good catch! Maybe with a warning that doing this may completely overload the node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let max_concurrency = Some(chain.config().max_concurrency)
.filter(|&n| n > 0)
.unwrap_or(u16::MAX);

Self {
chain,
semaphore: Arc::new(Semaphore::new(max_concurrency as isize)),
}
}
}

Expand Down Expand Up @@ -160,6 +174,8 @@ impl ChainHandle for BaseChainHandle {
key_name: Option<String>,
denom: Option<String>,
) -> Result<Balance, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_balance(key_name.as_deref(), denom.as_deref())
Expand All @@ -168,18 +184,24 @@ impl ChainHandle for BaseChainHandle {
}

fn query_all_balances(&self, key_name: Option<String>) -> Result<Vec<Balance>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_all_balances(key_name.as_deref()), // FIXME
}
}

fn query_denom_trace(&self, hash: String) -> Result<DenomTrace, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_denom_trace(hash),
}
}

fn query_application_status(&self) -> Result<ChainStatus, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_application_status(),
}
Expand All @@ -189,6 +211,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryClientStatesRequest,
) -> Result<Vec<IdentifiedAnyClientState>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_clients(request),
}
Expand All @@ -199,6 +223,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryClientStateRequest,
include_proof: IncludeProof,
) -> Result<(AnyClientState, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_client_state(request, include_proof),
}
Expand All @@ -208,6 +234,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryClientConnectionsRequest,
) -> Result<Vec<ConnectionId>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_client_connections(request),
}
Expand All @@ -217,6 +245,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConsensusStateHeightsRequest,
) -> Result<Vec<Height>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_consensus_state_heights(request),
}
Expand All @@ -227,6 +257,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryConsensusStateRequest,
include_proof: IncludeProof,
) -> Result<(AnyConsensusState, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_consensus_state(request, include_proof),
}
Expand All @@ -236,6 +268,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUpgradedClientStateRequest,
) -> Result<(AnyClientState, MerkleProof), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_upgraded_client_state(request),
}
Expand All @@ -245,18 +279,24 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUpgradedConsensusStateRequest,
) -> Result<(AnyConsensusState, MerkleProof), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_upgraded_consensus_state(request),
}
}

fn query_commitment_prefix(&self) -> Result<CommitmentPrefix, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_commitment_prefix(),
}
}

fn query_compatible_versions(&self) -> Result<Vec<Version>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_compatible_versions(),
}
Expand All @@ -267,6 +307,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryConnectionRequest,
include_proof: IncludeProof,
) -> Result<(ConnectionEnd, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connection(request, include_proof),
}
Expand All @@ -276,6 +318,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConnectionsRequest,
) -> Result<Vec<IdentifiedConnectionEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connections(request),
}
Expand All @@ -285,6 +329,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryConnectionChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_connection_channels(request),
}
Expand All @@ -295,6 +341,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryNextSequenceReceiveRequest,
include_proof: IncludeProof,
) -> Result<(Sequence, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_next_sequence_receive(request, include_proof)
Expand All @@ -306,6 +354,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channels(request),
}
Expand All @@ -316,6 +366,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryChannelRequest,
include_proof: IncludeProof,
) -> Result<(ChannelEnd, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channel(request, include_proof),
}
Expand All @@ -325,6 +377,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryChannelClientStateRequest,
) -> Result<Option<IdentifiedAnyClientState>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_channel_client_state(request),
}
Expand Down Expand Up @@ -438,6 +492,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketCommitmentRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_commitment(request, include_proof),
}
Expand All @@ -447,6 +503,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketCommitmentsRequest,
) -> Result<(Vec<Sequence>, Height), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_commitments(request),
}
Expand All @@ -457,6 +515,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketReceiptRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_receipt(request, include_proof),
}
Expand All @@ -466,6 +526,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUnreceivedPacketsRequest,
) -> Result<Vec<Sequence>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_unreceived_packets(request),
}
Expand All @@ -476,6 +538,8 @@ impl ChainHandle for BaseChainHandle {
request: QueryPacketAcknowledgementRequest,
include_proof: IncludeProof,
) -> Result<(Vec<u8>, Option<MerkleProof>), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_packet_acknowledgement(request, include_proof)
Expand All @@ -487,6 +551,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketAcknowledgementsRequest,
) -> Result<(Vec<Sequence>, Height), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_acknowledgements(request),
}
Expand All @@ -496,12 +562,16 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryUnreceivedAcksRequest,
) -> Result<Vec<Sequence>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_unreceived_acknowledgements(request),
}
}

fn query_txs(&self, request: QueryTxRequest) -> Result<Vec<IbcEventWithHeight>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_txs(request),
}
Expand All @@ -511,6 +581,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryPacketEventDataRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_packet_events(request),
}
Expand All @@ -520,6 +592,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryHostConsensusStateRequest,
) -> Result<AnyConsensusState, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.query_host_consensus_state(request).map(Into::into)
Expand All @@ -533,6 +607,8 @@ impl ChainHandle for BaseChainHandle {
port_id: PortId,
counterparty_payee: Signer,
) -> Result<(), Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => {
chain.maybe_register_counterparty_payee(&channel_id, &port_id, &counterparty_payee)
Expand All @@ -544,6 +620,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: Vec<CrossChainQueryRequest>,
) -> Result<Vec<CrossChainQueryResponse>, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.cross_chain_query(request),
}
Expand All @@ -553,6 +631,8 @@ impl ChainHandle for BaseChainHandle {
&self,
request: QueryIncentivizedPacketRequest,
) -> Result<QueryIncentivizedPacketResponse, Error> {
let _permit = self.semaphore.access();

match self.chain.as_ref() {
ChainImpl::CosmosSdk(chain) => chain.query_incentivized_packet(request),
}
Expand Down
Loading