Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve spawning of supervisor worker tasks #1656

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions modules/src/core/ics24_host/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ impl PartialEq<str> for ConnectionId {
pub struct PortId(String);

impl PortId {
pub fn unsafe_new(id: &str) -> Self {
Self(id.to_string())
/// Infallible creation of the well-known transfer port
pub fn transfer() -> Self {
Self("transfer".to_string())
}

/// Get this identifier as a borrowed `&str`
Expand Down
23 changes: 12 additions & 11 deletions relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crossbeam_channel::Sender;
use ibc_relayer::chain::handle::{ChainHandle, ProdChainHandle};
use ibc_relayer::config::reload::ConfigReload;
use ibc_relayer::config::Config;
use ibc_relayer::registry::SharedRegistry;
use ibc_relayer::rest;
use ibc_relayer::supervisor::{cmd::SupervisorCmd, Supervisor};
use ibc_relayer::supervisor::{cmd::SupervisorCmd, spawn_supervisor, SupervisorHandle};

use crate::conclude::json;
use crate::conclude::Output;
Expand All @@ -24,16 +25,17 @@ impl Runnable for StartCmd {
let config = (*app_config()).clone();
let config = Arc::new(RwLock::new(config));

let (mut supervisor, tx_cmd) = make_supervisor::<ProdChainHandle>(config.clone())
.unwrap_or_else(|e| {
let supervisor_handle =
make_supervisor::<ProdChainHandle>(config.clone()).unwrap_or_else(|e| {
Output::error(format!("Hermes failed to start, last error: {}", e)).exit();
unreachable!()
});

match crate::config::config_path() {
Some(config_path) => {
let reload = ConfigReload::new(config_path, config, tx_cmd.clone());
register_signals(reload, tx_cmd).unwrap_or_else(|e| {
let reload =
ConfigReload::new(config_path, config, supervisor_handle.sender.clone());
register_signals(reload, supervisor_handle.sender.clone()).unwrap_or_else(|e| {
warn!("failed to install signal handler: {}", e);
});
}
Expand All @@ -43,10 +45,8 @@ impl Runnable for StartCmd {
};

info!("Hermes has started");
match supervisor.run() {
Ok(()) => Output::success_msg("done").exit(),
Err(e) => Output::error(format!("Hermes failed to start, last error: {}", e)).exit(),
}

supervisor_handle.wait();
}
}

Expand Down Expand Up @@ -175,10 +175,11 @@ fn spawn_telemetry_server(

fn make_supervisor<Chain: ChainHandle + 'static>(
config: Arc<RwLock<Config>>,
) -> Result<(Supervisor<Chain>, Sender<SupervisorCmd>), Box<dyn Error + Send + Sync>> {
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
spawn_telemetry_server(&config)?;

let rest = spawn_rest_server(&config);

Ok(Supervisor::new(config, rest))
Ok(spawn_supervisor(config, registry, rest, true)?)
}
2 changes: 1 addition & 1 deletion relayer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
}
}

fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> {
pub fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> {
match event {
IbcEvent::OpenInitConnection(ev) => ev.connection_id().as_ref(),
IbcEvent::OpenTryConnection(ev) => ev.connection_id().as_ref(),
Expand Down
11 changes: 6 additions & 5 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloc::collections::BTreeMap as HashMap;
use alloc::collections::VecDeque;
use core::cell::RefCell;
use core::fmt;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;

Expand Down Expand Up @@ -49,6 +49,7 @@ use crate::link::pending::PendingTxs;
use crate::link::relay_sender::{AsyncReply, SubmitReply};
use crate::link::relay_summary::RelaySummary;
use crate::link::{pending, relay_sender};
use crate::util::lock::LockExt;
use crate::util::queue::Queue;

const MAX_RETRIES: usize = 5;
Expand All @@ -65,7 +66,7 @@ pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
// Marks whether this path has already cleared pending packets.
// Packets should be cleared once (at startup), then this
// flag turns to `false`.
clear_packets: RefCell<bool>,
clear_packets: Arc<RwLock<bool>>,

// Operational data, targeting both the source and destination chain.
// These vectors of operational data are ordered decreasingly by
Expand Down Expand Up @@ -119,7 +120,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
dst_channel_id: dst_channel_id.clone(),
dst_port_id: dst_port_id.clone(),

clear_packets: RefCell::new(true),
clear_packets: Arc::new(RwLock::new(true)),
src_operational_data: Queue::new(),
dst_operational_data: Queue::new(),

Expand Down Expand Up @@ -305,7 +306,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

fn should_clear_packets(&self) -> bool {
*self.clear_packets.borrow()
*self.clear_packets.acquire_read()
}

/// Clears any packets that were sent before `height`, either if the `clear_packets` flag
Expand All @@ -318,7 +319,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
if self.should_clear_packets() || force {
// Disable further clearing of old packets by default.
// Clearing may still happen: upon new blocks, when `force = true`.
self.clear_packets.replace(false);
*self.clear_packets.acquire_write() = false;

let clear_height = height
.map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e)))
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use tracing::{trace, warn};

use ibc::core::ics24_host::identifier::ChainId;

use crate::util::lock::RwArc;
use crate::{
chain::{handle::ChainHandle, runtime::ChainRuntime, CosmosSdkChain},
config::Config,
error::Error as RelayerError,
supervisor::RwArc,
};

define_error! {
Expand Down
Loading