Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve spawning of supervisor worker tasks #1656

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crossbeam_channel::Sender;
use ibc_relayer::chain::handle::{ChainHandle, ProdChainHandle};
use ibc_relayer::config::reload::ConfigReload;
use ibc_relayer::config::Config;
use ibc_relayer::registry::SharedRegistry;
use ibc_relayer::rest;
use ibc_relayer::supervisor::{cmd::SupervisorCmd, Supervisor};
use ibc_relayer::supervisor::{cmd::SupervisorCmd, spawn_supervisor, SupervisorHandle};

use crate::conclude::json;
use crate::conclude::Output;
Expand All @@ -24,16 +25,17 @@ impl Runnable for StartCmd {
let config = (*app_config()).clone();
let config = Arc::new(RwLock::new(config));

let (mut supervisor, tx_cmd) = make_supervisor::<ProdChainHandle>(config.clone())
.unwrap_or_else(|e| {
let supervisor_handle =
make_supervisor::<ProdChainHandle>(config.clone()).unwrap_or_else(|e| {
Output::error(format!("Hermes failed to start, last error: {}", e)).exit();
unreachable!()
});

match crate::config::config_path() {
Some(config_path) => {
let reload = ConfigReload::new(config_path, config, tx_cmd.clone());
register_signals(reload, tx_cmd).unwrap_or_else(|e| {
let reload =
ConfigReload::new(config_path, config, supervisor_handle.sender.clone());
register_signals(reload, supervisor_handle.sender.clone()).unwrap_or_else(|e| {
warn!("failed to install signal handler: {}", e);
});
}
Expand All @@ -43,10 +45,8 @@ impl Runnable for StartCmd {
};

info!("Hermes has started");
match supervisor.run() {
Ok(()) => Output::success_msg("done").exit(),
Err(e) => Output::error(format!("Hermes failed to start, last error: {}", e)).exit(),
}

supervisor_handle.wait();
}
}

Expand Down Expand Up @@ -175,10 +175,11 @@ fn spawn_telemetry_server(

fn make_supervisor<Chain: ChainHandle + 'static>(
config: Arc<RwLock<Config>>,
) -> Result<(Supervisor<Chain>, Sender<SupervisorCmd>), Box<dyn Error + Send + Sync>> {
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
spawn_telemetry_server(&config)?;

let rest = spawn_rest_server(&config);

Ok(Supervisor::new(config, rest))
Ok(spawn_supervisor(config, registry, rest, true)?)
}
11 changes: 6 additions & 5 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloc::collections::BTreeMap as HashMap;
use alloc::collections::VecDeque;
use core::cell::RefCell;
use core::fmt;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;

Expand Down Expand Up @@ -49,6 +49,7 @@ use crate::link::pending::PendingTxs;
use crate::link::relay_sender::{AsyncReply, SubmitReply};
use crate::link::relay_summary::RelaySummary;
use crate::link::{pending, relay_sender};
use crate::util::lock::LockExt;
use crate::util::queue::Queue;

const MAX_RETRIES: usize = 5;
Expand All @@ -65,7 +66,7 @@ pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
// Marks whether this path still needs to clear pending packets.
// Packets should be cleared once (at startup), after which this
// flag is set to `false`.
clear_packets: RefCell<bool>,
clear_packets: Arc<RwLock<bool>>,

// Operational data, targeting both the source and destination chain.
// These vectors of operational data are ordered decreasingly by
Expand Down Expand Up @@ -119,7 +120,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
dst_channel_id: dst_channel_id.clone(),
dst_port_id: dst_port_id.clone(),

clear_packets: RefCell::new(true),
clear_packets: Arc::new(RwLock::new(true)),
src_operational_data: Queue::new(),
dst_operational_data: Queue::new(),

Expand Down Expand Up @@ -305,7 +306,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

fn should_clear_packets(&self) -> bool {
*self.clear_packets.borrow()
*self.clear_packets.acquire_read()
}

/// Clears any packets that were sent before `height`, either if the `clear_packets` flag
Expand All @@ -318,7 +319,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
if self.should_clear_packets() || force {
// Disable further clearing of old packets by default.
// Clearing may still happen: upon new blocks, when `force = true`.
self.clear_packets.replace(false);
*self.clear_packets.acquire_write() = false;

let clear_height = height
.map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e)))
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use tracing::{trace, warn};

use ibc::core::ics24_host::identifier::ChainId;

use crate::util::lock::RwArc;
use crate::{
chain::{handle::ChainHandle, runtime::ChainRuntime, CosmosSdkChain},
config::Config,
error::Error as RelayerError,
supervisor::RwArc,
};

define_error! {
Expand Down
Loading