Skip to content

Commit

Permalink
Make supervisor more resilient to node going down (#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
romac authored May 6, 2021
1 parent d3c6096 commit 20d8fff
Show file tree
Hide file tree
Showing 16 changed files with 439 additions and 172 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
- [ibc-relayer]
- Change the default for client creation to allow governance recovery in case of expiration or misbehaviour ([#785])
- Use a single supervisor to subscribe to all configured chains ([#862])
- The relayer is now more resilient to a node not being up or going down, and will attempt to reconnect ([#871])

### BUG FIXES

Expand Down Expand Up @@ -58,6 +59,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
[#862]: https://github.com/informalsystems/ibc-rs/issues/862
[#863]: https://github.com/informalsystems/ibc-rs/issues/863
[#869]: https://github.com/informalsystems/ibc-rs/issues/869
[#871]: https://github.com/informalsystems/ibc-rs/issues/871
[#878]: https://github.com/informalsystems/ibc-rs/issues/878
[#909]: https://github.com/informalsystems/ibc-rs/issues/909
[#873]: https://github.com/informalsystems/ibc-rs/issues/873
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::{ops::Deref, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use crossbeam_channel as channel;
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{config::ChainConfig, event::monitor::*};
use ibc_relayer::{
config::ChainConfig,
event::monitor::{EventMonitor, EventReceiver},
};

use crate::prelude::*;

Expand Down Expand Up @@ -73,7 +75,7 @@ fn subscribe(
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
) -> Result<(EventMonitor, EventReceiver), BoxError> {
let (mut event_monitor, rx) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
Expand Down
59 changes: 34 additions & 25 deletions relayer-cli/src/commands/misbehaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ibc::ics02_client::events::UpdateClient;
use ibc::ics02_client::height::Height;
use ibc::ics24_host::identifier::{ChainId, ClientId};
use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::event::monitor::UnwrapOrClone;
use ibc_relayer::foreign_client::{ForeignClient, MisbehaviourResults};

use crate::application::CliApp;
Expand Down Expand Up @@ -52,32 +53,40 @@ pub fn monitor_misbehaviour(
let subscription = chain.subscribe()?;

// check previous updates that may have been missed
misbehaviour_handling(chain.clone(), config, client_id, None)?;
misbehaviour_handling(chain.clone(), config, client_id.clone(), None)?;

// process update client events
while let Ok(event_batch) = subscription.recv() {
for event in event_batch.events.iter() {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id(),
Some(update.clone()),
)?;
let event_batch = event_batch.unwrap_or_clone();
match event_batch {
Ok(event_batch) => {
for event in event_batch.events {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id().clone(),
Some(update),
)?;
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(ref _misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event));
}

_ => {}
}
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(_misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event.clone()));
}

_ => {}
}
Err(e) => {
dbg!(e);
}
}
}
Expand All @@ -88,11 +97,11 @@ pub fn monitor_misbehaviour(
fn misbehaviour_handling(
chain: Box<dyn ChainHandle>,
config: &config::Reader<CliApp>,
client_id: &ClientId,
client_id: ClientId,
update: Option<UpdateClient>,
) -> Result<(), BoxError> {
let client_state = chain
.query_client_state(client_id, Height::zero())
.query_client_state(&client_id, Height::zero())
.map_err(|e| format!("could not query client state for {}: {}", client_id, e))?;

if client_state.is_frozen() {
Expand All @@ -108,7 +117,7 @@ fn misbehaviour_handling(
)
})?;

let client = ForeignClient::restore(client_id, chain.clone(), counterparty_chain.clone());
let client = ForeignClient::restore(&client_id, chain.clone(), counterparty_chain.clone());
let result = client.detect_misbehaviour_and_submit_evidence(update);
if let MisbehaviourResults::EvidenceSubmitted(events) = result {
info!("evidence submission result {:?}", events);
Expand Down
1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dyn-clonable = "0.9.0"
tonic = "0.4"
dirs-next = "2.0.0"
dyn-clone = "1.0.3"
retry = { version = "1.2.1", default-features = false }

[dependencies.tendermint]
version = "=0.19.0"
Expand Down
12 changes: 2 additions & 10 deletions relayer/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, thread};

use crossbeam_channel as channel;
use prost_types::Any;
use tendermint::block::Height;
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -34,12 +33,11 @@ use ibc_proto::ibc::core::connection::v1::{
QueryClientConnectionsRequest, QueryConnectionsRequest,
};

use crate::config::ChainConfig;
use crate::connection::ConnectionMsgType;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::LightClient;
use crate::{config::ChainConfig, event::monitor::EventReceiver};

pub(crate) mod cosmos;
pub mod counterparty;
Expand Down Expand Up @@ -91,13 +89,7 @@ pub trait Chain: Sized {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
>;
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error>;

/// Returns the chain's identifier
fn id(&self) -> &ChainId;
Expand Down
11 changes: 2 additions & 9 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anomaly::fail;
use bech32::{ToBase32, Variant};
use bitcoin::hashes::hex::ToHex;
use crossbeam_channel as channel;
use prost::Message;
use prost_types::Any;
use tendermint::abci::Path as TendermintABCIPath;
Expand Down Expand Up @@ -65,7 +64,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::QueryResponse;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::{EventBatch, EventMonitor};
use crate::event::monitor::{EventMonitor, EventReceiver};
use crate::keyring::{KeyEntry, KeyRing, Store};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::LightClient;
Expand Down Expand Up @@ -361,13 +360,7 @@ impl Chain for CosmosSdkChain {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, event_receiver) = EventMonitor::new(
Expand Down
11 changes: 7 additions & 4 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest;
use ibc_proto::ibc::core::commitment::v1::MerkleProof;
pub use prod::ProdChainHandle;

use crate::connection::ConnectionMsgType;
use crate::keyring::KeyEntry;
use crate::{error::Error, event::monitor::EventBatch};
use crate::{
connection::ConnectionMsgType,
error::Error,
event::monitor::{EventBatch, Result as MonitorResult},
keyring::KeyEntry,
};

mod prod;

pub type Subscription = channel::Receiver<Arc<EventBatch>>;
pub type Subscription = channel::Receiver<Arc<MonitorResult<EventBatch>>>;

pub type ReplyTo<T> = channel::Sender<Result<T, Error>>;
pub type Reply<T> = channel::Receiver<Result<T, Error>>;
Expand Down
10 changes: 2 additions & 8 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::Chain;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::event::monitor::EventReceiver;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::{mock::LightClient as MockLightClient, LightClient};

Expand Down Expand Up @@ -79,13 +79,7 @@ impl Chain for MockChain {
fn init_event_monitor(
&self,
_rt: Arc<Runtime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
let (_, rx) = channel::unbounded();
Ok((rx, None))
}
Expand Down
20 changes: 14 additions & 6 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use crate::{
config::ChainConfig,
connection::ConnectionMsgType,
error::{Error, Kind},
event::{bus::EventBus, monitor::EventBatch},
event::{
bus::EventBus,
monitor::{EventBatch, EventReceiver, Result as MonitorResult},
},
keyring::KeyEntry,
light_client::LightClient,
};
Expand Down Expand Up @@ -69,10 +72,10 @@ pub struct ChainRuntime<C: Chain> {
request_receiver: channel::Receiver<ChainRequest>,

/// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers
event_bus: EventBus<Arc<EventBatch>>,
event_bus: EventBus<Arc<MonitorResult<EventBatch>>>,

/// Receiver channel from the event monitor
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,

/// A handle to the light client
light_client: Box<dyn LightClient<C>>,
Expand Down Expand Up @@ -111,7 +114,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn init(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,
rt: Arc<TokioRuntime>,
) -> (Box<dyn ChainHandle>, thread::JoinHandle<()>) {
let chain_runtime = Self::new(chain, light_client, event_receiver, rt);
Expand All @@ -120,7 +123,12 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
let handle = chain_runtime.handle();

// Spawn the runtime & return
let thread = thread::spawn(move || chain_runtime.run().unwrap());
let id = handle.id();
let thread = thread::spawn(move || {
if let Err(e) = chain_runtime.run() {
error!("failed to start runtime for chain '{}': {}", id, e);
}
});

(handle, thread)
}
Expand All @@ -129,7 +137,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn new(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,
rt: Arc<TokioRuntime>,
) -> Self {
let (request_sender, request_receiver) = channel::unbounded::<ChainRequest>();
Expand Down
Loading

0 comments on commit 20d8fff

Please sign in to comment.