Skip to content

Commit

Permalink
Improve spawning of supervisor worker tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
soareschen committed Dec 6, 2021
1 parent d18cc90 commit 0ed7526
Show file tree
Hide file tree
Showing 24 changed files with 1,421 additions and 1,400 deletions.
23 changes: 12 additions & 11 deletions relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crossbeam_channel::Sender;
use ibc_relayer::chain::handle::{ChainHandle, ProdChainHandle};
use ibc_relayer::config::reload::ConfigReload;
use ibc_relayer::config::Config;
use ibc_relayer::registry::SharedRegistry;
use ibc_relayer::rest;
use ibc_relayer::supervisor::{cmd::SupervisorCmd, Supervisor};
use ibc_relayer::supervisor::{cmd::SupervisorCmd, spawn_supervisor, SupervisorHandle};

use crate::conclude::json;
use crate::conclude::Output;
Expand All @@ -24,16 +25,17 @@ impl Runnable for StartCmd {
let config = (*app_config()).clone();
let config = Arc::new(RwLock::new(config));

let (mut supervisor, tx_cmd) = make_supervisor::<ProdChainHandle>(config.clone())
.unwrap_or_else(|e| {
let supervisor_handle =
make_supervisor::<ProdChainHandle>(config.clone()).unwrap_or_else(|e| {
Output::error(format!("Hermes failed to start, last error: {}", e)).exit();
unreachable!()
});

match crate::config::config_path() {
Some(config_path) => {
let reload = ConfigReload::new(config_path, config, tx_cmd.clone());
register_signals(reload, tx_cmd).unwrap_or_else(|e| {
let reload =
ConfigReload::new(config_path, config, supervisor_handle.sender.clone());
register_signals(reload, supervisor_handle.sender.clone()).unwrap_or_else(|e| {
warn!("failed to install signal handler: {}", e);
});
}
Expand All @@ -43,10 +45,8 @@ impl Runnable for StartCmd {
};

info!("Hermes has started");
match supervisor.run() {
Ok(()) => Output::success_msg("done").exit(),
Err(e) => Output::error(format!("Hermes failed to start, last error: {}", e)).exit(),
}

supervisor_handle.wait();
}
}

Expand Down Expand Up @@ -175,10 +175,11 @@ fn spawn_telemetry_server(

fn make_supervisor<Chain: ChainHandle + 'static>(
config: Arc<RwLock<Config>>,
) -> Result<(Supervisor<Chain>, Sender<SupervisorCmd>), Box<dyn Error + Send + Sync>> {
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
spawn_telemetry_server(&config)?;

let rest = spawn_rest_server(&config);

Ok(Supervisor::new(config, rest))
Ok(spawn_supervisor(config, registry, rest, true)?)
}
11 changes: 6 additions & 5 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloc::collections::BTreeMap as HashMap;
use alloc::collections::VecDeque;
use core::cell::RefCell;
use core::fmt;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;

Expand Down Expand Up @@ -49,6 +49,7 @@ use crate::link::pending::PendingTxs;
use crate::link::relay_sender::{AsyncReply, SubmitReply};
use crate::link::relay_summary::RelaySummary;
use crate::link::{pending, relay_sender};
use crate::util::lock::LockExt;
use crate::util::queue::Queue;

const MAX_RETRIES: usize = 5;
Expand All @@ -65,7 +66,7 @@ pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
// Marks whether this path still needs to clear pending packets.
// Packets should be cleared once (at startup), after which this
// flag is set to `false`.
clear_packets: RefCell<bool>,
clear_packets: Arc<RwLock<bool>>,

// Operational data, targeting both the source and destination chain.
// These vectors of operational data are ordered decreasingly by
Expand Down Expand Up @@ -119,7 +120,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
dst_channel_id: dst_channel_id.clone(),
dst_port_id: dst_port_id.clone(),

clear_packets: RefCell::new(true),
clear_packets: Arc::new(RwLock::new(true)),
src_operational_data: Queue::new(),
dst_operational_data: Queue::new(),

Expand Down Expand Up @@ -305,7 +306,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

fn should_clear_packets(&self) -> bool {
*self.clear_packets.borrow()
*self.clear_packets.acquire_read()
}

/// Clears any packets that were sent before `height`, either if the `clear_packets` flag
Expand All @@ -318,7 +319,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
if self.should_clear_packets() || force {
// Disable further clearing of old packets by default.
// Clearing may still happen: upon new blocks, when `force = true`.
self.clear_packets.replace(false);
*self.clear_packets.acquire_write() = false;

let clear_height = height
.map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e)))
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use tracing::{trace, warn};

use ibc::core::ics24_host::identifier::ChainId;

use crate::util::lock::RwArc;
use crate::{
chain::{handle::ChainHandle, runtime::ChainRuntime, CosmosSdkChain},
config::Config,
error::Error as RelayerError,
supervisor::RwArc,
};

define_error! {
Expand Down
Loading

0 comments on commit 0ed7526

Please sign in to comment.