Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Cleaner binary shutdown system #8284

Merged
merged 4 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub struct Client {
registrar_address: Option<Address>,

/// A closure to call when we want to restart the client
exit_handler: Mutex<Option<Box<Fn(bool, Option<String>) + 'static + Send>>>,
exit_handler: Mutex<Option<Box<Fn(String) + 'static + Send>>>,

importer: Importer,
}
Expand Down Expand Up @@ -826,8 +826,11 @@ impl Client {
self.notify.write().push(Arc::downgrade(&target));
}

/// Set a closure to call when we want to restart the client
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(bool, Option<String>) + 'static + Send {
/// Set a closure to call when the client wants to be restarted.
///
/// The parameter passed to the callback is the name of the new chain spec to use after
/// the restart.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally a proper description of a function! 👍

pub fn set_exit_handler<F>(&self, f: F) where F: Fn(String) + 'static + Send {
*self.exit_handler.lock() = Some(Box::new(f));
}

Expand Down Expand Up @@ -1611,7 +1614,7 @@ impl BlockChainClient for Client {
return;
}
if let Some(ref h) = *self.exit_handler.lock() {
(*h)(true, Some(new_spec_name));
(*h)(new_spec_name);
} else {
warn!("Not hypervised; cannot change chain.");
}
Expand Down
166 changes: 93 additions & 73 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::any::Any;
use std::fmt;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -182,7 +183,7 @@ impl ::local_store::NodeInfo for FullNodeInfo {
type LightClient = ::light::client::Client<::light_helpers::EpochFetch>;

// helper for light execution.
fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<LightClient>), String> {
fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient, String> {
use light::client as light_client;
use ethsync::{LightSyncParams, LightSync, ManageNetwork};
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -260,7 +261,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger

let service = light_client::Service::start(config, &spec, fetch, db, cache.clone())
.map_err(|e| format!("Error starting light client: {}", e))?;
let client = service.client();
let client = service.client().clone();
let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default()));
let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone());

Expand Down Expand Up @@ -402,10 +403,10 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
};

// start rpc servers
let _ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let _http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
let ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;

// the informant
let informant = Arc::new(Informant::new(
Expand All @@ -421,17 +422,18 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger

service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;

// wait for ctrl-c and then shut down the informant.
let res = wait_for_exit(None, None, can_restart);
informant.shutdown();

// Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);

Ok((res, weak_client))
Ok(RunningClient::Light {
informant,
client,
keep_alive: Box::new((service, ws_server, http_server, ipc_server, ui_server)),
})
}

pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<Client>), String> {
fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq: Cr,
on_updater_rq: Rr) -> Result<RunningClient, String>
where Cr: Fn(String) + 'static + Send,
Rr: Fn() + 'static + Send
Copy link
Collaborator

@debris debris Apr 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know why doesn't updater care about the spec? isn't it a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how the updater works unfortunately

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, it's not related to this pr. I'll ask andre

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andresilva told me that everything's good :)

I think it's fine because the updater gets all its info from the operations contract, so it doesn't need the chainspec
so only the client needs to be updated with the new spec, and the updater will use the updated client to interact with the operations contract

{
// load spec
let spec = cmd.spec.spec(&cmd.dirs.cache)?;

Expand Down Expand Up @@ -854,7 +856,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
});

// the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot {
let watcher = match cmd.no_periodic_snapshot {
true => None,
false => {
let sync = sync_provider.clone();
Expand All @@ -881,23 +883,58 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?;
}

// Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);

// Handle exit
let restart = wait_for_exit(Some(updater), Some(client), can_restart);
client.set_exit_handler(on_client_rq);
updater.set_exit_handler(on_updater_rq);

info!("Finishing work, please wait...");

// drop this stuff as soon as exit detected.
drop((ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop));
Ok(RunningClient::Full {
informant,
client,
keep_alive: Box::new((watcher, service, updater, ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop)),
})
}

// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
enum RunningClient {
Light {
informant: Arc<Informant<LightNodeInformantData>>,
client: Arc<LightClient>,
keep_alive: Box<Any>,
},
Full {
informant: Arc<Informant<FullNodeInformantData>>,
client: Arc<Client>,
keep_alive: Box<Any>,
},
}

Ok((restart, weak_client))
impl RunningClient {
fn shutdown(self) {
match self {
RunningClient::Light { informant, client, keep_alive } => {
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
drop(keep_alive);
informant.shutdown();
drop(informant);
drop(client);
wait_for_drop(weak_client);
},
RunningClient::Full { informant, client, keep_alive } => {
info!("Finishing work, please wait...");
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
// drop this stuff as soon as exit detected.
drop(keep_alive);
// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
drop(client);
wait_for_drop(weak_client);
}
}
}
}

pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
Expand All @@ -917,18 +954,34 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// increase max number of open files
raise_fd_limit();

fn wait<T>(res: Result<((bool, Option<String>), Weak<T>), String>) -> Result<(bool, Option<String>), String> {
res.map(|(restart, weak_client)| {
wait_for_drop(weak_client);
restart
})
}
let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));

if cmd.light {
wait(execute_light_impl(cmd, can_restart, logger))
let running_client = if cmd.light {
execute_light_impl(cmd, logger)?
} else if can_restart {
let e1 = exit.clone();
let e2 = exit.clone();
execute_impl(cmd, logger,
move |new_chain: String| { *e1.0.lock() = (true, Some(new_chain)); e1.1.notify_all(); },
move || { *e2.0.lock() = (true, None); e2.1.notify_all(); })?
} else {
wait(execute_impl(cmd, can_restart, logger))
}
trace!(target: "mode", "Not hypervised: not setting exit handlers.");
execute_impl(cmd, logger, move |_| {}, move || {})?
};

// Handle possible exits
CtrlC::set_handler({
let e = exit.clone();
move || { e.1.notify_all(); }
});

// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);

running_client.shutdown();

Ok(l.clone())
}

#[cfg(not(windows))]
Expand Down Expand Up @@ -1029,39 +1082,6 @@ fn build_create_account_hint(spec: &SpecType, keys: &str) -> String {
format!("You can create an account via RPC, UI or `parity account new --chain {} --keys-path {}`.", spec, keys)
}

fn wait_for_exit(
updater: Option<Arc<Updater>>,
client: Option<Arc<Client>>,
can_restart: bool
) -> (bool, Option<String>) {
let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));

// Handle possible exits
let e = exit.clone();
CtrlC::set_handler(move || { e.1.notify_all(); });

if can_restart {
if let Some(updater) = updater {
// Handle updater wanting to restart us
let e = exit.clone();
updater.set_exit_handler(move || { *e.0.lock() = (true, None); e.1.notify_all(); });
}

if let Some(client) = client {
// Handle updater wanting to restart us
let e = exit.clone();
client.set_exit_handler(move |restart, new_chain: Option<String>| { *e.0.lock() = (restart, new_chain); e.1.notify_all(); });
}
} else {
trace!(target: "mode", "Not hypervised: not setting exit handlers.");
}

// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);
l.clone()
}

fn wait_for_drop<T>(w: Weak<T>) {
let sleep_duration = Duration::from_secs(1);
let warn_timeout = Duration::from_secs(60);
Expand Down