Skip to content

Commit

Permalink
Allow gRPC calls with larger query responses (informalsystems#3335)
Browse files Browse the repository at this point in the history
* Adds a new undocumented configuration `max_grpc_decoding_size` which can be used to specify a new decoding message size upper limit
* Updates the gRPC clients decoding message size limit with `max_decoding_message_size()`. If the the `max_grpc_decoding_size` is not configured, sets the limit to `32 MiB`.
* Adds the name of the failed query to the `GrpcStatus` error output message

---

* Increase gRPC max decoding message size using default value or configured value

* Remove const value for max_grpc_decoding_size and use default instead
  • Loading branch information
ljoss17 authored and git committed May 24, 2023
1 parent 68abcb3 commit 95fa693
Show file tree
Hide file tree
Showing 15 changed files with 219 additions and 37 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/relayer-cli/src/chain_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ where
fee_granter: None,
max_msg_num: MaxMsgNum::default(),
max_tx_size: MaxTxSize::default(),
max_grpc_decoding_size: default::max_grpc_decoding_size(),
clock_drift: default::clock_drift(),
max_block_time: default::max_block_time(),
trusting_period: None,
Expand Down
5 changes: 5 additions & 0 deletions crates/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ secp256k1 = { version = "0.27.0", features = ["rand-std"] }
strum = { version = "0.24.1", features = ["derive"] }
once_cell = "1.17.1"

[dependencies.byte-unit]
version = "4.0.19"
default-features = false
features = ["serde"]

[dependencies.num-bigint]
version = "0.4"
features = ["serde"]
Expand Down
80 changes: 64 additions & 16 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use num_bigint::BigInt;
use std::{cmp::Ordering, thread};

use tokio::runtime::Runtime as TokioRuntime;
use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue};
use tonic::codegen::http::Uri;
use tonic::metadata::AsciiMetadataValue;
use tracing::{error, instrument, trace, warn};

use ibc_proto::cosmos::{
Expand Down Expand Up @@ -328,13 +329,16 @@ impl CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(
ibc_proto::interchain_security::ccv::consumer::v1::QueryParamsRequest {},
);

let response = self
.block_on(client.query_params(request))
.map_err(Error::grpc_status)?;
.map_err(|e| Error::grpc_status(e, "query_ccv_consumer_chain_params".to_owned()))?;

let params = response
.into_inner()
Expand Down Expand Up @@ -362,12 +366,15 @@ impl CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request =
tonic::Request::new(ibc_proto::cosmos::staking::v1beta1::QueryParamsRequest {});

let response = self
.block_on(client.params(request))
.map_err(Error::grpc_status)?;
.map_err(|e| Error::grpc_status(e, "query_staking_params".to_owned()))?;

let params = response
.into_inner()
Expand Down Expand Up @@ -414,6 +421,9 @@ impl CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(ibc_proto::cosmos::base::node::v1beta1::ConfigRequest {});

match self.block_on(client.config(request)) {
Expand All @@ -426,7 +436,7 @@ impl CosmosSdkChain {
if is_unimplemented_node_query(&e) {
Ok(None)
} else {
Err(Error::grpc_status(e))
Err(Error::grpc_status(e, "query_config_params".to_owned()))
}
}
}
Expand Down Expand Up @@ -1120,10 +1130,13 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());
let response = self
.block_on(client.client_states(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_clients".to_owned()))?
.into_inner();

// Deserialize into domain type
Expand Down Expand Up @@ -1308,12 +1321,15 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = match self.block_on(client.client_connections(request)) {
Ok(res) => res.into_inner(),
Err(e) if e.code() == tonic::Code::NotFound => return Ok(vec![]),
Err(e) => return Err(Error::grpc_status(e)),
Err(e) => return Err(Error::grpc_status(e, "query_client_connections".to_owned())),
};

let ids = response
Expand Down Expand Up @@ -1349,11 +1365,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.connections(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_connections".to_owned()))?
.into_inner();

let connections = response
Expand Down Expand Up @@ -1401,6 +1420,10 @@ impl ChainEndpoint for CosmosSdkChain {
.await
.map_err(Error::grpc_transport)?;

client = client.max_decoding_message_size(
chain.config().max_grpc_decoding_size.get_bytes() as usize,
);

let mut request = connection::QueryConnectionRequest {
connection_id: connection_id.to_string(),
}
Expand All @@ -1416,7 +1439,7 @@ impl ChainEndpoint for CosmosSdkChain {
if e.code() == tonic::Code::NotFound {
Error::connection_not_found(connection_id.clone())
} else {
Error::grpc_status(e)
Error::grpc_status(e, "query_connection".to_owned())
}
})?;

Expand Down Expand Up @@ -1479,11 +1502,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.connection_channels(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_connection_channels".to_owned()))?
.into_inner();

let channels = response
Expand Down Expand Up @@ -1524,11 +1550,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.channels(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_channels".to_owned()))?
.into_inner();

let channels = response
Expand Down Expand Up @@ -1599,11 +1628,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.channel_client_state(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_channel_client_state".to_owned()))?
.into_inner();

let client_state: Option<IdentifiedAnyClientState> = response
Expand Down Expand Up @@ -1659,11 +1691,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.packet_commitments(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_packet_commitments".to_owned()))?
.into_inner();

let mut commitment_sequences: Vec<Sequence> = response
Expand Down Expand Up @@ -1727,11 +1762,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let mut response = self
.block_on(client.unreceived_packets(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_unreceived_packets".to_owned()))?
.into_inner();

response.sequences.sort_unstable();
Expand Down Expand Up @@ -1788,11 +1826,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.packet_acknowledgements(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_packet_acknowledgements".to_owned()))?
.into_inner();

let acks_sequences = response
Expand Down Expand Up @@ -1830,11 +1871,14 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(request.into());

let mut response = self
.block_on(client.unreceived_acks(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_unreceived_acknowledgements".to_owned()))?
.into_inner();

response.sequences.sort_unstable();
Expand Down Expand Up @@ -1886,11 +1930,15 @@ impl ChainEndpoint for CosmosSdkChain {
)
.map_err(Error::grpc_transport)?;

client = client.max_decoding_message_size(
self.config().max_grpc_decoding_size.get_bytes() as usize,
);

let request = tonic::Request::new(request.into());

let response = self
.block_on(client.next_sequence_receive(request))
.map_err(Error::grpc_status)?
.map_err(|e| Error::grpc_status(e, "query_next_sequence_receive".to_owned()))?
.into_inner();

Ok((Sequence::from(response.next_sequence_receive), None))
Expand Down
9 changes: 8 additions & 1 deletion crates/relayer/src/chain/cosmos/query/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use prost::Message;
use tracing::info;

use crate::chain::cosmos::types::account::Account;
use crate::config::default::max_grpc_decoding_size;
use crate::error::Error;

/// Get a `&mut Account` from an `&mut Option<Account>` if it is `Some(Account)`.
Expand Down Expand Up @@ -57,14 +58,20 @@ pub async fn query_account(
.await
.map_err(Error::grpc_transport)?;

client = client.max_decoding_message_size(max_grpc_decoding_size().get_bytes() as usize);

let request = tonic::Request::new(QueryAccountRequest {
address: account_address.to_string(),
});

let response = client.account(request).await;

// Querying for an account might fail, i.e. if the account doesn't actually exist
let resp_account = match response.map_err(Error::grpc_status)?.into_inner().account {
let resp_account = match response
.map_err(|e| Error::grpc_status(e, "query_account".to_owned()))?
.into_inner()
.account
{
Some(account) => account,
None => return Err(Error::empty_query_account(account_address.to_string())),
};
Expand Down
12 changes: 9 additions & 3 deletions crates/relayer/src/chain/cosmos/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use ibc_proto::cosmos::bank::v1beta1::{
query_client::QueryClient, QueryAllBalancesRequest, QueryBalanceRequest,
};

use crate::{account::Balance, error::Error};
use crate::account::Balance;
use crate::config::default::max_grpc_decoding_size;
use crate::error::Error;

/// Uses the GRPC client to retrieve the account balance for a specific denom
pub async fn query_balance(
Expand All @@ -16,6 +18,8 @@ pub async fn query_balance(
.await
.map_err(Error::grpc_transport)?;

client = client.max_decoding_message_size(max_grpc_decoding_size().get_bytes() as usize);

let request = tonic::Request::new(QueryBalanceRequest {
address: account_address.to_string(),
denom: denom.to_string(),
Expand All @@ -25,7 +29,7 @@ pub async fn query_balance(
.balance(request)
.await
.map(|r| r.into_inner())
.map_err(Error::grpc_status)?;
.map_err(|e| Error::grpc_status(e, "query_balance".to_owned()))?;

// Querying for a balance might fail, i.e. if the account doesn't actually exist
let balance = response
Expand All @@ -47,6 +51,8 @@ pub async fn query_all_balances(
.await
.map_err(Error::grpc_transport)?;

client = client.max_decoding_message_size(max_grpc_decoding_size().get_bytes() as usize);

let request = tonic::Request::new(QueryAllBalancesRequest {
address: account_address.to_string(),
pagination: None,
Expand All @@ -56,7 +62,7 @@ pub async fn query_all_balances(
.all_balances(request)
.await
.map(|r| r.into_inner())
.map_err(Error::grpc_status)?;
.map_err(|e| Error::grpc_status(e, "query_all_balances".to_owned()))?;

let balances = response
.balances
Expand Down
Loading

0 comments on commit 95fa693

Please sign in to comment.