Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make supervisor more resilient to node going down #903

Merged
merged 16 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
- [ibc-relayer]
- Change the default for client creation to allow governance recovery in case of expiration or misbehaviour ([#785])
- Use a single supervisor to subscribe to all configured chains ([#862])
- The relayer is now more resilient to a node not being up or going down, and will attempt to reconnect ([#871])

### BUG FIXES

Expand Down Expand Up @@ -58,6 +59,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
[#862]: https://github.com/informalsystems/ibc-rs/issues/862
[#863]: https://github.com/informalsystems/ibc-rs/issues/863
[#869]: https://github.com/informalsystems/ibc-rs/issues/869
[#871]: https://github.com/informalsystems/ibc-rs/issues/871
[#878]: https://github.com/informalsystems/ibc-rs/issues/878
[#909]: https://github.com/informalsystems/ibc-rs/issues/909
[#873]: https://github.com/informalsystems/ibc-rs/issues/873
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::{ops::Deref, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use crossbeam_channel as channel;
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{config::ChainConfig, event::monitor::*};
use ibc_relayer::{
config::ChainConfig,
event::monitor::{EventMonitor, EventReceiver},
};

use crate::prelude::*;

Expand Down Expand Up @@ -73,7 +75,7 @@ fn subscribe(
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
) -> Result<(EventMonitor, EventReceiver), BoxError> {
let (mut event_monitor, rx) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
Expand Down
59 changes: 34 additions & 25 deletions relayer-cli/src/commands/misbehaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ibc::ics02_client::events::UpdateClient;
use ibc::ics02_client::height::Height;
use ibc::ics24_host::identifier::{ChainId, ClientId};
use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::event::monitor::UnwrapOrClone;
use ibc_relayer::foreign_client::{ForeignClient, MisbehaviourResults};

use crate::application::CliApp;
Expand Down Expand Up @@ -52,32 +53,40 @@ pub fn monitor_misbehaviour(
let subscription = chain.subscribe()?;

// check previous updates that may have been missed
misbehaviour_handling(chain.clone(), config, client_id, None)?;
misbehaviour_handling(chain.clone(), config, client_id.clone(), None)?;

// process update client events
while let Ok(event_batch) = subscription.recv() {
for event in event_batch.events.iter() {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id(),
Some(update.clone()),
)?;
let event_batch = event_batch.unwrap_or_clone();
match event_batch {
Ok(event_batch) => {
for event in event_batch.events {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id().clone(),
Some(update),
)?;
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(ref _misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event));
}

_ => {}
}
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(_misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event.clone()));
}

_ => {}
}
Err(e) => {
dbg!(e);
}
}
}
Expand All @@ -88,11 +97,11 @@ pub fn monitor_misbehaviour(
fn misbehaviour_handling(
chain: Box<dyn ChainHandle>,
config: &config::Reader<CliApp>,
client_id: &ClientId,
client_id: ClientId,
update: Option<UpdateClient>,
) -> Result<(), BoxError> {
let client_state = chain
.query_client_state(client_id, Height::zero())
.query_client_state(&client_id, Height::zero())
.map_err(|e| format!("could not query client state for {}: {}", client_id, e))?;

if client_state.is_frozen() {
Expand All @@ -108,7 +117,7 @@ fn misbehaviour_handling(
)
})?;

let client = ForeignClient::restore(client_id, chain.clone(), counterparty_chain.clone());
let client = ForeignClient::restore(&client_id, chain.clone(), counterparty_chain.clone());
let result = client.detect_misbehaviour_and_submit_evidence(update);
if let MisbehaviourResults::EvidenceSubmitted(events) = result {
info!("evidence submission result {:?}", events);
Expand Down
1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dyn-clonable = "0.9.0"
tonic = "0.4"
dirs-next = "2.0.0"
dyn-clone = "1.0.3"
retry = { version = "1.2.1", default-features = false }

[dependencies.tendermint]
version = "=0.19.0"
Expand Down
12 changes: 2 additions & 10 deletions relayer/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, thread};

use crossbeam_channel as channel;
use prost_types::Any;
use tendermint::block::Height;
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -34,12 +33,11 @@ use ibc_proto::ibc::core::connection::v1::{
QueryClientConnectionsRequest, QueryConnectionsRequest,
};

use crate::config::ChainConfig;
use crate::connection::ConnectionMsgType;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::LightClient;
use crate::{config::ChainConfig, event::monitor::EventReceiver};

pub(crate) mod cosmos;
pub mod counterparty;
Expand Down Expand Up @@ -91,13 +89,7 @@ pub trait Chain: Sized {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
>;
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error>;

/// Returns the chain's identifier
fn id(&self) -> &ChainId;
Expand Down
11 changes: 2 additions & 9 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anomaly::fail;
use bech32::{ToBase32, Variant};
use bitcoin::hashes::hex::ToHex;
use crossbeam_channel as channel;
use prost::Message;
use prost_types::Any;
use tendermint::abci::Path as TendermintABCIPath;
Expand Down Expand Up @@ -65,7 +64,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::QueryResponse;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::{EventBatch, EventMonitor};
use crate::event::monitor::{EventMonitor, EventReceiver};
use crate::keyring::{KeyEntry, KeyRing, Store};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::LightClient;
Expand Down Expand Up @@ -361,13 +360,7 @@ impl Chain for CosmosSdkChain {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, event_receiver) = EventMonitor::new(
Expand Down
11 changes: 7 additions & 4 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest;
use ibc_proto::ibc::core::commitment::v1::MerkleProof;
pub use prod::ProdChainHandle;

use crate::connection::ConnectionMsgType;
use crate::keyring::KeyEntry;
use crate::{error::Error, event::monitor::EventBatch};
use crate::{
connection::ConnectionMsgType,
error::Error,
event::monitor::{EventBatch, Result as MonitorResult},
keyring::KeyEntry,
};

mod prod;

pub type Subscription = channel::Receiver<Arc<EventBatch>>;
pub type Subscription = channel::Receiver<Arc<MonitorResult<EventBatch>>>;

pub type ReplyTo<T> = channel::Sender<Result<T, Error>>;
pub type Reply<T> = channel::Receiver<Result<T, Error>>;
Expand Down
10 changes: 2 additions & 8 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::Chain;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::event::monitor::EventReceiver;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::{mock::LightClient as MockLightClient, LightClient};

Expand Down Expand Up @@ -79,13 +79,7 @@ impl Chain for MockChain {
fn init_event_monitor(
&self,
_rt: Arc<Runtime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
let (_, rx) = channel::unbounded();
Ok((rx, None))
}
Expand Down
20 changes: 14 additions & 6 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use crate::{
config::ChainConfig,
connection::ConnectionMsgType,
error::{Error, Kind},
event::{bus::EventBus, monitor::EventBatch},
event::{
bus::EventBus,
monitor::{EventBatch, EventReceiver, Result as MonitorResult},
},
keyring::KeyEntry,
light_client::LightClient,
};
Expand Down Expand Up @@ -69,10 +72,10 @@ pub struct ChainRuntime<C: Chain> {
request_receiver: channel::Receiver<ChainRequest>,

/// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers
event_bus: EventBus<Arc<EventBatch>>,
event_bus: EventBus<Arc<MonitorResult<EventBatch>>>,

/// Receiver channel for event batches coming from the event monitor (see `init_event_monitor`)
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,

/// A handle to the light client
light_client: Box<dyn LightClient<C>>,
Expand Down Expand Up @@ -111,7 +114,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn init(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,
rt: Arc<TokioRuntime>,
) -> (Box<dyn ChainHandle>, thread::JoinHandle<()>) {
let chain_runtime = Self::new(chain, light_client, event_receiver, rt);
Expand All @@ -120,7 +123,12 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
let handle = chain_runtime.handle();

// Spawn the runtime & return
let thread = thread::spawn(move || chain_runtime.run().unwrap());
let id = handle.id();
let thread = thread::spawn(move || {
if let Err(e) = chain_runtime.run() {
error!("failed to start runtime for chain '{}': {}", id, e);
}
});

(handle, thread)
}
Expand All @@ -129,7 +137,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn new(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: EventReceiver,
rt: Arc<TokioRuntime>,
) -> Self {
let (request_sender, request_receiver) = channel::unbounded::<ChainRequest>();
Expand Down
Loading