diff --git a/.changelog/unreleased/features/ibc-relayer/2850-poll-event-source.md b/.changelog/unreleased/features/ibc-relayer/2850-poll-event-source.md new file mode 100644 index 0000000000..933a7edede --- /dev/null +++ b/.changelog/unreleased/features/ibc-relayer/2850-poll-event-source.md @@ -0,0 +1,12 @@ +- Add a poll-based event source which fetches events from the chain using + the `/block_results` RPC endpoint instead of getting them over WebSocket. + + To use the poll-based event source, set `event_source = 'poll'` in the per-chain configuration. + + **Warning** + Only use this if you think Hermes is not getting all + the events it should, eg. when relaying for a CosmWasm-enabled blockchain + which emits IBC events in a smart contract where the events lack the + `message` attribute key. See #3190 and #2809 for more background information. + + ([\#2850](https://github.com/informalsystems/hermes/issues/2850)) diff --git a/ci/misbehaviour/config.toml b/ci/misbehaviour/config.toml index 16b0e4f8d0..f5531d3f81 100644 --- a/ci/misbehaviour/config.toml +++ b/ci/misbehaviour/config.toml @@ -120,7 +120,7 @@ grpc_addr = 'http://127.0.0.1:9090' # Specify the WebSocket address and port where the chain WebSocket server # listens on. Required -websocket_addr = 'ws://127.0.0.1:26657/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26657/websocket', batch_delay = '500ms' } # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) @@ -301,7 +301,7 @@ memo_prefix = '' id = 'ibc-1' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9091' -websocket_addr = 'ws://127.0.0.1:26557/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/ci/misbehaviour/config_fork.toml b/ci/misbehaviour/config_fork.toml index 2996f08f99..d911b14798 100644 --- a/ci/misbehaviour/config_fork.toml +++ b/ci/misbehaviour/config_fork.toml @@ -118,9 +118,8 @@ rpc_addr = 'http://127.0.0.1:26657' # Specify the GRPC address and port where the chain GRPC server listens on. Required grpc_addr = 'http://127.0.0.1:9090' -# Specify the WebSocket address and port where the chain WebSocket server -# listens on. Required -websocket_addr = 'ws://127.0.0.1:26657/websocket' +# The type of event source to use for getting events from the chain. +event_source = { mode = 'push', url = 'ws://127.0.0.1:26557/websocket', batch_delay = '500ms' } # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) @@ -301,7 +300,7 @@ memo_prefix = '' id = 'ibc-1' rpc_addr = 'http://127.0.0.1:26457' grpc_addr = 'http://127.0.0.1:9092' -websocket_addr = 'ws://127.0.0.1:26457/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26457/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/config.toml b/config.toml index 079a72d25c..0331d6e677 100644 --- a/config.toml +++ b/config.toml @@ -128,9 +128,34 @@ rpc_addr = 'http://127.0.0.1:26657' # Specify the GRPC address and port where the chain GRPC server listens on. Required grpc_addr = 'http://127.0.0.1:9090' -# Specify the WebSocket address and port where the chain WebSocket server -# listens on. Required -websocket_addr = 'ws://127.0.0.1:26657/websocket' +# The type of event source to use for getting events from the chain. +# +# This setting can take two types of values, as an inline table: +# +# a) Push: for receiving IBC events over WebSocket +# +# `{ mode = 'push', url = 'ws://127.0.0.1:26657/websocket', batch_delay = '500ms' }` +# +# where +# +# - `url` is the WebSocket URL to connect to. Required +# - `batch_delay` is the delay until event batch is +# emitted in the absence of NewBlock event. Default: 500ms +# Lower values will result in faster event processing, improving the latency of Hermes, +# but may split the events into more batches than necessary, requiring more client updates +# to be submitted, yielding higher costs. Higher values will result in slower event +# processing, increasing the latency of Hermes, but are more likely to batch events together. +# The default value provides good latency while minimizing the number of client updates needed. + +# b) Pull: for polling for IBC events via the `/block_results` RPC endpoint +# +# `{ mode = 'pull', interval = '1s' }` +# +# where +# +# - `interval` is the interval at which to poll for blocks. Default: 1s +# +event_source = { mode = 'push', url = 'ws://127.0.0.1:26657/websocket', batch_delay = '500ms' } # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) @@ -149,18 +174,6 @@ rpc_timeout = '10s' # Default: false trusted_node = false -# Delay until event batch is emitted if no NewBlock events have come yet. -# -# Lower values will result in faster event processing, improving the latency of Hermes, -# but may split the events into more batches than necessary, requiring more client updates -# to be submitted, yielding higher costs. Higher values will result in slower event -# processing, increasing the latency of Hermes, but are more likely to batch events together. -# -# The default value provides good latency while minimizing the number of client updates needed. -# -# Default: 500ms -batch_delay = '500ms' - # Specify the prefix used by the chain. Required account_prefix = 'cosmos' @@ -338,9 +351,8 @@ memo_prefix = '' id = 'ibc-1' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9091' -websocket_addr = 'ws://127.0.0.1:26557/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' -batch_delay = '500ms' trusted_node = false account_prefix = 'cosmos' key_name = 'testkey' diff --git a/crates/relayer-cli/example b/crates/relayer-cli/example deleted file mode 100644 index 6323f50bc3..0000000000 --- a/crates/relayer-cli/example +++ /dev/null @@ -1,199 +0,0 @@ -[global] -log_level = 'info' -[mode.clients] -enabled = true -refresh = true -misbehaviour = true - -[mode.connections] -enabled = false - -[mode.channels] -enabled = false - -[mode.packets] -enabled = true -clear_interval = 100 -clear_on_start = true -tx_confirmation = false - -[rest] -enabled = false -host = '127.0.0.1' -port = 3000 - -[telemetry] -enabled = false -host = '127.0.0.1' -port = 3001 - -[[chains]] -id = 'cosmoshub-4' -type = 'CosmosSdk' -rpc_addr = 'https://rpc-cosmoshub.ecostake.com/' -websocket_addr = 'wss://rpc-cosmoshub.ecostake.com/websocket' -grpc_addr = 'https://grpc-cosmoshub-ia.notional.ventures/' -rpc_timeout = '10s' -account_prefix = 'cosmos' -key_name = 'a' -key_store_type = 'Test' -store_prefix = 'ibc' -default_gas = 100000 -max_gas = 400000 -gas_multiplier = 1.1 -max_msg_num = 30 -max_tx_size = 2097152 -clock_drift = '5s' -max_block_time = '30s' -memo_prefix = '' -proof_specs = ''' -[ - { - "leaf_spec": { - "hash": 1, - "prehash_key": 0, - "prehash_value": 1, - "length": 1, - "prefix": "AA==" - }, - "inner_spec": { - "child_order": [ - 0, - 1 - ], - "child_size": 33, - "min_prefix_length": 4, - "max_prefix_length": 12, - "empty_child": "", - "hash": 1 - }, - "max_depth": 0, - "min_depth": 0 - }, - { - "leaf_spec": { - "hash": 1, - "prehash_key": 0, - "prehash_value": 1, - "length": 1, - "prefix": "AA==" - }, - "inner_spec": { - "child_order": [ - 0, - 1 - ], - "child_size": 32, - "min_prefix_length": 1, - "max_prefix_length": 1, - "empty_child": "", - "hash": 1 - }, - "max_depth": 0, - "min_depth": 0 - } -]''' - -[chains.trust_threshold] -numerator = '1' -denominator = '3' - -[chains.gas_price] -price = 0.1 -denom = 'uatom' - -[chains.packet_filter] -policy = 'allow' -list = [[ - 'transfer', - 'channel-141', -]] - -[chains.address_type] -derivation = 'cosmos' - -[[chains]] -id = 'osmosis-1' -type = 'CosmosSdk' -rpc_addr = 'https://rpc-osmosis.ecostake.com/' -websocket_addr = 'wss://rpc-osmosis.ecostake.com/websocket' -grpc_addr = 'https://grpc-osmosis-ia.notional.ventures/' -rpc_timeout = '10s' -account_prefix = 'osmo' -key_name = 'b' -key_store_type = 'Test' -store_prefix = 'ibc' -default_gas = 100000 -max_gas = 400000 -gas_multiplier = 1.1 -max_msg_num = 30 -max_tx_size = 2097152 -clock_drift = '5s' -max_block_time = '30s' -memo_prefix = '' -proof_specs = ''' -[ - { - "leaf_spec": { - "hash": 1, - "prehash_key": 0, - "prehash_value": 1, - "length": 1, - "prefix": "AA==" - }, - "inner_spec": { - "child_order": [ - 0, - 1 - ], - "child_size": 33, - "min_prefix_length": 4, - "max_prefix_length": 12, - "empty_child": "", - "hash": 1 - }, - "max_depth": 0, - "min_depth": 0 - }, - { - "leaf_spec": { - "hash": 1, - "prehash_key": 0, - "prehash_value": 1, - "length": 1, - "prefix": "AA==" - }, - "inner_spec": { - "child_order": [ - 0, - 1 - ], - "child_size": 32, - "min_prefix_length": 1, - "max_prefix_length": 1, - "empty_child": "", - "hash": 1 - }, - "max_depth": 0, - "min_depth": 0 - } -]''' - -[chains.trust_threshold] -numerator = '1' -denominator = '3' - -[chains.gas_price] -price = 0.1 -denom = 'uosmo' - -[chains.packet_filter] -policy = 'allow' -list = [[ - 'transfer', - 'channel-0', -]] - -[chains.address_type] -derivation = 'cosmos' - diff --git a/crates/relayer-cli/src/chain_registry.rs b/crates/relayer-cli/src/chain_registry.rs index 7149cf3ff8..fc3909127a 100644 --- a/crates/relayer-cli/src/chain_registry.rs +++ b/crates/relayer-cli/src/chain_registry.rs @@ -19,7 +19,7 @@ use ibc_chain_registry::querier::*; use ibc_relayer::config::filter::{FilterPattern, PacketFilter}; use ibc_relayer::config::gas_multiplier::GasMultiplier; use ibc_relayer::config::types::{MaxMsgNum, MaxTxSize, Memo}; -use ibc_relayer::config::{default, AddressType, ChainConfig, GasPrice}; +use ibc_relayer::config::{default, AddressType, ChainConfig, EventSourceMode, GasPrice}; use ibc_relayer::keyring::Store; use tendermint_light_client_verifier::types::TrustThreshold; @@ -107,6 +107,7 @@ where MAX_HEALTHY_QUERY_RETRIES, ) .await?; + let websocket_address = rpc_data.websocket.clone().try_into().map_err(|e| { RegistryError::websocket_url_parse_error(rpc_data.websocket.to_string(), e) @@ -116,10 +117,12 @@ where id: chain_data.chain_id, r#type: default::chain_type(), rpc_addr: rpc_data.rpc_address, - websocket_addr: websocket_address, grpc_addr: grpc_address, + event_source: EventSourceMode::Push { + url: websocket_address, + batch_delay: default::batch_delay(), + }, rpc_timeout: default::rpc_timeout(), - batch_delay: default::batch_delay(), trusted_node: default::trusted_node(), genesis_restart: None, account_prefix: chain_data.bech32_prefix, diff --git a/crates/relayer-cli/src/commands/listen.rs b/crates/relayer-cli/src/commands/listen.rs index 0e47bed7c3..84a0c383e8 100644 --- a/crates/relayer-cli/src/commands/listen.rs +++ b/crates/relayer-cli/src/commands/listen.rs @@ -14,7 +14,11 @@ use tendermint_rpc::{client::CompatMode, Client, HttpClient}; use tokio::runtime::Runtime as TokioRuntime; use tracing::{error, info, instrument}; -use ibc_relayer::{chain::handle::Subscription, config::ChainConfig, event::monitor::EventMonitor}; +use ibc_relayer::{ + chain::handle::Subscription, + config::{ChainConfig, EventSourceMode}, + event::source::websocket::EventSource, +}; use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent}; use crate::prelude::*; @@ -140,23 +144,27 @@ fn subscribe( compat_mode: CompatMode, rt: Arc, ) -> eyre::Result { - let (mut event_monitor, tx_cmd) = EventMonitor::new( + let EventSourceMode::Push { url, batch_delay } = &chain_config.event_source else { + return Err(eyre!("unsupported event source mode, only 'push' is supported for listening to events")); + }; + + let (mut event_source, tx_cmd) = EventSource::new( chain_config.id.clone(), - chain_config.websocket_addr.clone(), + url.clone(), compat_mode, - chain_config.batch_delay, + *batch_delay, rt, ) - .map_err(|e| eyre!("could not initialize event monitor: {}", e))?; + .map_err(|e| eyre!("could not initialize event source: {}", e))?; - event_monitor + event_source .init_subscriptions() .map_err(|e| eyre!("could not initialize subscriptions: {}", e))?; - let queries = event_monitor.queries(); + let queries = event_source.queries(); info!("listening for queries: {}", queries.iter().format(", "),); - thread::spawn(|| event_monitor.run()); + thread::spawn(|| event_source.run()); let subscription = tx_cmd.subscribe()?; Ok(subscription) diff --git a/crates/relayer-cli/tests/fixtures/two_chains.toml b/crates/relayer-cli/tests/fixtures/two_chains.toml index 6ce815451c..db9358261d 100644 --- a/crates/relayer-cli/tests/fixtures/two_chains.toml +++ b/crates/relayer-cli/tests/fixtures/two_chains.toml @@ -24,7 +24,7 @@ tx_confirmation = true id = 'ibc-0' rpc_addr = 'http://127.0.0.1:26657' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://127.0.0.1:26657/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26657/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' @@ -39,9 +39,9 @@ denominator = '3' [[chains]] id = 'ibc-1' -rpc_addr = 'http://127.0.0.1:26657' -grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://127.0.0.1:26657/websocket' +rpc_addr = 'http://127.0.0.1:26457' +grpc_addr = 'http://127.0.0.1:9091' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26457/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/crates/relayer-rest/tests/mock.rs b/crates/relayer-rest/tests/mock.rs index 138834b496..f4dcf0b9f6 100644 --- a/crates/relayer-rest/tests/mock.rs +++ b/crates/relayer-rest/tests/mock.rs @@ -98,7 +98,7 @@ const MOCK_CHAIN_CONFIG: &str = r#" id = 'mock-0' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9091' -websocket_addr = 'ws://127.0.0.1:26557/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/crates/relayer-types/src/core/ics02_client/height.rs b/crates/relayer-types/src/core/ics02_client/height.rs index 74c62208c2..fb4c9c0f85 100644 --- a/crates/relayer-types/src/core/ics02_client/height.rs +++ b/crates/relayer-types/src/core/ics02_client/height.rs @@ -1,5 +1,4 @@ use std::cmp::Ordering; - use std::num::ParseIntError; use std::str::FromStr; @@ -10,6 +9,7 @@ use serde_derive::{Deserialize, Serialize}; use ibc_proto::ibc::core::client::v1::Height as RawHeight; use crate::core::ics02_client::error::Error; +use crate::core::ics24_host::identifier::ChainId; #[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Height { @@ -32,6 +32,13 @@ impl Height { }) } + pub fn from_tm(height: tendermint::block::Height, chain_id: &ChainId) -> Self { + Self { + revision_number: chain_id.version(), + revision_height: height.value(), + } + } + pub fn revision_number(&self) -> u64 { self.revision_number } diff --git a/crates/relayer/src/chain/cosmos.rs b/crates/relayer/src/chain/cosmos.rs index b70ca6f465..f782e886c9 100644 --- a/crates/relayer/src/chain/cosmos.rs +++ b/crates/relayer/src/chain/cosmos.rs @@ -97,7 +97,7 @@ use crate::config::{parse_gas_prices, ChainConfig, GasPrice}; use crate::consensus_state::AnyConsensusState; use crate::denom::DenomTrace; use crate::error::Error; -use crate::event::monitor::{EventMonitor, TxMonitorCmd}; +use crate::event::source::{EventSource, TxEventSourceCmd}; use crate::event::IbcEventWithHeight; use crate::keyring::{KeyRing, Secp256k1KeyPair, SigningKeyPair}; use crate::light_client::tendermint::LightClient as TmLightClient; @@ -150,7 +150,7 @@ pub struct CosmosSdkChain { /// A cached copy of the account information account: Option, - tx_monitor_cmd: Option, + tx_monitor_cmd: Option, } impl CosmosSdkChain { @@ -286,28 +286,34 @@ impl CosmosSdkChain { Ok(()) } - fn init_event_monitor(&mut self) -> Result { + fn init_event_source(&mut self) -> Result { crate::time!( - "init_event_monitor", + "init_event_source", { "src_chain": self.config().id.to_string(), } ); - let (mut event_monitor, monitor_tx) = EventMonitor::new( - self.config.id.clone(), - self.config.websocket_addr.clone(), - self.compat_mode, - self.config.batch_delay, - self.rt.clone(), - ) - .map_err(Error::event_monitor)?; + use crate::config::EventSourceMode as Mode; - event_monitor - .init_subscriptions() - .map_err(Error::event_monitor)?; + let (event_source, monitor_tx) = match &self.config.event_source { + Mode::Push { url, batch_delay } => EventSource::websocket( + self.config.id.clone(), + url.clone(), + self.compat_mode, + *batch_delay, + self.rt.clone(), + ), + Mode::Pull { interval } => EventSource::rpc( + self.config.id.clone(), + self.rpc_client.clone(), + *interval, + self.rt.clone(), + ), + } + .map_err(Error::event_source)?; - thread::spawn(move || event_monitor.run()); + thread::spawn(move || event_source.run()); Ok(monitor_tx) } @@ -884,7 +890,7 @@ impl ChainEndpoint for CosmosSdkChain { fn shutdown(self) -> Result<(), Error> { if let Some(monitor_tx) = self.tx_monitor_cmd { - monitor_tx.shutdown().map_err(Error::event_monitor)?; + monitor_tx.shutdown().map_err(Error::event_source)?; } Ok(()) @@ -902,13 +908,13 @@ impl ChainEndpoint for CosmosSdkChain { let tx_monitor_cmd = match &self.tx_monitor_cmd { Some(tx_monitor_cmd) => tx_monitor_cmd, None => { - let tx_monitor_cmd = self.init_event_monitor()?; + let tx_monitor_cmd = self.init_event_source()?; self.tx_monitor_cmd = Some(tx_monitor_cmd); self.tx_monitor_cmd.as_ref().unwrap() } }; - let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?; + let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_source)?; Ok(subscription) } diff --git a/crates/relayer/src/chain/handle.rs b/crates/relayer/src/chain/handle.rs index 93aa773f65..be905173cb 100644 --- a/crates/relayer/src/chain/handle.rs +++ b/crates/relayer/src/chain/handle.rs @@ -36,7 +36,7 @@ use crate::{ denom::DenomTrace, error::Error, event::{ - monitor::{EventBatch, Result as MonitorResult}, + source::{EventBatch, Result as MonitorResult}, IbcEventWithHeight, }, keyring::AnySigningKeyPair, diff --git a/crates/relayer/src/chain/runtime.rs b/crates/relayer/src/chain/runtime.rs index 2c97d55bda..0426f6377d 100644 --- a/crates/relayer/src/chain/runtime.rs +++ b/crates/relayer/src/chain/runtime.rs @@ -53,7 +53,7 @@ use super::{ pub struct Threads { pub chain_runtime: thread::JoinHandle<()>, - pub event_monitor: Option>, + pub event_source: Option>, } pub struct ChainRuntime { diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index 78379362fb..f6754c34e5 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -173,6 +173,10 @@ pub mod default { Duration::from_secs(10) } + pub fn poll_interval() -> Duration { + Duration::from_secs(1) + } + pub fn batch_delay() -> Duration { Duration::from_millis(500) } @@ -450,21 +454,55 @@ pub struct GenesisRestart { pub archive_addr: Url, } +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(tag = "mode", rename_all = "lowercase")] +pub enum EventSourceMode { + /// Push-based event source, via WebSocket + Push { + /// The WebSocket URL to connect to + url: WebSocketClientUrl, + + /// Maximum amount of time to wait for a NewBlock event before emitting the event batch + #[serde(default = "default::batch_delay", with = "humantime_serde")] + batch_delay: Duration, + }, + + /// Pull-based event source, via RPC /block_results + #[serde(alias = "poll")] + Pull { + /// The polling interval + #[serde(default = "default::poll_interval", with = "humantime_serde")] + interval: Duration, + }, +} + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct ChainConfig { + /// The chain's network identifier pub id: ChainId, + + /// The chain type #[serde(default = "default::chain_type")] pub r#type: ChainType, + + /// The RPC URL to connect to pub rpc_addr: Url, - pub websocket_addr: WebSocketClientUrl, + + /// The gRPC URL to connect to pub grpc_addr: Url, + + /// The type of event source and associated settings + pub event_source: EventSourceMode, + + /// Timeout used when issuing RPC queries #[serde(default = "default::rpc_timeout", with = "humantime_serde")] pub rpc_timeout: Duration, - #[serde(default = "default::batch_delay", with = "humantime_serde")] - pub batch_delay: Duration, + + /// Whether or not the full node Hermes connects to is trusted #[serde(default = "default::trusted_node")] pub trusted_node: bool, + pub account_prefix: String, pub key_name: String, #[serde(default)] diff --git a/crates/relayer/src/error.rs b/crates/relayer/src/error.rs index d907b3b3eb..28bd1fc1b9 100644 --- a/crates/relayer/src/error.rs +++ b/crates/relayer/src/error.rs @@ -41,7 +41,7 @@ use ibc_relayer_types::{ use crate::chain::cosmos::version; use crate::chain::cosmos::BLOCK_MAX_BYTES_MAX_FRACTION; use crate::config::Error as ConfigError; -use crate::event::monitor; +use crate::event::source; use crate::keyring::{errors::Error as KeyringError, KeyType}; use crate::sdk_error::SdkError; @@ -87,9 +87,9 @@ define_error! { { url: tendermint_rpc::Url } |e| { format!("Websocket error to endpoint {}", e.url) }, - EventMonitor - [ monitor::Error ] - |_| { "event monitor error" }, + EventSource + [ source::Error ] + |_| { "event source error" }, Grpc |_| { "gRPC error" }, diff --git a/crates/relayer/src/event.rs b/crates/relayer/src/event.rs index 0ed811c66e..7d3e958505 100644 --- a/crates/relayer/src/event.rs +++ b/crates/relayer/src/event.rs @@ -1,4 +1,7 @@ use core::fmt::{Display, Error as FmtError, Formatter}; +use serde::Serialize; +use tendermint::abci::Event as AbciEvent; + use ibc_relayer_types::{ applications::ics29_fee::events::{DistributeFeePacket, IncentivizedPacket}, applications::ics31_icq::events::CrossChainQueryPacket, @@ -21,14 +24,12 @@ use ibc_relayer_types::{ events::{Error as IbcEventError, IbcEvent, IbcEventType}, Height, }; -use serde::Serialize; -use tendermint::abci::Event as AbciEvent; use crate::light_client::decode_header; pub mod bus; -pub mod monitor; -pub mod rpc; +pub mod error; +pub mod source; #[derive(Clone, Debug, Serialize)] pub struct IbcEventWithHeight { diff --git a/crates/relayer/src/event/monitor/error.rs b/crates/relayer/src/event/error.rs similarity index 90% rename from crates/relayer/src/event/monitor/error.rs rename to crates/relayer/src/event/error.rs index 5bd12337dd..daf1e0a01b 100644 --- a/crates/relayer/src/event/monitor/error.rs +++ b/crates/relayer/src/event/error.rs @@ -36,10 +36,10 @@ define_error! { |e| { format!("failed to extract IBC events: {0}", e.reason) }, ChannelSendFailed - |_| { "event monitor: internal message-passing failure: could not send message" }, + |_| { "event source: internal message-passing failure: could not send message" }, ChannelRecvFailed - |_| { "event monitor: internal message-passing failure: could not receive message" }, + |_| { "event source: internal message-passing failure: could not receive message" }, SubscriptionCancelled [ TraceError ] diff --git a/crates/relayer/src/event/source.rs b/crates/relayer/src/event/source.rs new file mode 100644 index 0000000000..49e76441e8 --- /dev/null +++ b/crates/relayer/src/event/source.rs @@ -0,0 +1,143 @@ +pub mod rpc; +pub mod websocket; + +use std::{sync::Arc, time::Duration}; + +use crossbeam_channel as channel; + +use futures::Stream; +use tendermint_rpc::{ + client::CompatMode, event::Event as RpcEvent, Error as RpcError, HttpClient, WebSocketClientUrl, +}; +use tokio::runtime::Runtime as TokioRuntime; + +use ibc_relayer_types::{ + core::ics02_client::height::Height, core::ics24_host::identifier::ChainId, +}; + +pub use super::error::{Error, ErrorDetail}; + +use super::IbcEventWithHeight; +use crate::chain::{handle::Subscription, tracking::TrackingId}; + +pub type Result = core::result::Result; + +pub enum EventSource { + WebSocket(websocket::EventSource), + Rpc(rpc::EventSource), +} + +impl EventSource { + pub fn websocket( + chain_id: ChainId, + ws_url: WebSocketClientUrl, + rpc_compat: CompatMode, + batch_delay: Duration, + rt: Arc, + ) -> Result<(Self, TxEventSourceCmd)> { + let (mut source, tx) = + websocket::EventSource::new(chain_id, ws_url, rpc_compat, batch_delay, rt)?; + + source.init_subscriptions()?; + + Ok((Self::WebSocket(source), tx)) + } + + pub fn rpc( + chain_id: ChainId, + rpc_client: HttpClient, + poll_interval: Duration, + rt: Arc, + ) -> Result<(Self, TxEventSourceCmd)> { + let (source, tx) = rpc::EventSource::new(chain_id, rpc_client, poll_interval, rt)?; + Ok((Self::Rpc(source), tx)) + } + + pub fn run(self) { + match self { + Self::WebSocket(source) => source.run(), + Self::Rpc(source) => source.run(), + } + } +} + +/// A batch of events from a chain at a specific height +#[derive(Clone, Debug)] +pub struct EventBatch { + pub chain_id: ChainId, + pub tracking_id: TrackingId, + pub height: Height, + pub events: Vec, +} + +type SubscriptionResult = core::result::Result; +type SubscriptionStream = dyn Stream + Send + Sync + Unpin; + +pub type EventSender = channel::Sender>; +pub type EventReceiver = channel::Receiver>; + +#[derive(Clone, Debug)] +pub struct TxEventSourceCmd(channel::Sender); + +impl TxEventSourceCmd { + pub fn shutdown(&self) -> Result<()> { + self.0 + .send(EventSourceCmd::Shutdown) + .map_err(|_| Error::channel_send_failed()) + } + + pub fn subscribe(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(1); + + self.0 + .send(EventSourceCmd::Subscribe(tx)) + .map_err(|_| Error::channel_send_failed())?; + + let subscription = rx.recv().map_err(|_| Error::channel_recv_failed())?; + Ok(subscription) + } +} + +#[derive(Debug)] +pub enum EventSourceCmd { + Shutdown, + Subscribe(channel::Sender), +} + +// TODO: These are SDK specific, should be eventually moved. +pub mod queries { + use tendermint_rpc::query::{EventType, Query}; + + pub fn all() -> Vec { + // Note: Tendermint-go supports max 5 query specifiers! + vec![ + new_block(), + ibc_client(), + ibc_connection(), + ibc_channel(), + ibc_query(), + // This will be needed when we send misbehavior evidence to full node + // Query::eq("message.module", "evidence"), + ] + } + + pub fn new_block() -> Query { + Query::from(EventType::NewBlock) + } + + pub fn ibc_client() -> Query { + Query::eq("message.module", "ibc_client") + } + + pub fn ibc_connection() -> Query { + Query::eq("message.module", "ibc_connection") + } + + pub fn ibc_channel() -> Query { + Query::eq("message.module", "ibc_channel") + } + + pub fn ibc_query() -> Query { + Query::eq("message.module", "interchainquery") + } +} diff --git a/crates/relayer/src/event/source/rpc.rs b/crates/relayer/src/event/source/rpc.rs new file mode 100644 index 0000000000..b684cc6590 --- /dev/null +++ b/crates/relayer/src/event/source/rpc.rs @@ -0,0 +1,402 @@ +pub mod extract; + +use std::sync::Arc; + +use crossbeam_channel as channel; +use tokio::{ + runtime::Runtime as TokioRuntime, + time::{sleep, Duration, Instant}, +}; +use tracing::{debug, error, error_span, trace}; + +use tendermint::abci; +use tendermint::block::Height as BlockHeight; +use tendermint_rpc::{Client, HttpClient}; + +use ibc_relayer_types::{ + core::{ + ics02_client::{events::NewBlock, height::Height}, + ics24_host::identifier::ChainId, + }, + events::IbcEvent, +}; + +use crate::{ + chain::tracking::TrackingId, + event::{bus::EventBus, source::Error, IbcEventWithHeight}, + telemetry, + util::retry::ConstantGrowth, +}; + +use super::{EventBatch, EventSourceCmd, TxEventSourceCmd}; + +use self::extract::extract_events; + +pub type Result = core::result::Result; + +/// An RPC endpoint that serves as a source of events for a given chain. +pub struct EventSource { + /// Chain identifier + chain_id: ChainId, + + /// RPC client + rpc_client: HttpClient, + + /// Poll interval + poll_interval: Duration, + + /// Event bus for broadcasting events + event_bus: EventBus>>, + + /// Channel where to receive commands + rx_cmd: channel::Receiver, + + /// Tokio runtime + rt: Arc, + + /// Last fetched block height + last_fetched_height: BlockHeight, +} + +impl EventSource { + pub fn new( + chain_id: ChainId, + rpc_client: HttpClient, + poll_interval: Duration, + rt: Arc, + ) -> Result<(Self, TxEventSourceCmd)> { + let event_bus = EventBus::new(); + let (tx_cmd, rx_cmd) = channel::unbounded(); + + let source = Self { + rt, + chain_id, + rpc_client, + poll_interval, + event_bus, + rx_cmd, + last_fetched_height: BlockHeight::from(0_u32), + }; + + Ok((source, TxEventSourceCmd(tx_cmd))) + } + + pub fn run(mut self) { + let _span = error_span!("event_source.rpc", chain.id = %self.chain_id).entered(); + + debug!("collecting events"); + + let rt = self.rt.clone(); + + rt.block_on(async { + let mut backoff = poll_backoff(self.poll_interval); + + // Initialize the latest fetched height + if let Ok(latest_height) = latest_height(&self.rpc_client).await { + self.last_fetched_height = latest_height; + } + + // Continuously run the event loop, so that when it aborts + // because of WebSocket client restart, we pick up the work again. + loop { + let before_step = Instant::now(); + + match self.step().await { + Ok(Next::Abort) => break, + + Ok(Next::Continue) => { + // Reset the backoff + backoff = poll_backoff(self.poll_interval); + + // Check if we need to wait some more before the next iteration. + let delay = self.poll_interval.checked_sub(before_step.elapsed()); + + if let Some(delay_remaining) = delay { + sleep(delay_remaining).await; + } + + continue; + } + + Err(e) => { + error!("event source encountered an error: {e}"); + + // Let's backoff the little bit to give the chain some time to recover. + let delay = backoff.next().expect("backoff is an infinite iterator"); + + error!("retrying in {delay:?}..."); + sleep(delay).await; + } + } + } + }); + + debug!("shutting down event source"); + } + + async fn step(&mut self) -> Result { + // Process any shutdown or subscription commands before we start doing any work + if let Next::Abort = self.try_process_cmd() { + return Ok(Next::Abort); + } + + let latest_height = latest_height(&self.rpc_client).await?; + + let batches = if latest_height > self.last_fetched_height { + trace!( + "latest height ({latest_height}) > latest fetched height ({})", + self.last_fetched_height + ); + + self.fetch_batches(latest_height).await.map(Some)? + } else { + trace!( + "latest height ({latest_height}) <= latest fetched height ({})", + self.last_fetched_height + ); + + None + }; + + // Before handling the batch, check if there are any pending shutdown or subscribe commands. + // + // This avoids having the supervisor process an event batch after the event source has been shutdown. + // + // It also allows subscribers to receive the latest event batch even if they + // subscribe while the batch being fetched. + if let Next::Abort = self.try_process_cmd() { + return Ok(Next::Abort); + } + + for batch in batches.unwrap_or_default() { + self.broadcast_batch(batch); + } + + Ok(Next::Continue) + } + + /// Process any pending commands, if any. + fn try_process_cmd(&mut self) -> Next { + if let Ok(cmd) = self.rx_cmd.try_recv() { + match cmd { + EventSourceCmd::Shutdown => return Next::Abort, + + EventSourceCmd::Subscribe(tx) => { + if let Err(e) = tx.send(self.event_bus.subscribe()) { + error!("failed to send back subscription: {e}"); + } + } + } + } + + Next::Continue + } + + async fn fetch_batches(&mut self, latest_height: BlockHeight) -> Result> { + let start_height = self.last_fetched_height.increment(); + + trace!("fetching blocks from {start_height} to {latest_height}"); + + let heights = HeightRangeInclusive::new(start_height, latest_height); + let mut batches = Vec::with_capacity(heights.len()); + + for height in heights { + trace!("collecting events at height {height}"); + + let result = collect_events(&self.rpc_client, &self.chain_id, height).await; + + match result { + Ok(batch) => { + self.last_fetched_height = height; + + if let Some(batch) = batch { + batches.push(batch); + } + } + Err(e) => { + error!(%height, "failed to collect events: {e}"); + break; + } + } + } + + Ok(batches) + } + + /// Collect the IBC events from the subscriptions + fn broadcast_batch(&mut self, batch: EventBatch) { + telemetry!(ws_events, &batch.chain_id, batch.events.len() as u64); + + trace!( + chain = %batch.chain_id, + count = %batch.events.len(), + height = %batch.height, + "broadcasting batch of {} events", + batch.events.len() + ); + + self.event_bus.broadcast(Arc::new(Ok(batch))); + } +} + +fn poll_backoff(poll_interval: Duration) -> impl Iterator { + ConstantGrowth::new(poll_interval, Duration::from_millis(500)) + .clamp(poll_interval * 5, usize::MAX) +} + +fn dedupe(events: Vec) -> Vec { + use itertools::Itertools; + use std::hash::{Hash, Hasher}; + + #[derive(Clone)] + struct HashEvent(abci::Event); + + impl PartialEq for HashEvent { + fn eq(&self, other: &Self) -> bool { + // NOTE: We don't compare on the index because it is not deterministic + // NOTE: We need to check the length of the attributes in order + // to not miss any attribute + self.0.kind == other.0.kind + && self.0.attributes.len() == other.0.attributes.len() + && self + .0 + .attributes + .iter() + .zip(other.0.attributes.iter()) + .all(|(a, b)| a.key == b.key && a.value == b.value) + } + } + + impl Eq for HashEvent {} + + impl Hash for HashEvent { + fn hash(&self, state: &mut H) { + self.0.kind.hash(state); + + for attr in &self.0.attributes { + // NOTE: We don't hash the index because it is not deterministic + attr.key.hash(state); + attr.value.hash(state); + } + } + } + + events + .into_iter() + .map(HashEvent) + .unique() + .map(|e| e.0) + .collect() +} + +/// Collect the IBC events from an RPC event +async fn collect_events( + rpc_client: &HttpClient, + chain_id: &ChainId, + latest_block_height: BlockHeight, +) -> Result> { + let abci_events = fetch_all_events(rpc_client, latest_block_height).await?; + trace!("Found {} ABCI events before dedupe", abci_events.len()); + + let abci_events = dedupe(abci_events); + trace!("Found {} ABCI events after dedupe", abci_events.len()); + + let height = Height::from_tm(latest_block_height, chain_id); + let new_block_event = + IbcEventWithHeight::new(IbcEvent::NewBlock(NewBlock::new(height)), height); + + let mut block_events = extract_events(chain_id, height, &abci_events).unwrap_or_default(); + let mut events = Vec::with_capacity(block_events.len() + 1); + events.push(new_block_event); + events.append(&mut block_events); + + trace!( + "collected {events_len} events at height {height}: {events:#?}", + events_len = events.len(), + height = height, + ); + + Ok(Some(EventBatch { + chain_id: chain_id.clone(), + tracking_id: TrackingId::new_uuid(), + height, + events, + })) +} + +async fn fetch_all_events( + rpc_client: &HttpClient, + height: BlockHeight, +) -> Result> { + let mut response = rpc_client.block_results(height).await.map_err(Error::rpc)?; + let mut events = vec![]; + + if let Some(begin_block_events) = &mut response.begin_block_events { + events.append(begin_block_events); + } + + if let Some(txs_results) = &mut response.txs_results { + for tx_result in txs_results { + if tx_result.code != abci::Code::Ok { + // Transaction failed, skip it + continue; + } + + events.append(&mut tx_result.events); + } + } + + if let Some(end_block_events) = &mut response.end_block_events { + events.append(end_block_events); + } + + Ok(events) +} + +async fn latest_height(rpc_client: &HttpClient) -> Result { + rpc_client + .abci_info() + .await + .map(|status| status.last_block_height) + .map_err(Error::rpc) +} + +pub enum Next { + Abort, + Continue, +} + +pub struct HeightRangeInclusive { + current: BlockHeight, + end: BlockHeight, +} + +impl HeightRangeInclusive { + pub fn new(start: BlockHeight, end: BlockHeight) -> Self { + Self { + current: start, + end, + } + } +} + +impl Iterator for HeightRangeInclusive { + type Item = BlockHeight; + + fn next(&mut self) -> Option { + if self.current > self.end { + None + } else { + let current = self.current; + self.current = self.current.increment(); + Some(current) + } + } + + fn size_hint(&self) -> (usize, Option) { + let size = self.end.value() - self.current.value() + 1; + (size as usize, Some(size as usize)) + } +} + +impl ExactSizeIterator for HeightRangeInclusive {} diff --git a/crates/relayer/src/event/source/rpc/extract.rs b/crates/relayer/src/event/source/rpc/extract.rs new file mode 100644 index 0000000000..4d394e71d7 --- /dev/null +++ b/crates/relayer/src/event/source/rpc/extract.rs @@ -0,0 +1,101 @@ +use ibc_relayer_types::applications::ics29_fee::events::DistributionType; +use tendermint::abci; + +use ibc_relayer_types::core::ics02_client::height::Height; +use ibc_relayer_types::core::ics24_host::identifier::ChainId; +use ibc_relayer_types::events::IbcEvent; + +use crate::telemetry; + +use crate::event::{ibc_event_try_from_abci_event, IbcEventWithHeight}; + +pub fn extract_events( + chain_id: &ChainId, + height: Height, + events: &[abci::Event], +) -> Result, String> { + let mut events_with_height = vec![]; + + for abci_event in events { + match ibc_event_try_from_abci_event(abci_event) { + Ok(event) if should_collect_event(&event) => { + if let IbcEvent::DistributeFeePacket(dist) = &event { + // Only record rewarded fees + if let DistributionType::Reward = dist.distribution_type { + telemetry!(fees_amount, chain_id, &dist.receiver, dist.fee.clone()); + } + } else { + events_with_height.push(IbcEventWithHeight { height, event }); + } + } + + _ => {} + } + } + + Ok(events_with_height) +} + +fn should_collect_event(e: &IbcEvent) -> bool { + event_is_type_packet(e) + || event_is_type_channel(e) + || event_is_type_connection(e) + || event_is_type_client(e) + || event_is_type_fee(e) + || event_is_type_cross_chain_query(e) +} + +fn event_is_type_packet(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::SendPacket(_) + | IbcEvent::ReceivePacket(_) + | IbcEvent::WriteAcknowledgement(_) + | IbcEvent::AcknowledgePacket(_) + | IbcEvent::TimeoutPacket(_) + | IbcEvent::TimeoutOnClosePacket(_) + ) +} + +fn event_is_type_client(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::CreateClient(_) + | IbcEvent::UpdateClient(_) + | IbcEvent::UpgradeClient(_) + | IbcEvent::ClientMisbehaviour(_) + ) +} + +fn event_is_type_connection(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::OpenInitConnection(_) + | IbcEvent::OpenTryConnection(_) + | IbcEvent::OpenAckConnection(_) + | IbcEvent::OpenConfirmConnection(_) + ) +} + +fn event_is_type_channel(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::OpenInitChannel(_) + | IbcEvent::OpenTryChannel(_) + | IbcEvent::OpenAckChannel(_) + | IbcEvent::OpenConfirmChannel(_) + | IbcEvent::CloseInitChannel(_) + | IbcEvent::CloseConfirmChannel(_) + ) +} + +fn event_is_type_cross_chain_query(ev: &IbcEvent) -> bool { + matches!(ev, IbcEvent::CrossChainQueryPacket(_)) +} + +fn event_is_type_fee(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::IncentivizedPacket(_) | IbcEvent::DistributeFeePacket(_) + ) +} diff --git a/crates/relayer/src/event/monitor.rs b/crates/relayer/src/event/source/websocket.rs similarity index 72% rename from crates/relayer/src/event/monitor.rs rename to crates/relayer/src/event/source/websocket.rs index bf9c43e57d..7993440267 100644 --- a/crates/relayer/src/event/monitor.rs +++ b/crates/relayer/src/event/source/websocket.rs @@ -1,3 +1,5 @@ +pub mod extract; + use alloc::sync::Arc; use core::cmp::Ordering; use std::time::Duration; @@ -13,16 +15,15 @@ use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; use tracing::{debug, error, info, instrument, trace}; use tendermint_rpc::{ - client::CompatMode, event::Event as RpcEvent, query::Query, Error as RpcError, - SubscriptionClient, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, + client::CompatMode, event::Event as RpcEvent, query::Query, SubscriptionClient, + WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, }; -use ibc_relayer_types::{ - core::ics02_client::height::Height, core::ics24_host::identifier::ChainId, events::IbcEvent, -}; +use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent}; use crate::{ - chain::{handle::Subscription, tracking::TrackingId}, + chain::tracking::TrackingId, + event::{bus::EventBus, error::*, IbcEventWithHeight}, telemetry, util::{ retry::{retry_with_index, RetryResult}, @@ -30,12 +31,9 @@ use crate::{ }, }; -mod error; -pub use error::*; - -use super::{bus::EventBus, IbcEventWithHeight}; +use super::{EventBatch, EventSourceCmd, Result, SubscriptionStream, TxEventSourceCmd}; -pub type Result = core::result::Result; +use self::extract::extract_events; mod retry_strategy { use crate::util::retry::clamp_total; @@ -52,49 +50,9 @@ mod retry_strategy { } } -/// A batch of events from a chain at a specific height -#[derive(Clone, Debug)] -pub struct EventBatch { - pub chain_id: ChainId, - pub tracking_id: TrackingId, - pub height: Height, - pub events: Vec, -} - -type SubscriptionResult = core::result::Result; -type SubscriptionStream = dyn Stream + Send + Sync + Unpin; - -pub type EventSender = channel::Sender>; -pub type EventReceiver = channel::Receiver>; - -#[derive(Clone, Debug)] -pub struct TxMonitorCmd(channel::Sender); - -impl TxMonitorCmd { - pub fn shutdown(&self) -> Result<()> { - self.0 - .send(MonitorCmd::Shutdown) - .map_err(|_| Error::channel_send_failed()) - } - - pub fn subscribe(&self) -> Result { - let (tx, rx) = crossbeam_channel::bounded(1); - - self.0 - .send(MonitorCmd::Subscribe(tx)) - .map_err(|_| Error::channel_send_failed())?; - - let subscription = rx.recv().map_err(|_| Error::channel_recv_failed())?; - Ok(subscription) - } -} - -#[derive(Debug)] -pub enum MonitorCmd { - Shutdown, - Subscribe(channel::Sender), -} - +/// A batch of events received from a WebSocket endpoint from a +/// chain at a specific height. +/// /// Connect to a Tendermint node, subscribe to a set of queries, /// receive push events over a websocket, and filter them for the /// event handler. @@ -102,7 +60,7 @@ pub enum MonitorCmd { /// The default events that are queried are: /// - [`EventType::NewBlock`](tendermint_rpc::query::EventType::NewBlock) /// - [`EventType::Tx`](tendermint_rpc::query::EventType::Tx) -pub struct EventMonitor { +pub struct EventSource { chain_id: ChainId, /// Delay until batch is emitted batch_delay: Duration, @@ -117,7 +75,7 @@ pub struct EventMonitor { /// Channel where to send client driver errors tx_err: mpsc::UnboundedSender, /// Channel where to receive commands - rx_cmd: channel::Receiver, + rx_cmd: channel::Receiver, /// Node Address ws_url: WebSocketClientUrl, /// RPC compatibility mode @@ -130,48 +88,10 @@ pub struct EventMonitor { rt: Arc, } -// TODO: These are SDK specific, should be eventually moved. -pub mod queries { - use tendermint_rpc::query::{EventType, Query}; - - pub fn all() -> Vec { - // Note: Tendermint-go supports max 5 query specifiers! - vec![ - new_block(), - ibc_client(), - ibc_connection(), - ibc_channel(), - ibc_query(), - // This will be needed when we send misbehavior evidence to full node - // Query::eq("message.module", "evidence"), - ] - } - - pub fn new_block() -> Query { - Query::from(EventType::NewBlock) - } - - pub fn ibc_client() -> Query { - Query::eq("message.module", "ibc_client") - } - - pub fn ibc_connection() -> Query { - Query::eq("message.module", "ibc_connection") - } - - pub fn ibc_channel() -> Query { - Query::eq("message.module", "ibc_channel") - } - - pub fn ibc_query() -> Query { - Query::eq("message.module", "interchainquery") - } -} - -impl EventMonitor { - /// Create an event monitor, and connect to a node +impl EventSource { + /// Create an event source, and connect to a node #[instrument( - name = "event_monitor.create", + name = "event_source.create", level = "error", skip_all, fields(chain = %chain_id, url = %ws_url) @@ -182,7 +102,7 @@ impl EventMonitor { rpc_compat: CompatMode, batch_delay: Duration, rt: Arc, - ) -> Result<(Self, TxMonitorCmd)> { + ) -> Result<(Self, TxEventSourceCmd)> { let event_bus = EventBus::new(); let (tx_cmd, rx_cmd) = channel::unbounded(); @@ -196,9 +116,9 @@ impl EventMonitor { let driver_handle = rt.spawn(run_driver(driver, tx_err.clone())); // TODO: move them to config file(?) - let event_queries = queries::all(); + let event_queries = super::queries::all(); - let monitor = Self { + let source = Self { rt, chain_id, batch_delay, @@ -214,16 +134,16 @@ impl EventMonitor { subscriptions: Box::new(futures::stream::empty()), }; - Ok((monitor, TxMonitorCmd(tx_cmd))) + Ok((source, TxEventSourceCmd(tx_cmd))) } - /// The list of [`Query`] that this event monitor is subscribing for. + /// The list of [`Query`] that this event source is subscribing for. pub fn queries(&self) -> &[Query] { &self.event_queries } /// Clear the current subscriptions, and subscribe again to all queries. - #[instrument(name = "event_monitor.init_subscriptions", skip_all, fields(chain = %self.chain_id))] + #[instrument(name = "event_source.init_subscriptions", skip_all, fields(chain = %self.chain_id))] pub fn init_subscriptions(&mut self) -> Result<()> { let mut subscriptions = vec![]; @@ -246,7 +166,7 @@ impl EventMonitor { } #[instrument( - name = "event_monitor.try_reconnect", + name = "event_source.try_reconnect", level = "error", skip_all, fields(chain = %self.chain_id) @@ -286,7 +206,7 @@ impl EventMonitor { /// Try to resubscribe to events #[instrument( - name = "event_monitor.try_resubscribe", + name = "event_source.try_resubscribe", level = "error", skip_all, fields(chain = %self.chain_id) @@ -301,7 +221,7 @@ impl EventMonitor { /// See the [`retry`](https://docs.rs/retry) crate and the /// [`crate::util::retry`] module for more information. #[instrument( - name = "event_monitor.reconnect", + name = "event_source.reconnect", level = "error", skip_all, fields(chain = %self.chain_id) @@ -335,16 +255,16 @@ impl EventMonitor { } } - /// Event monitor loop + /// Event source loop #[allow(clippy::while_let_loop)] #[instrument( - name = "event_monitor", + name = "event_source.websocket", level = "error", skip_all, fields(chain = %self.chain_id) )] pub fn run(mut self) { - debug!("starting event monitor"); + debug!("collecting events"); // work around double borrow let rt = self.rt.clone(); @@ -364,7 +284,7 @@ impl EventMonitor { } } - debug!("event monitor is shutting down"); + debug!("event source is shutting down"); // Close the WebSocket connection let _ = self.client.close(); @@ -372,7 +292,7 @@ impl EventMonitor { // Wait for the WebSocket driver to finish let _ = self.rt.block_on(self.driver_handle); - trace!("event monitor has successfully shut down"); + trace!("event source has successfully shut down"); } async fn run_loop(&mut self) -> Next { @@ -387,16 +307,9 @@ impl EventMonitor { pin_mut!(batches); loop { - // Process any shutdown or subscription commands - if let Ok(cmd) = self.rx_cmd.try_recv() { - match cmd { - MonitorCmd::Shutdown => return Next::Abort, - MonitorCmd::Subscribe(tx) => { - if let Err(e) = tx.send(self.event_bus.subscribe()) { - error!("failed to send back subscription: {e}"); - } - } - } + // Process any shutdown or subscription commands before we start doing any work. + if let Next::Abort = self.try_process_cmd() { + return Next::Abort; } let result = tokio::select! { @@ -405,19 +318,19 @@ impl EventMonitor { }; // Before handling the batch, check if there are any pending shutdown or subscribe commands. - if let Ok(cmd) = self.rx_cmd.try_recv() { - match cmd { - MonitorCmd::Shutdown => return Next::Abort, - MonitorCmd::Subscribe(tx) => { - if let Err(e) = tx.send(self.event_bus.subscribe()) { - error!("failed to send back subscription: {e}"); - } - } - } + // + // This avoids having the supervisor process an event batch after the event source has been shutdown, + // and issues during testing where the WebSocket connection might get closed before the event + // source has been shutdown. + // + // It also allows subscribers to receive the latest event batch even if they + // subscribe while the batch being fetched. + if let Next::Abort = self.try_process_cmd() { + return Next::Abort; } match result { - Ok(batch) => self.process_batch(batch), + Ok(batch) => self.broadcast_batch(batch), Err(e) => { if let ErrorDetail::SubscriptionCancelled(reason) = e.detail() { error!("subscription cancelled, reason: {}", reason); @@ -448,14 +361,37 @@ impl EventMonitor { self.event_bus.broadcast(Arc::new(Err(error))); } - /// Collect the IBC events from the subscriptions - fn process_batch(&mut self, batch: EventBatch) { + /// Broadcast a batch of events to all subscribers. + fn broadcast_batch(&mut self, batch: EventBatch) { telemetry!(ws_events, &batch.chain_id, batch.events.len() as u64); - trace!(chain = %batch.chain_id, len = %batch.events.len(), "emitting batch"); + trace!( + chain = %batch.chain_id, + count = %batch.events.len(), + height = %batch.height, + "broadcasting batch of {} events", + batch.events.len() + ); self.event_bus.broadcast(Arc::new(Ok(batch))); } + + /// Process a pending command, if any. + fn try_process_cmd(&mut self) -> Next { + if let Ok(cmd) = self.rx_cmd.try_recv() { + match cmd { + EventSourceCmd::Shutdown => return Next::Abort, + + EventSourceCmd::Subscribe(tx) => { + if let Err(e) = tx.send(self.event_bus.subscribe()) { + error!("failed to send back subscription: {e}"); + } + } + } + } + + Next::Continue + } } /// Collect the IBC events from an RPC event @@ -463,7 +399,7 @@ fn collect_events( chain_id: &ChainId, event: RpcEvent, ) -> impl Stream> { - let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default(); + let events = extract_events(chain_id, event).unwrap_or_default(); stream::iter(events).map(Ok) } @@ -522,7 +458,7 @@ async fn run_driver( ) { if let Err(e) = driver.run().await { if tx.send(e).is_err() { - error!("failed to relay driver error to event monitor"); + error!("failed to relay driver error to event source"); } } } diff --git a/crates/relayer/src/event/rpc.rs b/crates/relayer/src/event/source/websocket/extract.rs similarity index 98% rename from crates/relayer/src/event/rpc.rs rename to crates/relayer/src/event/source/websocket/extract.rs index b1bad42fad..97ff858a98 100644 --- a/crates/relayer/src/event/rpc.rs +++ b/crates/relayer/src/event/source/websocket/extract.rs @@ -11,10 +11,10 @@ use ibc_relayer_types::core::ics24_host::identifier::ChainId; use ibc_relayer_types::events::IbcEvent; use crate::chain::cosmos::types::events::channel::RawObject; -use crate::event::monitor::queries; +use crate::event::source::queries; use crate::telemetry; -use super::{ibc_event_try_from_abci_event, IbcEventWithHeight}; +use crate::event::{ibc_event_try_from_abci_event, IbcEventWithHeight}; /// Extract IBC events from Tendermint RPC events /// @@ -116,7 +116,7 @@ use super::{ibc_event_try_from_abci_event, IbcEventWithHeight}; /// {Begin,End}Block events however do not have any such `message.action` associated with them, so /// this doesn't work. For this reason, we extract block events in the following order -> /// OpenInit -> OpenTry -> OpenAck -> OpenConfirm -> SendPacket -> CloseInit -> CloseConfirm. -pub fn get_all_events( +pub fn extract_events( chain_id: &ChainId, result: RpcEvent, ) -> Result, String> { diff --git a/crates/relayer/src/link/relay_path.rs b/crates/relayer/src/link/relay_path.rs index 87c346e444..41d9876caa 100644 --- a/crates/relayer/src/link/relay_path.rs +++ b/crates/relayer/src/link/relay_path.rs @@ -42,7 +42,7 @@ use crate::chain::tracking::TrackedMsgs; use crate::chain::tracking::TrackingId; use crate::channel::error::ChannelError; use crate::channel::Channel; -use crate::event::monitor::EventBatch; +use crate::event::source::EventBatch; use crate::event::IbcEventWithHeight; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::link::error::{self, LinkError}; diff --git a/crates/relayer/src/supervisor.rs b/crates/relayer/src/supervisor.rs index 3a6029c4dd..87f8f0a055 100644 --- a/crates/relayer/src/supervisor.rs +++ b/crates/relayer/src/supervisor.rs @@ -19,7 +19,7 @@ use crate::{ chain::{endpoint::HealthCheck, handle::ChainHandle, tracking::TrackingId}, config::Config, event::{ - monitor::{self, Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + source::{self, Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, IbcEventWithHeight, }, object::Object, @@ -51,7 +51,7 @@ use cmd::SupervisorCmd; use self::{scan::ChainScanner, spawn::SpawnContext}; -type ArcBatch = Arc>; +type ArcBatch = Arc>; type Subscription = Receiver; /** diff --git a/crates/relayer/src/worker/cmd.rs b/crates/relayer/src/worker/cmd.rs index b7d16c647c..5b2ea5709f 100644 --- a/crates/relayer/src/worker/cmd.rs +++ b/crates/relayer/src/worker/cmd.rs @@ -2,7 +2,7 @@ use core::fmt::{Display, Error as FmtError, Formatter}; use ibc_relayer_types::{core::ics02_client::events::NewBlock, Height}; -use crate::event::monitor::EventBatch; +use crate::event::source::EventBatch; /// A command for a [`WorkerHandle`](crate::worker::WorkerHandle). #[derive(Debug, Clone)] diff --git a/crates/relayer/src/worker/handle.rs b/crates/relayer/src/worker/handle.rs index 0f01787795..3b21391558 100644 --- a/crates/relayer/src/worker/handle.rs +++ b/crates/relayer/src/worker/handle.rs @@ -15,7 +15,7 @@ use crate::chain::tracking::TrackingId; use crate::event::IbcEventWithHeight; use crate::util::lock::{LockExt, RwArc}; use crate::util::task::TaskHandle; -use crate::{event::monitor::EventBatch, object::Object}; +use crate::{event::source::EventBatch, object::Object}; use super::{WorkerCmd, WorkerId}; diff --git a/crates/relayer/src/worker/packet.rs b/crates/relayer/src/worker/packet.rs index 48e547bb60..bf781bf2d0 100644 --- a/crates/relayer/src/worker/packet.rs +++ b/crates/relayer/src/worker/packet.rs @@ -25,7 +25,7 @@ use ibc_relayer_types::Height; use crate::chain::handle::ChainHandle; use crate::config::filter::FeePolicy; -use crate::event::monitor::EventBatch; +use crate::event::source::EventBatch; use crate::foreign_client::HasExpiredOrFrozenError; use crate::link::Resubmit; use crate::link::{error::LinkError, Link}; diff --git a/crates/relayer/tests/config/fixtures/relayer_conf_example.toml b/crates/relayer/tests/config/fixtures/relayer_conf_example.toml index 8d27b1357d..ea41a2888f 100644 --- a/crates/relayer/tests/config/fixtures/relayer_conf_example.toml +++ b/crates/relayer/tests/config/fixtures/relayer_conf_example.toml @@ -24,7 +24,7 @@ tx_confirmation = true id = 'chain_A' rpc_addr = 'http://127.0.0.1:26657' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26657/websocket' +event_source = { mode = 'push', url = 'ws://localhost:26657/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' @@ -49,7 +49,7 @@ list = [ id = 'chain_B' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26557/websocket' +event_source = { mode = 'push', url = 'ws://localhost:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/crates/relayer/tests/config/fixtures/relayer_conf_example_decoding_size.toml b/crates/relayer/tests/config/fixtures/relayer_conf_example_decoding_size.toml index e6c51b578a..6df007f2f7 100644 --- a/crates/relayer/tests/config/fixtures/relayer_conf_example_decoding_size.toml +++ b/crates/relayer/tests/config/fixtures/relayer_conf_example_decoding_size.toml @@ -24,7 +24,7 @@ tx_confirmation = true id = 'chain_A' rpc_addr = 'http://127.0.0.1:26657' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26657/websocket' +event_source = { mode = 'push', url = 'ws://localhost:26657/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' @@ -43,7 +43,7 @@ address_type = { derivation = 'cosmos' } id = 'chain_B' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26557/websocket' +event_source = { mode = 'push', url = 'ws://localhost:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' diff --git a/crates/relayer/tests/config/fixtures/relayer_conf_example_fee_filter.toml b/crates/relayer/tests/config/fixtures/relayer_conf_example_fee_filter.toml index ab4359c27f..60a0cc7e0c 100644 --- a/crates/relayer/tests/config/fixtures/relayer_conf_example_fee_filter.toml +++ b/crates/relayer/tests/config/fixtures/relayer_conf_example_fee_filter.toml @@ -24,7 +24,7 @@ tx_confirmation = true id = 'chain_A' rpc_addr = 'http://127.0.0.1:26657' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26657/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26657/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' @@ -55,7 +55,7 @@ recv = [ { amount = 0 }] id = 'chain_B' rpc_addr = 'http://127.0.0.1:26557' grpc_addr = 'http://127.0.0.1:9090' -websocket_addr = 'ws://localhost:26557/websocket' +event_source = { mode = 'push', url = 'ws://127.0.0.1:26557/websocket', batch_delay = '500ms' } rpc_timeout = '10s' account_prefix = 'cosmos' key_name = 'testkey' @@ -64,4 +64,4 @@ gas_price = { price = 0.001, denom = 'stake' } clock_drift = '5s' trusting_period = '14days' trust_threshold = { numerator = '1', denominator = '3' } -address_type = { derivation = 'ethermint', proto_type = { pk_type = '/injective.crypto.v1beta1.ethsecp256k1.PubKey' } } \ No newline at end of file +address_type = { derivation = 'ethermint', proto_type = { pk_type = '/injective.crypto.v1beta1.ethsecp256k1.PubKey' } } diff --git a/guide/src/documentation/configuration/configure-hermes.md b/guide/src/documentation/configuration/configure-hermes.md index 28bca9bb20..832c63fc75 100644 --- a/guide/src/documentation/configuration/configure-hermes.md +++ b/guide/src/documentation/configuration/configure-hermes.md @@ -65,13 +65,13 @@ please refer to the [Keys](../commands/keys/index.md) sections in order to learn Hermes supports connection via TLS for use-cases such as connecting from behind a proxy or a load balancer. In order to enable this, you'll want to set the -`rpc_addr`, `grpc_addr`, or `websocket_addr` parameters to specify a TLS +`rpc_addr`, `grpc_addr`, or `event_source` parameters to specify a TLS connection via HTTPS using the following scheme (note that the port number 443 is just used for example): ``` rpc_addr = 'https://domain.com:443' grpc_addr = 'https://domain.com:443' -websocket_addr = 'wss://domain.com:443/websocket' +event_source = { mode = 'push', url = 'wss://domain.com:443/websocket', batch_delay = '500ms' } ``` ## Configuring Support for Interchain Accounts @@ -118,7 +118,7 @@ list = [ ## Connecting to a full node protected by HTTP Basic Authentication To connect to a full node protected by [HTTP Basic Authentication][http-basic-auth], -specify the username and password in the `rpc_addr`, `grpc_addr` and `websocket_addr` settings +specify the username and password in the `rpc_addr`, `grpc_addr` and `event_source` settings under the chain configuration in `config.toml`. Here is an example with username `hello` and password `world`, assuming the RPC, WebSocket and gRPC servers @@ -132,14 +132,39 @@ id = 'my-chain-0' rpc_addr = 'https://hello:world@mydomain.com:26657' grpc_addr = 'https://hello:world@mydomain.com:9090' -websocket_addr = 'wss://hello:world@mydomain.com:26657/websocket' +event_source = { mode = 'push', url = 'wss://hello:world@mydomain.com:26657/websocket', batch_delay = '500ms' } # ... ``` -> **Caution:** Warning: The "Basic" authentication scheme sends the credentials encoded but not encrypted. +> **Caution:** The "Basic" authentication scheme sends the credentials encoded but not encrypted. > This would be completely insecure unless the exchange was over a secure connection (HTTPS/TLS). +## Configuring Support for Wasm Relaying + +As of version 1.6.0, Hermes supports the relaying of wasm messages natively. This is facilitated by configuring +Hermes to use pull-based relaying by polling for IBC events via the `/block_results` RPC endpoint. Set +the `event_source` parameter to pull mode in `config.toml`: + +```toml +event_source = 'poll' +``` + +The default interval at which Hermes polls the RPC endpoint is 1 second. If you need to change the interval, +you can do so like this: + +```toml +event_source = { mode = 'pull', interval = '2s' } +``` + +The pull model of relaying is in contrast with Hermes' default push model, where IBC events are received +over WebSocket. + +> **Note:** This mode should only be used in situations where Hermes misses events that it should +be receiving, such as when relaying for CosmWasm-enabled blockchains which emit IBC events without the +`message` attribute. Without this attribute, the WebSocket is not able to catch these events to stream +to Hermes, so the `/block_results` RPC endpoint must be used instead. + [http-basic-auth]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication [ica]: https://github.com/cosmos/ibc/blob/master/spec/app/ics-027-interchain-accounts/README.md [chain-registry]: https://github.com/cosmos/chain-registry diff --git a/guide/src/templates/files/hermes/local-chains/config.toml b/guide/src/templates/files/hermes/local-chains/config.toml index 57f8b2f5df..383cdf4467 100644 --- a/guide/src/templates/files/hermes/local-chains/config.toml +++ b/guide/src/templates/files/hermes/local-chains/config.toml @@ -29,7 +29,7 @@ port = 3001 id = 'ibc-0' rpc_addr = 'http://localhost:27030' grpc_addr = 'http://localhost:27032' -websocket_addr = 'ws://localhost:27030/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27030/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -44,7 +44,7 @@ trust_threshold = { numerator = '1', denominator = '3' } id = 'ibc-1' rpc_addr = 'http://localhost:27040' grpc_addr = 'http://localhost:27042' -websocket_addr = 'ws://localhost:27040/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27040/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' diff --git a/guide/src/templates/files/hermes/more-chains/config_with_filters.toml b/guide/src/templates/files/hermes/more-chains/config_with_filters.toml index a1adc2f134..59899f8b91 100644 --- a/guide/src/templates/files/hermes/more-chains/config_with_filters.toml +++ b/guide/src/templates/files/hermes/more-chains/config_with_filters.toml @@ -29,7 +29,7 @@ port = 3001 id = 'ibc-0' rpc_addr = 'http://localhost:27050' grpc_addr = 'http://localhost:27052' -websocket_addr = 'ws://localhost:27050/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27050/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -51,7 +51,7 @@ list = [ id = 'ibc-1' rpc_addr = 'http://localhost:27060' grpc_addr = 'http://localhost:27062' -websocket_addr = 'ws://localhost:27060/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27060/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -74,7 +74,7 @@ list = [ id = 'ibc-2' rpc_addr = 'http://localhost:27070' grpc_addr = 'http://localhost:27072' -websocket_addr = 'ws://localhost:27070/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27070/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -96,7 +96,7 @@ list = [ id = 'ibc-3' rpc_addr = 'http://localhost:27080' grpc_addr = 'http://localhost:27082' -websocket_addr = 'ws://localhost:27080/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27080/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' diff --git a/guide/src/templates/files/hermes/more-chains/config_without_filters.toml b/guide/src/templates/files/hermes/more-chains/config_without_filters.toml index 11a74570de..fff4a5100d 100644 --- a/guide/src/templates/files/hermes/more-chains/config_without_filters.toml +++ b/guide/src/templates/files/hermes/more-chains/config_without_filters.toml @@ -29,7 +29,7 @@ port = 3001 id = 'ibc-0' rpc_addr = 'http://localhost:27050' grpc_addr = 'http://localhost:27052' -websocket_addr = 'ws://localhost:27050/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27050/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -44,7 +44,7 @@ trust_threshold = { numerator = '1', denominator = '3' } id = 'ibc-1' rpc_addr = 'http://localhost:27060' grpc_addr = 'http://localhost:27062' -websocket_addr = 'ws://localhost:27060/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27060/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -59,7 +59,7 @@ trust_threshold = { numerator = '1', denominator = '3' } id = 'ibc-2' rpc_addr = 'http://localhost:27070' grpc_addr = 'http://localhost:27072' -websocket_addr = 'ws://localhost:27070/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27070/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' @@ -74,7 +74,7 @@ trust_threshold = { numerator = '1', denominator = '3' } id = 'ibc-3' rpc_addr = 'http://localhost:27080' grpc_addr = 'http://localhost:27082' -websocket_addr = 'ws://localhost:27080/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27080/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet' diff --git a/guide/src/templates/files/hermes/more-chains/hermes_second_instance.toml b/guide/src/templates/files/hermes/more-chains/hermes_second_instance.toml index be1f559908..3d50e4836a 100644 --- a/guide/src/templates/files/hermes/more-chains/hermes_second_instance.toml +++ b/guide/src/templates/files/hermes/more-chains/hermes_second_instance.toml @@ -29,7 +29,7 @@ port = 3002 id = 'ibc-0' rpc_addr = 'http://localhost:27050' grpc_addr = 'http://localhost:27052' -websocket_addr = 'ws://localhost:27050/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27050/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet1' @@ -50,7 +50,7 @@ list = [ id = 'ibc-1' rpc_addr = 'http://localhost:27060' grpc_addr = 'http://localhost:27062' -websocket_addr = 'ws://localhost:27060/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27060/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet1' @@ -72,7 +72,7 @@ list = [ id = 'ibc-2' rpc_addr = 'http://localhost:27070' grpc_addr = 'http://localhost:27072' -websocket_addr = 'ws://localhost:27070/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27070/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet1' @@ -93,7 +93,7 @@ list = [ id = 'ibc-3' rpc_addr = 'http://localhost:27080' grpc_addr = 'http://localhost:27082' -websocket_addr = 'ws://localhost:27080/websocket' +event_source = { mode = 'push', url = 'ws://localhost:27080/websocket', batch_delay = '500ms' } rpc_timeout = '15s' account_prefix = 'cosmos' key_name = 'wallet1' diff --git a/guide/src/templates/files/hermes/production/config.toml b/guide/src/templates/files/hermes/production/config.toml index bc6ad8265b..90f4a1cca8 100644 --- a/guide/src/templates/files/hermes/production/config.toml +++ b/guide/src/templates/files/hermes/production/config.toml @@ -31,7 +31,7 @@ port = 3001 id = 'cosmoshub-4' type = 'CosmosSdk' rpc_addr = 'https://rpc.cosmoshub.strange.love/' -websocket_addr = 'wss://rpc.cosmoshub.strange.love/websocket' +event_source = { mode = 'push', url = 'wss://rpc.cosmoshub.strange.love/websocket', batch_delay = '500ms' } grpc_addr = 'https://grpc-cosmoshub-ia.notional.ventures/' rpc_timeout = '10s' account_prefix = 'cosmos' @@ -70,7 +70,7 @@ derivation = 'cosmos' id = 'osmosis-1' type = 'CosmosSdk' rpc_addr = 'https://rpc.osmosis.interbloc.org/' -websocket_addr = 'wss://rpc.osmosis.interbloc.org/websocket' +event_source = { mode = 'push', url = 'wss://rpc.osmosis.interbloc.org/websocket', batch_delay = '500ms' } grpc_addr = 'https://grpc-osmosis-ia.notional.ventures/' rpc_timeout = '10s' account_prefix = 'osmo' diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index a02ea7937c..5aeced1f4b 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -138,10 +138,12 @@ impl FullNode { id: self.chain_driver.chain_id.clone(), r#type: ChainType::CosmosSdk, rpc_addr: Url::from_str(&self.chain_driver.rpc_address())?, - websocket_addr: WebSocketClientUrl::from_str(&self.chain_driver.websocket_address())?, grpc_addr: Url::from_str(&self.chain_driver.grpc_address())?, - rpc_timeout: ibc_relayer::config::default::rpc_timeout(), - batch_delay: ibc_relayer::config::default::batch_delay(), + event_source: config::EventSourceMode::Push { + url: WebSocketClientUrl::from_str(&self.chain_driver.websocket_address())?, + batch_delay: config::default::batch_delay(), + }, + rpc_timeout: config::default::rpc_timeout(), trusted_node: false, genesis_restart: None, account_prefix: self.chain_driver.account_prefix.clone(),