Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in event monitor where subscriptions would not be terminated properly on WebSocket error #290

Merged
merged 5 commits into from
Oct 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 47 additions & 35 deletions relayer/src/event_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use ibc::events::IBCEvent;
use tendermint::{chain, net, Error as TMError};
use tendermint_rpc::Subscription;
use tendermint_rpc::{SubscriptionClient, WebSocketClient};
use tendermint_rpc::{
query::EventType, query::Query, Subscription, SubscriptionClient, WebSocketClient,
};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::Sender;

use futures::stream::select_all;
use tracing::{debug, info};
use futures::{stream::select_all, Stream};
use tracing::{debug, error, info};

type SubscriptionResult = Result<tendermint_rpc::event::Event, tendermint_rpc::Error>;
type SubscriptionStream = dyn Stream<Item = SubscriptionResult> + Send + Sync + Unpin;

/// Connect to a TM node, receive push events over a websocket and filter them for the
/// event handler.
Expand All @@ -19,9 +23,9 @@ pub struct EventMonitor {
/// Node Address
node_addr: net::Address,
/// Queries
event_queries: Vec<String>,
/// Subscriptions
subscriptions: Vec<Subscription>,
event_queries: Vec<Query>,
/// All subscriptions combined in a single stream
subscriptions: Box<SubscriptionStream>,
}

impl EventMonitor {
Expand All @@ -34,35 +38,29 @@ impl EventMonitor {
let websocket_client = WebSocketClient::new(rpc_addr.clone()).await?;

// TODO: move them to config file(?)
let event_queries = vec![
"tm.event='NewTx'".to_string(),
"tm.event='NewBlock'".to_string(),
];
let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)];

Ok(EventMonitor {
chain_id,
websocket_client,
channel_to_handler,
node_addr: rpc_addr.clone(),
subscriptions: Vec::with_capacity(event_queries.len()),
event_queries,
subscriptions: Box::new(futures::stream::empty()),
})
}

/// Terminate and clear the current subscriptions, and subscribe again to all queries.
/// Clear the current subscriptions, and subscribe again to all queries.
pub async fn subscribe(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let count = self.subscriptions.len();
let subscriptions = std::mem::replace(&mut self.subscriptions, Vec::with_capacity(count));

for subscription in subscriptions {
subscription.terminate().await?;
}
let mut subscriptions = vec![];

for query in &self.event_queries {
let subscription = self.websocket_client.subscribe(query.clone()).await?;
self.subscriptions.push(subscription);
subscriptions.push(subscription);
}

self.subscriptions = Box::new(select_all(subscriptions));

Ok(())
}

Expand All @@ -75,20 +73,30 @@ impl EventMonitor {
Ok(..) => continue,
Err(err) => {
debug!("Web socket error: {}", err);

// Try to reconnect
let websocket_client = WebSocketClient::new(self.node_addr.clone())
let mut websocket_client = WebSocketClient::new(self.node_addr.clone())
.await
.unwrap_or_else(|e| {
debug!("Error on reconnection {}", e);
panic!("Abort on failed reconnection")
debug!("Error on reconnection: {}", e);
panic!("Abort on failed reconnection");
});

// Swap the new client with the previous one which failed,
// so that we can shut the latter down gracefully.
std::mem::swap(&mut self.websocket_client, &mut websocket_client);

debug!("Reconnected");
self.websocket_client = websocket_client;

// Shut down previous client
debug!("Gracefully shutting down previous client");
websocket_client.close().await.unwrap_or_else(|e| {
error!("Failed to close previous WebSocket client: {}", e);
});

// Try to resubscribe
if let Err(err) = self.subscribe().await {
debug!("Error on recreating subscriptions {}", err);
debug!("Error on recreating subscriptions: {}", err);
panic!("Abort during reconnection");
};
}
Expand All @@ -98,18 +106,22 @@ impl EventMonitor {

/// Collect the IBC events from the subscriptions
pub async fn collect_events(&mut self) -> Result<(), TMError> {
if let Some(event) = select_all(&mut self.subscriptions).next().await {
match event {
Ok(event) => {
if let Ok(ibc_events) = ibc::events::get_all_events(event) {
// TODO - send_timeout()?
self.channel_to_handler
.send((self.chain_id, ibc_events))
.await?;
tokio::select! {
Some(event) = self.subscriptions.next() => {
match event {
Ok(event) => {
if let Ok(ibc_events) = ibc::events::get_all_events(event) {
// TODO - send_timeout()?
self.channel_to_handler
.send((self.chain_id, ibc_events))
.await?;
}
}
Err(err) => {
error!("Error on collecting events from subscriptions: {}", err);
}
}
Err(_e) => (), // TODO,
}
},
}

Ok(())
Expand Down