Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add poll-based event source #3323

Merged
merged 36 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
136dbef
Prepare for different types of event sources
romac May 9, 2023
babf3e7
Add pull-based event source
romac May 9, 2023
32f5c04
Add `event_source` chain config option (`push` or `pull`)
romac May 9, 2023
d683700
Use poll as default
romac May 9, 2023
0b3ae01
Change `push` to `websocket` and `pull` to `rpc`
romac May 9, 2023
396efc5
Add `poll_interval` setting to the chain config
romac May 9, 2023
068d5ab
Add changelog entry
romac May 16, 2023
e325813
Merge branch 'master' into romac/poll-vs-push
romac May 16, 2023
a5d62da
Revert d683700 and go back to using the WebSocket event source by def…
romac May 16, 2023
2a01101
Improve WebSocket::EventSource doc comment
seanchen1991 May 16, 2023
ceb210a
Cargo fmt
seanchen1991 May 16, 2023
586c217
Add more tracing to event sources
romac May 16, 2023
d9a691b
Apply suggestions from code review
romac May 16, 2023
1d40aed
Change uses of event monitor to event source
romac May 17, 2023
53b0a07
Fix `rpc::EventSource` doc comment
seanchen1991 May 17, 2023
2dabbe1
Merge branch 'master' into romac/poll-vs-push
seanchen1991 May 17, 2023
d6bfefd
Merge branch 'master' into romac/poll-vs-push
romac May 22, 2023
829348d
Group source-specific settings together and rename types of event sou…
romac May 23, 2023
e20ea53
Merge branch 'master' into romac/poll-vs-push
romac May 23, 2023
cadbcb9
Formatting
romac May 23, 2023
081678e
Fix leftover merge conflict
romac May 23, 2023
aed4da1
Merge branch 'master' into romac/poll-vs-push
ancazamfir May 29, 2023
2f68906
Delete config_fork.toml
romac May 29, 2023
59107b0
Revert "Delete config_fork.toml"
romac May 29, 2023
164c05e
Fix typo
romac May 29, 2023
682efd2
Fix typo in config and rename `poll_interval` to `interval`
romac May 30, 2023
ebee52c
Clarify logic and doc for processing commands after fetching batch
romac May 30, 2023
b9410b2
Do not collect `DistributeFee` events
romac May 30, 2023
cfb3128
Merge branch 'master' into romac/poll-vs-push
romac May 31, 2023
dcd0771
Merge branch 'master' into romac/poll-vs-push
romac Jun 2, 2023
2440551
Merge branch 'master' into romac/poll-vs-push
romac Jun 5, 2023
7bc3e2c
Add section to hermes guide on configuring for wasm relaying
seanchen1991 Jun 5, 2023
40604ff
Merge branch 'romac/poll-vs-push' of https://github.com/informalsyste…
seanchen1991 Jun 5, 2023
35055ba
Remove an extra word
seanchen1991 Jun 5, 2023
27965e4
Guide formatting
seanchen1991 Jun 5, 2023
1e069f9
Merge branch 'master' into romac/poll-vs-push
seanchen1991 Jun 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
- Add a poll-based event source which fetches events from the chain using
the `/block_results` RPC endpoint instead of getting them over WebSocket.

To use the poll-based event source, set `event_source = 'rpc'` in the per-chain configuration.
romac marked this conversation as resolved.
Show resolved Hide resolved

**Warning**
Only use this if you think Hermes is not getting all
the events it should, eg. when relaying for a CosmWasm-enabled blockchain
which emits IBC events in a smart contract where the events lack the
`message` attribute key. See #3190 and #2809 for more background information.

([\#2850](https://github.com/informalsystems/hermes/issues/2850))
12 changes: 12 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ grpc_addr = 'http://127.0.0.1:9090'
# listens on. Required
websocket_addr = 'ws://127.0.0.1:26657/websocket'

# The type of event source to use for getting events from the chain.
# Either `websocket` for WebSocket or `rpc` for RPC via the `/block_results` endpoint.
# Default: 'websocket'
event_source = 'websocket'

# The interval at which to poll for blocks when the `event_source` is `rpc`.
# Has no effect when `event_source` is set to `websocket`.
# Default: 1 second
poll_interval = '1s'

# Specify the maximum amount of time (duration) that the RPC requests should
# take before timing out. Default: 10s (10 seconds)
# Note: Hermes uses this parameter _only_ in `start` mode; for all other CLIs,
Expand Down Expand Up @@ -310,6 +320,8 @@ id = 'ibc-1'
rpc_addr = 'http://127.0.0.1:26557'
grpc_addr = 'http://127.0.0.1:9091'
websocket_addr = 'ws://127.0.0.1:26557/websocket'
event_source = 'websocket'
poll_interval = '1s'
rpc_timeout = '10s'
account_prefix = 'cosmos'
key_name = 'testkey'
Expand Down
5 changes: 4 additions & 1 deletion crates/relayer-cli/src/chain_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ibc_chain_registry::querier::SimpleHermesRpcQuerier;
use ibc_relayer::config::filter::{FilterPattern, PacketFilter};
use ibc_relayer::config::gas_multiplier::GasMultiplier;
use ibc_relayer::config::types::{MaxMsgNum, MaxTxSize, Memo};
use ibc_relayer::config::{default, AddressType, ChainConfig, GasPrice};
use ibc_relayer::config::{default, AddressType, ChainConfig, EventSource, GasPrice};
use ibc_relayer::keyring::Store;

const MAX_HEALTHY_QUERY_RETRIES: u8 = 5;
Expand Down Expand Up @@ -109,6 +109,7 @@ where
MAX_HEALTHY_QUERY_RETRIES,
)
.await?;

let websocket_address =
rpc_data.websocket.clone().try_into().map_err(|e| {
RegistryError::websocket_url_parse_error(rpc_data.websocket.to_string(), e)
Expand All @@ -120,6 +121,8 @@ where
rpc_addr: rpc_data.rpc_address,
websocket_addr: websocket_address,
grpc_addr: grpc_address,
event_source: EventSource::WebSocket,
poll_interval: default::poll_interval(),
rpc_timeout: default::rpc_timeout(),
genesis_restart: None,
account_prefix: chain_data.bech32_prefix,
Expand Down
12 changes: 7 additions & 5 deletions crates/relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tendermint_rpc::{client::CompatMode, Client, HttpClient};
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info, instrument};

use ibc_relayer::{chain::handle::Subscription, config::ChainConfig, event::monitor::EventMonitor};
use ibc_relayer::{
chain::handle::Subscription, config::ChainConfig, event::source::websocket::EventSource,
};
use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent};

use crate::prelude::*;
Expand Down Expand Up @@ -140,22 +142,22 @@ fn subscribe(
compat_mode: CompatMode,
rt: Arc<TokioRuntime>,
) -> eyre::Result<Subscription> {
let (mut event_monitor, tx_cmd) = EventMonitor::new(
let (mut event_source, tx_cmd) = EventSource::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
compat_mode,
rt,
)
.map_err(|e| eyre!("could not initialize event monitor: {}", e))?;

event_monitor
event_source
.init_subscriptions()
.map_err(|e| eyre!("could not initialize subscriptions: {}", e))?;

let queries = event_monitor.queries();
let queries = event_source.queries();
info!("listening for queries: {}", queries.iter().format(", "),);

thread::spawn(|| event_monitor.run());
thread::spawn(|| event_source.run());

let subscription = tx_cmd.subscribe()?;
Ok(subscription)
Expand Down
9 changes: 8 additions & 1 deletion crates/relayer-types/src/core/ics02_client/height.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::*;
use crate::{core::ics24_host::identifier::ChainId, prelude::*};
use core::cmp::Ordering;

use core::num::ParseIntError;
Expand Down Expand Up @@ -33,6 +33,13 @@ impl Height {
})
}

pub fn from_tm(height: tendermint::block::Height, chain_id: &ChainId) -> Self {
Self {
revision_number: chain_id.version(),
revision_height: height.value(),
}
}

pub fn revision_number(&self) -> u64 {
self.revision_number
}
Expand Down
40 changes: 23 additions & 17 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ use crate::config::{parse_gas_prices, ChainConfig, GasPrice};
use crate::consensus_state::AnyConsensusState;
use crate::denom::DenomTrace;
use crate::error::Error;
use crate::event::monitor::{EventMonitor, TxMonitorCmd};
use crate::event::source::{EventSource, TxEventSourceCmd};
use crate::event::IbcEventWithHeight;
use crate::keyring::{KeyRing, Secp256k1KeyPair, SigningKeyPair};
use crate::light_client::tendermint::LightClient as TmLightClient;
Expand Down Expand Up @@ -149,7 +149,7 @@ pub struct CosmosSdkChain {
/// A cached copy of the account information
account: Option<Account>,

tx_monitor_cmd: Option<TxMonitorCmd>,
tx_monitor_cmd: Option<TxEventSourceCmd>,
}

impl CosmosSdkChain {
Expand Down Expand Up @@ -286,27 +286,33 @@ impl CosmosSdkChain {
Ok(())
}

fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
fn init_event_source(&mut self) -> Result<TxEventSourceCmd, Error> {
romac marked this conversation as resolved.
Show resolved Hide resolved
crate::time!(
"init_event_monitor",
romac marked this conversation as resolved.
Show resolved Hide resolved
{
"src_chain": self.config().id.to_string(),
}
);

let (mut event_monitor, monitor_tx) = EventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.compat_mode,
self.rt.clone(),
)
.map_err(Error::event_monitor)?;
use crate::config::EventSource as Mode;

event_monitor
.init_subscriptions()
.map_err(Error::event_monitor)?;
let (event_source, monitor_tx) = match self.config.event_source {
Mode::WebSocket => EventSource::websocket(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.compat_mode,
self.rt.clone(),
),
Mode::Rpc => EventSource::rpc(
self.config.id.clone(),
self.rpc_client.clone(),
self.config.poll_interval,
self.rt.clone(),
),
}
.map_err(Error::event_source)?;

thread::spawn(move || event_monitor.run());
thread::spawn(move || event_source.run());

Ok(monitor_tx)
}
Expand Down Expand Up @@ -886,7 +892,7 @@ impl ChainEndpoint for CosmosSdkChain {

fn shutdown(self) -> Result<(), Error> {
if let Some(monitor_tx) = self.tx_monitor_cmd {
monitor_tx.shutdown().map_err(Error::event_monitor)?;
monitor_tx.shutdown().map_err(Error::event_source)?;
}

Ok(())
Expand All @@ -904,13 +910,13 @@ impl ChainEndpoint for CosmosSdkChain {
let tx_monitor_cmd = match &self.tx_monitor_cmd {
Some(tx_monitor_cmd) => tx_monitor_cmd,
None => {
let tx_monitor_cmd = self.init_event_monitor()?;
let tx_monitor_cmd = self.init_event_source()?;
self.tx_monitor_cmd = Some(tx_monitor_cmd);
self.tx_monitor_cmd.as_ref().unwrap()
}
};

let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?;
let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_source)?;
Ok(subscription)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
denom::DenomTrace,
error::Error,
event::{
monitor::{EventBatch, Result as MonitorResult},
source::{EventBatch, Result as MonitorResult},
IbcEventWithHeight,
},
keyring::AnySigningKeyPair,
Expand Down
2 changes: 1 addition & 1 deletion crates/relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use super::{

pub struct Threads {
pub chain_runtime: thread::JoinHandle<()>,
pub event_monitor: Option<thread::JoinHandle<()>>,
pub event_source: Option<thread::JoinHandle<()>>,
}

pub struct ChainRuntime<Endpoint: ChainEndpoint> {
Expand Down
19 changes: 19 additions & 0 deletions crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ pub mod default {
Duration::from_secs(10)
}

pub fn poll_interval() -> Duration {
Duration::from_secs(1)
}

pub fn clock_drift() -> Duration {
Duration::from_secs(5)
}
Expand Down Expand Up @@ -443,6 +447,17 @@ pub struct GenesisRestart {
pub archive_addr: Url,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum EventSource {
/// Push-based event source, via WebSocket
#[default]
WebSocket,

/// Pull-based event source, via RPC /block_results
Rpc,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ChainConfig {
Expand All @@ -452,6 +467,10 @@ pub struct ChainConfig {
pub rpc_addr: Url,
pub websocket_addr: WebSocketClientUrl,
pub grpc_addr: Url,
#[serde(default)]
pub event_source: EventSource,
#[serde(default = "default::poll_interval", with = "humantime_serde")]
pub poll_interval: Duration,
#[serde(default = "default::rpc_timeout", with = "humantime_serde")]
pub rpc_timeout: Duration,
pub account_prefix: String,
Expand Down
8 changes: 4 additions & 4 deletions crates/relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use ibc_relayer_types::{
use crate::chain::cosmos::version;
use crate::chain::cosmos::BLOCK_MAX_BYTES_MAX_FRACTION;
use crate::config::Error as ConfigError;
use crate::event::monitor;
use crate::event::source;
use crate::keyring::{errors::Error as KeyringError, KeyType};
use crate::sdk_error::SdkError;

Expand Down Expand Up @@ -86,9 +86,9 @@ define_error! {
{ url: tendermint_rpc::Url }
|e| { format!("Websocket error to endpoint {}", e.url) },

EventMonitor
[ monitor::Error ]
|_| { "event monitor error" },
EventSource
[ source::Error ]
|_| { "event source error" },

Grpc
|_| { "gRPC error" },
Expand Down
9 changes: 5 additions & 4 deletions crates/relayer/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use core::fmt::{Display, Error as FmtError, Formatter};
use serde::Serialize;
use tendermint::abci::Event as AbciEvent;

use ibc_relayer_types::{
applications::ics29_fee::events::{DistributeFeePacket, IncentivizedPacket},
applications::ics31_icq::events::CrossChainQueryPacket,
Expand All @@ -21,14 +24,12 @@ use ibc_relayer_types::{
events::{Error as IbcEventError, IbcEvent, IbcEventType},
Height,
};
use serde::Serialize;
use tendermint::abci::Event as AbciEvent;

use crate::light_client::decode_header;

pub mod bus;
pub mod monitor;
pub mod rpc;
pub mod error;
pub mod source;

#[derive(Clone, Debug, Serialize)]
pub struct IbcEventWithHeight {
Expand Down
Loading