From d1e540122c8211cc8bd143c1b06951ae242f1bcb Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 3 May 2021 15:45:32 +0200 Subject: [PATCH 1/8] Introduce error type for event monitor --- relayer/src/chain/cosmos.rs | 4 ++- relayer/src/chain/runtime.rs | 4 +-- relayer/src/error.rs | 4 +++ relayer/src/event/monitor.rs | 68 ++++++++++++++++++++++-------------- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 5634a4d893..163f940625 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -376,9 +376,11 @@ impl Chain for CosmosSdkChain { self.config.id.clone(), self.config.websocket_addr.clone(), rt, - )?; + ) + .map_err(Kind::EventMonitor)?; event_monitor.subscribe().unwrap(); + let monitor_thread = thread::spawn(move || event_monitor.run()); Ok((event_receiver, Some(monitor_thread))) diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 4a25740f37..cd60090bfa 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -89,10 +89,10 @@ impl ChainRuntime { let light_client = chain.init_light_client()?; // Start the event monitor - let (event_receiver, event_monitor_thread) = chain.init_event_monitor(rt.clone())?; + let (event_batch_rx, event_monitor_thread) = chain.init_event_monitor(rt.clone())?; // Instantiate & spawn the runtime - let (handle, runtime_thread) = Self::init(chain, light_client, event_receiver, rt); + let (handle, runtime_thread) = Self::init(chain, light_client, event_batch_rx, rt); let threads = Threads { chain_runtime: runtime_thread, diff --git a/relayer/src/error.rs b/relayer/src/error.rs index d19fd46e95..b319be0150 100644 --- a/relayer/src/error.rs +++ b/relayer/src/error.rs @@ -34,6 +34,10 @@ pub enum Kind { #[error("Websocket error to endpoint {0}")] Websocket(tendermint_rpc::Url), + /// Event monitor error + #[error("Event monitor")] + EventMonitor(crate::event::monitor::Error), + /// GRPC error (typically raised by the GRPC client or the GRPC requester) #[error("GRPC error")] Grpc, diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 1d2286e138..0109a87a45 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,19 +1,34 @@ use std::sync::Arc; -use anomaly::BoxError; use crossbeam_channel as channel; use futures::stream::StreamExt; use futures::{stream::select_all, Stream}; use itertools::Itertools; use tendermint_rpc::{query::EventType, query::Query, SubscriptionClient, WebSocketClient}; +use thiserror::Error; use tokio::runtime::Runtime as TokioRuntime; use tokio::task::JoinHandle; use tracing::{debug, error, info}; -use ibc::{events::IbcEvent, ics24_host::identifier::ChainId}; +use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; -use crate::error::{Error, Kind}; -use ibc::ics02_client::height::Height; +#[derive(Debug, Clone, Error)] +pub enum Error { + #[error("failed to create WebSocket client driver: {0}")] + ClientCreationFailed(tendermint_rpc::Error), + + #[error("failed to terminate previous WebSocket client driver: {0}")] + ClientTerminationFailed(Arc), + + #[error("failed to run previous WebSocket client driver to completion: {0}")] + ClientCompletionFailed(tendermint_rpc::Error), + + #[error("failed to subscribe to events via WebSocket client: {0}")] + ClientSubscriptionFailed(tendermint_rpc::Error), + + #[error("failed to send event batch through channel")] + ChannelSendFailed, +} /// A batch of events from a chain at a specific height #[derive(Clone, Debug)] @@ -70,11 +85,9 @@ impl EventMonitor { let (tx, rx) = channel::unbounded(); let ws_addr = node_addr.clone(); - let (websocket_client, websocket_driver) = rt.block_on(async move { - WebSocketClient::new(ws_addr.clone()) - .await - .map_err(|e| Kind::Websocket(ws_addr).context(e)) - })?; + let (websocket_client, websocket_driver) = rt + .block_on(async move { WebSocketClient::new(ws_addr.clone()).await }) + .map_err(Error::ClientCreationFailed)?; let websocket_driver_handle = rt.spawn(websocket_driver.run()); @@ -112,13 +125,14 @@ impl EventMonitor { } /// Clear the current subscriptions, and subscribe again to all queries. - pub fn subscribe(&mut self) -> Result<(), BoxError> { + pub fn subscribe(&mut self) -> Result<(), Error> { let mut subscriptions = vec![]; for query in &self.event_queries { let subscription = self .rt - .block_on(self.websocket_client.subscribe(query.clone()))?; + .block_on(self.websocket_client.subscribe(query.clone())) + .map_err(Error::ClientSubscriptionFailed)?; subscriptions.push(subscription); } @@ -128,11 +142,12 @@ impl EventMonitor { Ok(()) } - fn try_reconnect(&mut self) -> Result<(), BoxError> { + fn try_reconnect(&mut self) -> Result<(), Error> { // Try to reconnect let (mut websocket_client, websocket_driver) = self .rt - .block_on(WebSocketClient::new(self.node_addr.clone()))?; + .block_on(WebSocketClient::new(self.node_addr.clone())) + .map_err(Error::ClientCreationFailed)?; let mut websocket_driver_handle = self.rt.spawn(websocket_driver.run()); @@ -150,21 +165,19 @@ impl EventMonitor { debug!("Gracefully shutting down previous client"); if let Err(e) = websocket_client.close() { - error!("Previous websocket client closing failure {}", e); + error!("previous websocket client closing failure {}", e); } - self.rt.block_on(websocket_driver_handle).map_err(|e| { - tendermint_rpc::Error::client_internal_error(format!( - "failed to terminate previous WebSocket client driver: {}", - e - )) - })??; + let result = self + .rt + .block_on(websocket_driver_handle) + .map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?; - Ok(()) + result.map_err(Error::ClientCompletionFailed) } /// Try to resubscribe to events - fn try_resubscribe(&mut self) -> Result<(), BoxError> { + fn try_resubscribe(&mut self) -> Result<(), Error> { self.subscribe() } @@ -195,7 +208,7 @@ impl EventMonitor { } /// Collect the IBC events from the subscriptions - fn collect_events(&mut self) -> Result<(), BoxError> { + fn collect_events(&mut self) -> Result<(), Error> { let event = self.rt.block_on(self.subscriptions.next()); match event { @@ -210,19 +223,22 @@ impl EventMonitor { height, events, }; - self.tx_batch.send(batch)?; + + self.tx_batch + .send(batch) + .map_err(|_| Error::ChannelSendFailed)?; } } Err(err) => { error!( - "Error {} when extracting IBC events from {:?}: ", + "error {} when extracting IBC events from {:?}: ", err, event ); } } } Some(Err(err)) => { - error!("Error on collecting events from subscriptions: {}", err); + error!("error on collecting events from subscriptions: {}", err); } None => (), // no events available } From e09a26b58f4f8894f3ea7932a727c318943b3211 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 3 May 2021 15:46:11 +0200 Subject: [PATCH 2/8] Do not crash if chain runtime fails to subscribe to events --- relayer/src/chain/cosmos.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 163f940625..994073cf35 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -379,7 +379,7 @@ impl Chain for CosmosSdkChain { ) .map_err(Kind::EventMonitor)?; - event_monitor.subscribe().unwrap(); + event_monitor.subscribe().map_err(Kind::EventMonitor)?; let monitor_thread = thread::spawn(move || event_monitor.run()); From b01a0f55d12f6cf4ffdf9257be1d14dbc78f4107 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 3 May 2021 15:50:25 +0200 Subject: [PATCH 3/8] Gracefully handle runtime init error in `start-multi` command --- relayer-cli/src/commands/start_multi.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index 6bbd5b6957..45c8cb7c41 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -85,8 +85,26 @@ fn start_all_connections(config: &Config) -> Result { conn.a_chain, conn.b_chain ); - let chain_a = registry.get_or_spawn(&conn.a_chain)?; - let chain_b = registry.get_or_spawn(&conn.b_chain)?; + let chain_a = registry.get_or_spawn(&conn.a_chain); + let chain_b = registry.get_or_spawn(&conn.b_chain); + + let (chain_a, chain_b) = match (chain_a, chain_b) { + (Err(err), _) => { + error!( + "failed to initialize runtime for chain '{}': {}", + conn.a_chain, err + ); + continue; + } + (_, Err(err)) => { + error!( + "failed to initialize runtime for chain '{}': {}", + conn.a_chain, err + ); + continue; + } + (Ok(a), Ok(b)) => (a, b), + }; s.spawn(|_| { let supervisor = Supervisor::spawn(chain_a, chain_b).unwrap(); From 48439aea219b1c994f51d6283c8b96a6f3182d05 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 May 2021 11:36:57 +0200 Subject: [PATCH 4/8] Improve worker error context --- relayer/src/link.rs | 15 +++++++++------ relayer/src/supervisor.rs | 7 +++++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 51c2799789..707eabd90e 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -31,13 +31,13 @@ use ibc_proto::ibc::core::channel::v1::{ QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }; -use crate::chain::handle::ChainHandle; use crate::channel::{Channel, ChannelError, ChannelSide}; use crate::connection::ConnectionError; use crate::error::Error; use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::relay::MAX_ITER; +use crate::{chain::handle::ChainHandle, transfer::PacketError}; use ibc::events::VecIbcEvents; #[derive(Debug, Error)] @@ -45,23 +45,26 @@ pub enum LinkError { #[error("failed with underlying error: {0}")] Failed(String), + #[error("failed with underlying error: {0}")] + Generic(#[from] Error), + #[error("failed to construct packet proofs for chain {0} with error: {1}")] PacketProofsConstructor(ChainId, Error), #[error("failed during query to chain id {0} with underlying error: {1}")] QueryError(ChainId, Error), - #[error("ConnectionError: {0}:")] + #[error("connection error: {0}:")] ConnectionError(#[from] ConnectionError), - #[error("ChannelError: {0}:")] + #[error("channel error: {0}:")] ChannelError(#[from] ChannelError), - #[error("Failed during a client operation: {0}:")] + #[error("failed during a client operation: {0}:")] ClientError(ForeignClientError), - #[error("PacketError: {0}:")] - PacketError(#[from] Error), + #[error("packet error: {0}:")] + PacketError(#[from] PacketError), #[error("clearing of old packets failed")] OldPacketClearingFailed, diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index c86df8e03b..e9ffa206a5 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -6,6 +6,7 @@ use std::{ use anomaly::BoxError; use crossbeam_channel::{Receiver, Sender}; +use tracing::{error, error_span, info, warn}; use ibc::{ events::IbcEvent, @@ -18,7 +19,6 @@ use ibc::{ ics24_host::identifier::{ChainId, ChannelId, PortId}, Height, }; -use tracing::{info, warn}; use crate::{ chain::handle::ChainHandle, @@ -245,12 +245,15 @@ impl Worker { /// Run the worker event loop. fn run(self, object: Object) { + let span = error_span!("worker", path = %object.short_name()); + let _guard = span.enter(); + let result = match object { Object::UnidirectionalChannelPath(path) => self.run_uni_chan_path(path), }; if let Err(e) = result { - eprintln!("worker error: {}", e); + error!("worker error: {}", e); } } From 726efc6bb84372788f5054b7a5046feb9604c64b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 May 2021 15:26:28 +0200 Subject: [PATCH 5/8] Fix chain id in error message if spawning chain runtime fails --- relayer-cli/src/commands/start_multi.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index 45c8cb7c41..ebe9159e60 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -89,6 +89,7 @@ fn start_all_connections(config: &Config) -> Result { let chain_b = registry.get_or_spawn(&conn.b_chain); let (chain_a, chain_b) = match (chain_a, chain_b) { + (Ok(a), Ok(b)) => (a, b), (Err(err), _) => { error!( "failed to initialize runtime for chain '{}': {}", @@ -99,11 +100,10 @@ fn start_all_connections(config: &Config) -> Result { (_, Err(err)) => { error!( "failed to initialize runtime for chain '{}': {}", - conn.a_chain, err + conn.b_chain, err ); continue; } - (Ok(a), Ok(b)) => (a, b), }; s.spawn(|_| { From e839aa80c0c001eba0e22dc0bddb0a1d270efdf6 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 May 2021 15:33:35 +0200 Subject: [PATCH 6/8] Print channel error when one arises --- relayer/src/chain/runtime.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index cd60090bfa..c8eea41669 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -2,21 +2,22 @@ use std::{sync::Arc, thread}; use crossbeam_channel as channel; use tokio::runtime::Runtime as TokioRuntime; +use tracing::error; -use ibc::ics02_client::client_consensus::AnyConsensusStateWithHeight; -use ibc::ics02_client::events::UpdateClient; -use ibc::ics02_client::misbehaviour::AnyMisbehaviour; use ibc::{ events::IbcEvent, ics02_client::{ - client_consensus::{AnyConsensusState, ConsensusState}, + client_consensus::{AnyConsensusState, AnyConsensusStateWithHeight, ConsensusState}, client_state::{AnyClientState, ClientState}, + events::UpdateClient, header::{AnyHeader, Header}, + misbehaviour::AnyMisbehaviour, + }, + ics03_connection::{connection::ConnectionEnd, version::Version}, + ics04_channel::{ + channel::ChannelEnd, + packet::{PacketMsgType, Sequence}, }, - ics03_connection::connection::ConnectionEnd, - ics03_connection::version::Version, - ics04_channel::channel::ChannelEnd, - ics04_channel::packet::{PacketMsgType, Sequence}, ics23_commitment::commitment::CommitmentPrefix, ics24_host::identifier::{ChannelId, ClientId, ConnectionId, PortId}, proofs::Proofs, @@ -24,6 +25,7 @@ use ibc::{ signer::Signer, Height, }; + use ibc_proto::ibc::core::client::v1::{QueryClientStatesRequest, QueryConsensusStatesRequest}; use ibc_proto::ibc::core::{ channel::v1::{ @@ -151,12 +153,13 @@ impl ChainRuntime { loop { channel::select! { recv(self.event_receiver) -> event_batch => { - if let Ok(event_batch) = event_batch { - self.event_bus - .broadcast(Arc::new(event_batch)) - .map_err(|e| Kind::Channel.context(e))?; - } else { - // TODO: Handle error + match event_batch { + Ok(event_batch) => { + self.event_bus + .broadcast(Arc::new(event_batch)) + .map_err(|e| Kind::Channel.context(e))?; + }, + Err(e) => error!("received error via event bus: {}", e), } }, recv(self.request_receiver) -> event => { @@ -290,7 +293,7 @@ impl ChainRuntime { self.query_txs(request, reply_to)? }, - Err(_e) => todo!(), // TODO: Handle error? + Err(e) => error!("received error via chain request channel: {}", e), } }, } From 8a92f4be1f0329bc45ce933a4e194d775d2a6774 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 May 2021 16:16:27 +0200 Subject: [PATCH 7/8] Refactor event monitor loop a little bit --- relayer/src/event/monitor.rs | 102 ++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 0109a87a45..59ee7be917 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -8,7 +8,7 @@ use tendermint_rpc::{query::EventType, query::Query, SubscriptionClient, WebSock use thiserror::Error; use tokio::runtime::Runtime as TokioRuntime; use tokio::task::JoinHandle; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; @@ -26,6 +26,12 @@ pub enum Error { #[error("failed to subscribe to events via WebSocket client: {0}")] ClientSubscriptionFailed(tendermint_rpc::Error), + #[error("failed to collect events over WebSocket subscription: {0}")] + NextEventBatchFailed(tendermint_rpc::Error), + + #[error("failed to extract IBC events: {0}")] + CollectEventsFailed(String), + #[error("failed to send event batch through channel")] ChannelSendFailed, } @@ -129,6 +135,8 @@ impl EventMonitor { let mut subscriptions = vec![]; for query in &self.event_queries { + debug!("subscribing to query: {}", query); + let subscription = self .rt .block_on(self.websocket_client.subscribe(query.clone())) @@ -139,10 +147,17 @@ impl EventMonitor { self.subscriptions = Box::new(select_all(subscriptions)); + debug!("subscribed to all queries"); + Ok(()) } fn try_reconnect(&mut self) -> Result<(), Error> { + warn!( + "trying to reconnect to WebSocket endpoint: {}", + self.node_addr + ); + // Try to reconnect let (mut websocket_client, websocket_driver) = self .rt @@ -159,10 +174,10 @@ impl EventMonitor { &mut websocket_driver_handle, ); - debug!("Reconnected"); + warn!("reconnected to WebSocket endpoint: {}", self.node_addr); // Shut down previous client - debug!("Gracefully shutting down previous client"); + debug!("gracefully shutting down previous client"); if let Err(e) = websocket_client.close() { error!("previous websocket client closing failure {}", e); @@ -173,34 +188,38 @@ impl EventMonitor { .block_on(websocket_driver_handle) .map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?; + debug!("previous client successfully shutdown"); + result.map_err(Error::ClientCompletionFailed) } /// Try to resubscribe to events fn try_resubscribe(&mut self) -> Result<(), Error> { + warn!("trying to resubscribe to events"); + self.subscribe() } /// Event monitor loop pub fn run(mut self) { - info!(chain.id = %self.chain_id, "running listener"); + info!(chain.id = %self.chain_id, "starting event monitor"); loop { match self.collect_events() { - Ok(_) => continue, - Err(err) => { - debug!("Web socket error: {}", err); + Ok(batches) => self.process_batches(batches).unwrap_or_else(|e| { + error!("failed to process event batch: {}", e); + }), + Err(e) => { + error!("failed to collect events: {}", e); // Try to reconnect self.try_reconnect().unwrap_or_else(|e| { - debug!("Error on reconnecting: {}", e); - panic!("Abort during reconnection"); + error!("error on reconnecting: {}", e); }); // Try to resubscribe self.try_resubscribe().unwrap_or_else(|e| { - debug!("Error on reconnecting: {}", e); - panic!("Abort during reconnection"); + error!("error on reconnecting: {}", e); }); } } @@ -208,41 +227,36 @@ impl EventMonitor { } /// Collect the IBC events from the subscriptions - fn collect_events(&mut self) -> Result<(), Error> { - let event = self.rt.block_on(self.subscriptions.next()); - - match event { - Some(Ok(event)) => { - match crate::event::rpc::get_all_events(&self.chain_id, event.clone()) { - Ok(ibc_events) => { - let events_by_height = ibc_events.into_iter().into_group_map(); - - for (height, events) in events_by_height { - let batch = EventBatch { - chain_id: self.chain_id.clone(), - height, - events, - }; - - self.tx_batch - .send(batch) - .map_err(|_| Error::ChannelSendFailed)?; - } - } - Err(err) => { - error!( - "error {} when extracting IBC events from {:?}: ", - err, event - ); - } - } - } - Some(Err(err)) => { - error!("error on collecting events from subscriptions: {}", err); - } - None => (), // no events available + fn process_batches(&self, batches: Vec) -> Result<(), Error> { + for batch in batches { + self.tx_batch + .send(batch) + .map_err(|_| Error::ChannelSendFailed)?; } Ok(()) } + + /// Collect the IBC events from the subscriptions + fn collect_events(&mut self) -> Result, Error> { + if let Some(event) = self.rt.block_on(self.subscriptions.next()) { + let event = event.map_err(Error::NextEventBatchFailed)?; + let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event) + .map_err(Error::CollectEventsFailed)?; + + let events_by_height = ibc_events.into_iter().into_group_map(); + let batches = events_by_height + .into_iter() + .map(|(height, events)| EventBatch { + chain_id: self.chain_id.clone(), + height, + events, + }) + .collect(); + + Ok(batches) + } else { + Ok(vec![]) + } + } } From 80d9adbd66fe82023fd9a2ca51cc933f43c8613c Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 May 2021 23:32:15 +0200 Subject: [PATCH 8/8] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a015ad7e33..8e56254072 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]). - Fix pagination in gRPC query for clients ([#811]) - Fix relayer crash when hermes starts in the same time as packets are being sent ([#851]) - Fix missing port information in `hermes query channels` ([#840]) + - Fix crash during initialization of event monitor when node is down ([#863]) - [ibc-relayer-cli] - Fix for `ft-transfer` mismatching arguments ([#869]) @@ -46,6 +47,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]). [#851]: https://github.com/informalsystems/ibc-rs/issues/851 [#854]: https://github.com/informalsystems/ibc-rs/issues/854 [#861]: https://github.com/informalsystems/ibc-rs/issues/861 +[#863]: https://github.com/informalsystems/ibc-rs/issues/863 [#869]: https://github.com/informalsystems/ibc-rs/issues/869 [#878]: https://github.com/informalsystems/ibc-rs/issues/878