Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover from missed RPC events after WebSocket subscription is closed by Tendermint #1205

Merged
merged 17 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

- [ibc-relayer]
- Fixed: Hermes does not clear packets on start ([#1200])
- Recover from missed RPC events after WebSocket subscription is closed by Tendermint ([#1196])


[#1094]: https://github.com/informalsystems/ibc-rs/issues/1094
[#1114]: https://github.com/informalsystems/ibc-rs/issues/1114
[#1192]: https://github.com/informalsystems/ibc-rs/issues/1192
[#1196]: https://github.com/informalsystems/ibc-rs/issues/1196
[#1200]: https://github.com/informalsystems/ibc-rs/issues/1200

## v0.6.0
Expand Down
23 changes: 9 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ exclude = [
"proto-compiler"
]

# [patch.crates-io]
# tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
# tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
# tendermint-proto = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
# tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
# tendermint-testgen = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
[patch.crates-io]
romac marked this conversation as resolved.
Show resolved Hide resolved
tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
tendermint-proto = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
tendermint-testgen = { git = "https://github.com/informalsystems/tendermint-rs", branch = "master" }
2 changes: 1 addition & 1 deletion modules/src/ics07_tendermint/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub mod test_util {
281_815_u64.try_into().unwrap(),
);

let vs = ValidatorSet::new(vec![v1], Some(v1));
let vs = ValidatorSet::new(vec![v1.clone()], Some(v1));

Header {
signed_header: shdr,
Expand Down
3 changes: 2 additions & 1 deletion modules/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub fn default_consensus_params() -> consensus::Params {
consensus::Params {
block: block::Size {
max_bytes: 22020096,
max_gas: -1, // Tendermint-go also has TimeIotaMs: 1000, // 1s
max_gas: -1,
time_iota_ms: 1000,
},
evidence: evidence::Params {
max_age_num_blocks: 100000,
Expand Down
13 changes: 7 additions & 6 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread};
use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info};

use ibc::{events::IbcEvent, ics24_host::identifier::ChainId};

Expand Down Expand Up @@ -92,8 +93,8 @@ impl Runnable for ListenCmd {

/// Listen to events
pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
info!(
"listening for events `{}` on '{}'...",
filters.iter().format(", "),
config.id
);
Expand All @@ -116,15 +117,15 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxEr
continue;
}

println!("- Event batch at height {}", batch.height);
info!("- event batch at height {}", batch.height);
romac marked this conversation as resolved.
Show resolved Hide resolved

for event in matching_events {
println!("+ {:#?}", event);
info!("+ {:#?}", event);
}

println!();
info!("");
}
Err(e) => println!("- Error: {}", e),
Err(e) => error!("- error: {}", e),
}
}

Expand Down
85 changes: 65 additions & 20 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use crossbeam_channel as channel;
use futures::{
pin_mut,
stream::{self, select_all, StreamExt},
Stream,
Stream, TryStreamExt,
};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace};

use tendermint_rpc::{
error::Code,
event::Event as RpcEvent,
query::{EventType, Query},
Error as RpcError, Result as RpcResult, SubscriptionClient, WebSocketClient,
Expand All @@ -22,7 +23,7 @@ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier

use crate::util::{
retry::{retry_count, retry_with_index, RetryResult},
stream::group_while,
stream::try_group_while,
};

mod retry_strategy {
Expand Down Expand Up @@ -63,10 +64,27 @@ pub enum Error {
#[error("failed to extract IBC events: {0}")]
CollectEventsFailed(String),

#[error("{0}")]
SubscriptionCancelled(RpcError),

#[error("RPC error: {0}")]
GenericRpcError(RpcError),

#[error("event monitor failed to dispatch event batch to subscribers")]
ChannelSendFailed,
}

impl Error {
    /// Wrap an RPC error in the matching monitor error variant.
    ///
    /// Returns [`Error::SubscriptionCancelled`] when the error code is
    /// `Code::ServerError` and the error data contains the text
    /// "subscription was cancelled"; every other RPC error becomes
    /// [`Error::GenericRpcError`].
    fn canceled_or_generic(e: RpcError) -> Self {
        // Both conditions must hold: a server-side error code *and* the
        // cancellation marker in the error's data payload.
        let is_cancellation = matches!(e.code(), Code::ServerError)
            && e
                .data()
                .map_or(false, |msg| msg.contains("subscription was cancelled"));

        if is_cancellation {
            Self::SubscriptionCancelled(e)
        } else {
            Self::GenericRpcError(e)
        }
    }
}

/// A batch of events from a chain at a specific height
#[derive(Clone, Debug)]
pub struct EventBatch {
Expand Down Expand Up @@ -253,11 +271,11 @@ impl EventMonitor {
self.subscribe()
}

/// Attempt to restart the WebSocket client using the given retry strategy.
/// Attempt to reconnect the WebSocket client using the given retry strategy.
///
/// See the [`retry`](https://docs.rs/retry) crate and the
/// [`crate::util::retry`] module for more information.
fn restart(&mut self) {
fn reconnect(&mut self) {
let result = retry_with_index(retry_strategy::default(), |_| {
// Try to reconnect
if let Err(e) = self.try_reconnect() {
Expand All @@ -267,7 +285,7 @@ impl EventMonitor {

// Try to resubscribe
if let Err(e) = self.try_resubscribe() {
trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e);
trace!(chain.id = %self.chain_id, "error when resubscribing: {}", e);
return RetryResult::Retry(());
}

Expand Down Expand Up @@ -336,32 +354,55 @@ impl EventMonitor {

let result = rt.block_on(async {
tokio::select! {
Some(batch) = batches.next() => Ok(batch),
Some(batch) = batches.next() => batch,
Some(err) = self.rx_err.recv() => Err(Error::WebSocketDriver(err)),
}
});

match result {
Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| {
warn!(chain.id = %self.chain_id, "{}", e);
error!(chain.id = %self.chain_id, "{}", e);
}),
Err(Error::SubscriptionCancelled(reason)) => {
error!(chain.id = %self.chain_id, "subscription cancelled, reason: {}", reason);
romac marked this conversation as resolved.
Show resolved Hide resolved

self.propagate_error(Error::SubscriptionCancelled(reason))
romac marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or_else(|e| {
error!(chain.id = %self.chain_id, "{}", e);
});

// Reconnect to the WebSocket endpoint, and subscribe again to the queries.
self.reconnect();

// Abort this event loop, the `run` method will start a new one.
// We can't just write `return self.run()` here because Rust
// does not perform tail call optimization, and we would
// thus potentially blow up the stack after many restarts.
return Next::Continue;
}
Err(e) => {
error!(chain.id = %self.chain_id, "failed to collect events: {}", e);

// Restart the event monitor, reconnect to the WebSocket endpoint,
// and subscribe again to the queries.
self.restart();
// Reconnect to the WebSocket endpoint, and subscribe again to the queries.
self.reconnect();

// Abort this event loop, the `run` method will start a new one.
// We can't just write `return self.run()` here because Rust
// does not perform tail call optimization, and we would
// thus potentially blow up the stack after many restarts.
break;
return Next::Continue;
}
}
}
}

Next::Continue
/// Propagate error to subscribers
romac marked this conversation as resolved.
Show resolved Hide resolved
fn propagate_error(&self, error: Error) -> Result<()> {
self.tx_batch
.send(Err(error))
.map_err(|_| Error::ChannelSendFailed)?;

Ok(())
}

/// Collect the IBC events from the subscriptions
Expand All @@ -375,28 +416,32 @@ impl EventMonitor {
}

/// Collect the IBC events from an RPC event
fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream<Item = (Height, IbcEvent)> {
fn collect_events(
chain_id: &ChainId,
event: RpcEvent,
) -> impl Stream<Item = Result<(Height, IbcEvent)>> {
let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default();
stream::iter(events)
stream::iter(events).map(Ok)
}

/// Convert a stream of RPC event into a stream of event batches
fn stream_batches(
subscriptions: Box<SubscriptionStream>,
chain_id: ChainId,
) -> impl Stream<Item = EventBatch> {
) -> impl Stream<Item = Result<EventBatch>> {
let id = chain_id.clone();

// Collect IBC events from each RPC event
let events = subscriptions
.filter_map(|rpc_event| async { rpc_event.ok() })
.flat_map(move |rpc_event| collect_events(&id, rpc_event));
.map_ok(move |rpc_event| collect_events(&id, rpc_event))
.map_err(Error::canceled_or_generic)
.try_flatten();

// Group events by height
let grouped = group_while(events, |(h0, _), (h1, _)| h0 == h1);
let grouped = try_group_while(events, |(h0, _), (h1, _)| h0 == h1);

// Convert each group to a batch
grouped.map(move |events| {
grouped.map_ok(move |events| {
let height = events
.first()
.map(|(h, _)| h)
Expand Down
Loading