Skip to content

Commit

Permalink
Use pull event source when generating configuration with `hermes conf…
Browse files Browse the repository at this point in the history
…ig auto` (#3920)

* Use pull event source when generating configuration with `hermes config auto`

* Add changelog entry
  • Loading branch information
romac authored Mar 27, 2024
1 parent a1c4360 commit ff9e2df
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Use RPC (pull) event source instead of WebSocket (push) when generating configuration with `hermes config auto`
([\#3913](https://github.com/informalsystems/hermes/issues/3913))
2 changes: 1 addition & 1 deletion crates/chain-registry/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ define_error! {
.iter()
.join(", ");

format!("Error finding a healthy endpoint after {} retries. Endpoints: {endpoints}", e.retries)
format!("Did not find a healthy endpoint after {} retries. Endpoints: {endpoints}", e.retries)
},

UriParseError
Expand Down
49 changes: 16 additions & 33 deletions crates/chain-registry/src/querier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ use std::str::FromStr;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use http::Uri;
use tokio::time::timeout;
use tokio::time::Duration;
use tendermint_rpc::HttpClient;
use tendermint_rpc::HttpClientUrl;
use tracing::{debug, info};

use ibc_proto::cosmos::bank::v1beta1::query_client::QueryClient;
use tendermint_rpc::{Client, SubscriptionClient, Url, WebSocketClient};
use tendermint_rpc::{Client, Url};

use crate::error::RegistryError;
use crate::formatter::{SimpleWebSocketFormatter, UriFormatter};

/// `QueryTypes` represents the basic types required to query a node
pub trait QueryTypes {
Expand Down Expand Up @@ -73,16 +72,15 @@ pub trait QueryContext: QueryTypes {

// ----------------- RPC ------------------

/// `SimpleHermesRpcQuerier` retrieves `HermesConfigData` by querying a list of RPC endpoints through their WebSocket API
/// and returns the result of the first endpoint to answer.
/// `SimpleHermesRpcQuerier` retrieves `HermesConfigData` by querying a list of RPC endpoints
/// through their RPC API and returns the result of the first endpoint to answer.
pub struct SimpleHermesRpcQuerier;

/// Data which must be retrieved from RPC endpoints for Hermes
#[derive(Clone, Debug)]
pub struct HermesConfigData {
pub rpc_address: Url,
pub max_block_size: u64,
pub websocket: Url,
// max_block_time should also be retrieved from the RPC
// however it looks like it is not in the genesis file anymore
}
Expand All @@ -101,46 +99,31 @@ impl QueryContext for SimpleHermesRpcQuerier {
RegistryError::no_healthy_rpc(chain_name)
}

/// Convert the RPC url to a WebSocket url, query the endpoint, return the data from the RPC.
async fn query(rpc: Self::QueryInput) -> Result<Self::QueryOutput, Self::QueryError> {
let websocket_addr = SimpleWebSocketFormatter::parse_or_build_address(rpc.as_str())?;
/// Query the endpoint, return the data from the RPC.
async fn query(rpc_url: Self::QueryInput) -> Result<Self::QueryOutput, Self::QueryError> {
info!("Querying RPC server at {rpc_url}");

info!("Querying WebSocket server at {websocket_addr}");
let url = HttpClientUrl::from_str(&rpc_url)
.map_err(|e| RegistryError::tendermint_url_parse_error(rpc_url.clone(), e))?;

let (client, driver) = timeout(
Duration::from_secs(5),
WebSocketClient::new(websocket_addr.clone()),
)
.await
.map_err(|e| RegistryError::websocket_time_out_error(websocket_addr.to_string(), e))?
.map_err(|e| RegistryError::websocket_connect_error(websocket_addr.to_string(), e))?;

let driver_handle = tokio::spawn(driver.run());
let client = HttpClient::builder(url)
.build()
.map_err(|e| RegistryError::rpc_connect_error(rpc_url.clone(), e))?;

let latest_consensus_params = match client.latest_consensus_params().await {
Ok(response) => response.consensus_params.block.max_bytes,
Err(e) => {
return Err(RegistryError::rpc_consensus_params_error(
websocket_addr.to_string(),
rpc_url.to_string(),
e,
))
}
};

client.close().map_err(|e| {
RegistryError::websocket_conn_close_error(websocket_addr.to_string(), e)
})?;

driver_handle
.await
.map_err(|e| RegistryError::join_error("chain_data_join".to_string(), e))?
.map_err(|e| RegistryError::websocket_driver_error(websocket_addr.to_string(), e))?;

Ok(HermesConfigData {
rpc_address: Url::from_str(&rpc)
.map_err(|e| RegistryError::tendermint_url_parse_error(rpc, e))?,
rpc_address: Url::from_str(&rpc_url)
.map_err(|e| RegistryError::tendermint_url_parse_error(rpc_url, e))?,
max_block_size: latest_consensus_params,
websocket: websocket_addr,
})
}
}
Expand Down
26 changes: 11 additions & 15 deletions crates/relayer-cli/src/chain_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ where
)
.await?;

let websocket_address =
rpc_data.websocket.clone().try_into().map_err(|e| {
RegistryError::websocket_url_parse_error(rpc_data.websocket.to_string(), e)
})?;

let avg_gas_price = if let Some(fee_token) = chain_data.fees.fee_tokens.first() {
fee_token.average_gas_price
} else {
Expand All @@ -131,9 +126,9 @@ where
id: chain_data.chain_id,
rpc_addr: rpc_data.rpc_address,
grpc_addr: grpc_address,
event_source: EventSourceMode::Push {
url: websocket_address,
batch_delay: default::batch_delay(),
event_source: EventSourceMode::Pull {
interval: default::poll_interval(),
max_retries: default::max_retries(),
},
rpc_timeout: default::rpc_timeout(),
trusted_node: default::trusted_node(),
Expand Down Expand Up @@ -191,6 +186,7 @@ where
for i in 0..retries {
let query_response =
QuerierType::query_healthy(chain_name.to_string(), endpoints.clone()).await;

match query_response {
Ok(r) => {
return Ok(r);
Expand All @@ -200,13 +196,13 @@ where
}
}
}
Err(RegistryError::unhealthy_endpoints(
endpoints
.iter()
.map(|endpoint| endpoint.to_string())
.collect(),
retries,
))

let endpoints = endpoints
.iter()
.map(|endpoint| endpoint.to_string())
.collect();

Err(RegistryError::unhealthy_endpoints(endpoints, retries))
}

/// Fetches the specified resources from the Cosmos chain registry, using the specified commit hash
Expand Down
10 changes: 8 additions & 2 deletions crates/relayer-cli/src/commands/config/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ibc_relayer::config::{store, ChainConfig, Config};

use std::collections::HashSet;
use std::path::PathBuf;
use tracing::{info, warn};
use tracing::{error, info, warn};

fn find_key(chain_config: &ChainConfig) -> Option<String> {
let keys = chain_config.list_keys().ok()?;
Expand Down Expand Up @@ -115,7 +115,13 @@ impl Runnable for AutoCmd {
let mut chain_configs: Vec<(String, ChainConfig)> = config_results
.unwrap()
.into_iter()
.filter_map(|(name, config)| config.ok().map(|c| (name, c)))
.filter_map(|(name, config)| match config {
Ok(config) => Some((name, config)),
Err(e) => {
error!("Failed to generate chain config for chain '{name}': {e}");
None
}
})
.collect();

// Determine which chains were not fetched
Expand Down
2 changes: 1 addition & 1 deletion crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub mod default {
}

pub fn poll_interval() -> Duration {
Duration::from_secs(1)
Duration::from_millis(500)
}

pub fn max_retries() -> u32 {
Expand Down

0 comments on commit ff9e2df

Please sign in to comment.