From c00841e82812b2acd4c1feac9191d7adcbe51c32 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 20 Jun 2023 17:02:17 +0800 Subject: [PATCH 01/11] feat: add `register_thread` and `new_tokio_exit_rx` --- util/stop-handler/src/lib.rs | 151 ++----------------------- util/stop-handler/src/stop_register.rs | 74 ++++++++++++ 2 files changed, 82 insertions(+), 143 deletions(-) create mode 100644 util/stop-handler/src/stop_register.rs diff --git a/util/stop-handler/src/lib.rs b/util/stop-handler/src/lib.rs index d9ceddf268..fe80839dd3 100644 --- a/util/stop-handler/src/lib.rs +++ b/util/stop-handler/src/lib.rs @@ -1,147 +1,12 @@ //! TODO(doc): @keroro520 -use ckb_logger::error; -use parking_lot::Mutex; -use std::fmt::Debug; -use std::sync::mpsc; -use std::sync::{Arc, Weak}; -use std::thread::JoinHandle; -use tokio::sync::oneshot as tokio_oneshot; -use tokio::sync::watch as tokio_watch; -/// init flags -pub const WATCH_INIT: u8 = 0; -/// stop flags -pub const WATCH_STOP: u8 = 1; +pub use stop_register::{ + broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, + wait_all_ckb_services_exit, +}; -/// TODO(doc): @keroro520 -#[derive(Debug)] -pub enum SignalSender { - /// TODO(doc): @keroro520 - Crossbeam(ckb_channel::Sender), - /// TODO(doc): @keroro520 - Std(mpsc::Sender), - /// TODO(doc): @keroro520 - Tokio(tokio_oneshot::Sender), - /// A single-producer, multi-consumer channel that only retains the last sent value. - Watch(tokio_watch::Sender), - /// Do nothing, for tests - Dummy, -} +pub use tokio_util::sync::CancellationToken; -impl SignalSender { - /// TODO(doc): @keroro520 - pub fn send(self, cmd: T) { - match self { - SignalSender::Crossbeam(tx) => { - if let Err(e) = tx.try_send(cmd) { - error!("handler signal send error {:?}", e); - }; - } - SignalSender::Std(tx) => { - if let Err(e) = tx.send(cmd) { - error!("handler signal send error {:?}", e); - }; - } - SignalSender::Tokio(tx) => { - if let Err(e) = tx.send(cmd) { - error!("handler signal send error {:?}", e); - }; - } - SignalSender::Watch(tx) => { - if let Err(e) = tx.send(WATCH_STOP) { - error!("handler signal send error {:?}", e); - }; - } - SignalSender::Dummy => {} - } - } -} - -#[derive(Debug)] -struct Handler { - signal: SignalSender, - thread: Option>, -} - -/// Weak is a version of Arc that holds a non-owning reference to the managed allocation. -/// Since a Weak reference does not count towards ownership, -/// it will not prevent the value stored in the allocation from being dropped, -/// and Weak itself makes no guarantees about the value still being present. -#[derive(Debug)] -enum Ref { - Arc(Arc), - Weak(Weak), -} - -impl Clone for Ref { - #[inline] - fn clone(&self) -> Ref { - match self { - Self::Arc(arc) => Self::Arc(Arc::clone(arc)), - Self::Weak(weak) => Self::Weak(Weak::clone(weak)), - } - } -} - -impl Ref { - fn downgrade(&self) -> Ref { - match self { - Self::Arc(arc) => Self::Weak(Arc::downgrade(arc)), - Self::Weak(weak) => Self::Weak(Weak::clone(weak)), - } - } -} - -/// TODO(doc): @keroro520 -//the outer Option take ownership for `Arc::try_unwrap` -//the inner Option take ownership for `JoinHandle` or `oneshot::Sender` -#[derive(Clone, Debug)] -pub struct StopHandler { - inner: Option>>>>, - name: String, -} - -impl StopHandler { - /// TODO(doc): @keroro520 - pub fn new( - signal: SignalSender, - thread: Option>, - name: String, - ) -> StopHandler { - let handler = Handler { signal, thread }; - StopHandler { - inner: Some(Ref::Arc(Arc::new(Mutex::new(Some(handler))))), - name, - } - } - - /// Creates a new Weak pointer. - pub fn downgrade_clone(&self) -> StopHandler { - StopHandler { - inner: self.inner.as_ref().map(|inner| inner.downgrade()), - name: self.name.clone(), - } - } - - /// TODO(doc): @keroro520 - pub fn try_send(&mut self, cmd: T) { - let inner = self - .inner - .take() - .expect("Stop signal can only be sent once"); - - if let Ref::Arc(inner) = inner { - if let Ok(lock) = Arc::try_unwrap(inner) { - ckb_logger::info!("StopHandler({}) send signal", self.name); - let handler = lock.lock().take().expect("Handler can only be taken once"); - let Handler { signal, thread } = handler; - signal.send(cmd); - if let Some(thread) = thread { - if let Err(e) = thread.join() { - error!("handler thread join error {:?}", e); - }; - } - }; - } - } -} +mod stop_register; +#[cfg(test)] +mod tests; diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs new file mode 100644 index 0000000000..e329565ae8 --- /dev/null +++ b/util/stop-handler/src/stop_register.rs @@ -0,0 +1,74 @@ +use ckb_logger::{info, trace, warn}; +use ckb_util::Mutex; +use tokio_util::sync::CancellationToken; + +struct CkbServiceHandles { + thread_handles: Vec<(String, std::thread::JoinHandle<()>)>, +} + +/// Wait all ckb services exit +pub fn wait_all_ckb_services_exit() { + info!("waiting exit signal..."); + let exit_signal = new_crossbeam_exit_rx(); + let _ = exit_signal.recv(); + info!("received exit signal, broadcasting exit signal to all threads"); + let mut handles = CKB_HANDLES.lock(); + for (name, join_handle) in handles.thread_handles.drain(..) { + match join_handle.join() { + Ok(_) => { + info!("wait thread {} done", name); + } + Err(e) => { + warn!("wait thread {}: ERROR: {:?}", name, e) + } + } + } + info!("all ckb threads have been stopped"); +} + +static CKB_HANDLES: once_cell::sync::Lazy> = + once_cell::sync::Lazy::new(|| { + Mutex::new(CkbServiceHandles { + thread_handles: vec![], + }) + }); + +static TOKIO_EXIT: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(CancellationToken::new); + +static CROSSBEAM_EXIT_SENDERS: once_cell::sync::Lazy>>> = + once_cell::sync::Lazy::new(|| Mutex::new(vec![])); + +/// Create a new CancellationToken for exit signal +pub fn new_tokio_exit_rx() -> CancellationToken { + TOKIO_EXIT.clone() +} + +/// Create a new crossbeam Receiver for exit signal +pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> { + let (tx, rx) = ckb_channel::bounded(1); + CROSSBEAM_EXIT_SENDERS.lock().push(tx); + rx +} + +/// Broadcast exit signals to all threads and all tokio tasks +pub fn broadcast_exit_signals() { + TOKIO_EXIT.cancel(); + CROSSBEAM_EXIT_SENDERS.lock().iter().for_each(|tx| { + if let Err(e) = tx.try_send(()) { + println!("broadcast thread: ERROR: {:?}", e) + } else { + println!("send a crossbeam exit signal"); + } + }); +} + +/// Register a thread `JoinHandle` to `CKB_HANDLES` +pub fn register_thread(name: &str, thread_handle: std::thread::JoinHandle<()>) { + trace!("register thread {}", name); + CKB_HANDLES + .lock() + .thread_handles + .push((name.into(), thread_handle)); + trace!("register thread done {}", name); +} From 7ee2a807f5758db481e91ff90e6e971b2fb6828c Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 20 Jun 2023 17:02:38 +0800 Subject: [PATCH 02/11] test: add basic unit test for ckb shutdown --- util/stop-handler/src/tests.rs | 142 +++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 util/stop-handler/src/tests.rs diff --git a/util/stop-handler/src/tests.rs b/util/stop-handler/src/tests.rs new file mode 100644 index 0000000000..3141512bb7 --- /dev/null +++ b/util/stop-handler/src/tests.rs @@ -0,0 +1,142 @@ +use crate::{ + broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, + wait_all_ckb_services_exit, +}; +use ckb_async_runtime::{new_global_runtime, Handle}; +use ckb_channel::select; +use rand::Rng; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio_util::sync::CancellationToken; + +fn send_ctrlc_later(duration: Duration) { + std::thread::spawn(move || { + std::thread::sleep(duration); + // send SIGINT to myself + unsafe { + libc::raise(libc::SIGINT); + println!("[ $$ sent SIGINT to myself $$ ]"); + } + }); +} + +#[derive(Default)] +struct TestStopMemo { + spawned_threads_count: Arc, + stopped_threads_count: Arc, + + spawned_tokio_task_count: Arc, + stopped_tokio_task_count: Arc, +} + +impl TestStopMemo { + fn start_many_threads(&self) { + for i in 0..rand::thread_rng().gen_range(3..7) { + let join = std::thread::spawn({ + let stopped_threads_count = Arc::clone(&self.stopped_threads_count); + move || { + let ticker = ckb_channel::tick(Duration::from_millis(500)); + let deadline = ckb_channel::after(Duration::from_millis( + (rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64, + )); + + let stop = new_crossbeam_exit_rx(); + + loop { + select! { + recv(ticker) -> _ => { + println!("thread {} received tick signal", i); + }, + recv(stop) -> _ => { + println!("thread {} received crossbeam exit signal", i); + stopped_threads_count.fetch_add(1, Ordering::SeqCst); + return; + }, + recv(deadline) -> _ =>{ + println!("thread {} finish its job", i); + stopped_threads_count.fetch_add(1, Ordering::SeqCst); + return + } + } + } + } + }); + + self.spawned_threads_count.fetch_add(1, Ordering::SeqCst); + register_thread(&format!("test thread {}", i), join); + } + } + + fn start_many_tokio_tasks(&self, handle: &Handle) { + for i in 0..rand::thread_rng().gen_range(3..7) { + let stop: CancellationToken = new_tokio_exit_rx(); + + handle.spawn({ + let spawned_tokio_task_count = Arc::clone(&self.spawned_tokio_task_count); + let stopped_tokio_task_count = Arc::clone(&self.stopped_tokio_task_count); + async move { + spawned_tokio_task_count.fetch_add(1, Ordering::SeqCst); + + let mut interval = tokio::time::interval(Duration::from_millis(500)); + + let duration = Duration::from_millis( + (rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64, + ); + let deadline = tokio::time::sleep(duration); + tokio::pin!(deadline); + + loop { + tokio::select! { + _ = &mut deadline =>{ + println!("tokio task {} finish its job", i); + stopped_tokio_task_count.fetch_add(1, Ordering::SeqCst); + break; + } + _ = interval.tick()=> { + println!("tokio task {} received tick signal", i); + }, + _ = stop.cancelled() => { + println!("tokio task {} receive exit signal", i); + stopped_tokio_task_count.fetch_add(1, Ordering::SeqCst); + break + }, + else => break, + } + } + } + }); + } + } +} +#[test] +fn basic() { + let (mut handle, mut stop_recv, _runtime) = new_global_runtime(); + + ctrlc::set_handler(move || { + broadcast_exit_signals(); + }) + .expect("Error setting Ctrl-C handler"); + + send_ctrlc_later(Duration::from_secs(3)); + + let test_memo = TestStopMemo::default(); + + test_memo.start_many_threads(); + test_memo.start_many_tokio_tasks(&handle); + + handle.drop_guard(); + wait_all_ckb_services_exit(); + handle.block_on(async move { + stop_recv.recv().await; + }); + + assert_eq!( + test_memo.spawned_threads_count.load(Ordering::SeqCst), + test_memo.stopped_threads_count.load(Ordering::SeqCst), + ); + assert_eq!( + test_memo.spawned_tokio_task_count.load(Ordering::SeqCst), + test_memo.stopped_tokio_task_count.load(Ordering::SeqCst), + ); +} From d120f573f42b4c5c84312bf83983e162926f2a4b Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 20 Jun 2023 17:04:09 +0800 Subject: [PATCH 03/11] wait all thread and tokio tasks exit before process exit --- Cargo.lock | 26 +++++++--- benches/benches/benchmarks/overall.rs | 3 +- block-filter/src/filter.rs | 16 +++--- chain/src/chain.rs | 34 +++---------- chain/src/tests/util.rs | 3 +- ckb-bin/Cargo.toml | 1 + ckb-bin/src/helper.rs | 40 ++++++++++----- ckb-bin/src/lib.rs | 46 +++++++++++++---- ckb-bin/src/subcommand/miner.rs | 25 ++++------ ckb-bin/src/subcommand/run.rs | 28 +++-------- miner/src/client.rs | 47 +++++++++--------- miner/src/miner.rs | 18 +++---- network/src/network.rs | 43 +++++----------- network/src/protocols/tests/mod.rs | 4 +- notify/src/lib.rs | 21 ++------ rpc/src/module/subscription.rs | 1 + rpc/src/tests/examples.rs | 3 +- rpc/src/tests/mod.rs | 5 +- shared/Cargo.toml | 1 - shared/src/shared.rs | 19 +++---- sync/src/relayer/tests/helper.rs | 7 ++- sync/src/synchronizer/mod.rs | 7 ++- sync/src/tests/net_time_checker.rs | 9 ++-- sync/src/types/header_map/mod.rs | 24 ++++----- test/src/net.rs | 7 ++- tx-pool/Cargo.toml | 1 + tx-pool/src/chunk_process.rs | 13 +++-- tx-pool/src/process.rs | 2 +- tx-pool/src/service.rs | 49 +++++++++---------- util/channel/src/lib.rs | 4 +- util/indexer/src/service.rs | 46 +++++------------ util/launcher/Cargo.toml | 1 - util/launcher/src/lib.rs | 14 ++---- util/launcher/src/shared_builder.rs | 10 ++-- .../src/tests/utils/chain.rs | 3 +- util/metrics-service/Cargo.toml | 1 + util/metrics-service/src/lib.rs | 10 +++- util/network-alert/Cargo.toml | 1 - util/network-alert/src/tests/test_notifier.rs | 10 ++-- util/runtime/Cargo.toml | 1 - util/runtime/src/lib.rs | 48 ++++++++++++------ util/stop-handler/Cargo.toml | 11 ++++- 42 files changed, 314 insertions(+), 349 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bcda9c4aea..eb5351f95d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,7 +437,6 @@ version = "0.112.0-pre" dependencies = [ "ckb-logger", "ckb-spawn", - "ckb-stop-handler", "tokio", ] @@ -492,6 +491,7 @@ dependencies = [ "ckb-network", "ckb-resource", "ckb-shared", + "ckb-stop-handler", "ckb-store", "ckb-types", "ckb-util", @@ -934,6 +934,7 @@ dependencies = [ "ckb-logger", "ckb-metrics", "ckb-metrics-config", + "ckb-stop-handler", "ckb-util", "hyper", "prometheus", @@ -1020,7 +1021,7 @@ dependencies = [ "tempfile", "tentacle", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.8", "trust-dns-resolver", ] @@ -1178,6 +1179,7 @@ dependencies = [ "ckb-pow", "ckb-reward-calculator", "ckb-shared", + "ckb-stop-handler", "ckb-store", "ckb-sync", "ckb-systemtime", @@ -1284,10 +1286,17 @@ version = "0.112.0-pre" name = "ckb-stop-handler" version = "0.112.0-pre" dependencies = [ + "ckb-async-runtime", "ckb-channel", "ckb-logger", + "ckb-util", + "ctrlc", + "libc", + "once_cell", "parking_lot 0.12.1", + "rand 0.8.5", "tokio", + "tokio-util 0.7.8", ] [[package]] @@ -1429,6 +1438,7 @@ dependencies = [ "slab", "tempfile", "tokio", + "tokio-util 0.7.8", ] [[package]] @@ -2403,7 +2413,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.8", "tracing", ] @@ -4479,7 +4489,7 @@ dependencies = [ "tentacle-secio", "thiserror", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.8", "tokio-yamux", "wasm-bindgen", "wasm-bindgen-futures", @@ -4521,7 +4531,7 @@ dependencies = [ "secp256k1", "sha2", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.8", "unsigned-varint", "x25519-dalek", ] @@ -4733,9 +4743,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes 1.4.0", "futures-core", @@ -4756,7 +4766,7 @@ dependencies = [ "log", "nohash-hasher", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.8", ] [[package]] diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs index af98fe6bae..15a76de599 100644 --- a/benches/benches/benchmarks/overall.rs +++ b/benches/benches/benchmarks/overall.rs @@ -6,7 +6,7 @@ use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::JsonBytes; use ckb_launcher::SharedBuilder; -use ckb_network::{DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::Shared; use ckb_store::ChainStore; use ckb_types::{ @@ -77,7 +77,6 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/block-filter/src/filter.rs b/block-filter/src/filter.rs index 71ffca1c80..56fcc92f6b 100644 --- a/block-filter/src/filter.rs +++ b/block-filter/src/filter.rs @@ -1,7 +1,7 @@ -use ckb_async_runtime::tokio::{self, sync::oneshot, task::block_in_place}; -use ckb_logger::{debug, warn}; +use ckb_async_runtime::tokio::{self, task::block_in_place}; +use ckb_logger::{debug, info, warn}; use ckb_shared::Shared; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_store::{ChainDB, ChainStore}; use ckb_types::{ core::HeaderView, @@ -43,10 +43,10 @@ impl BlockFilter { } /// start background single-threaded service to create block filter data - pub fn start(self) -> StopHandler<()> { + pub fn start(self) { let notify_controller = self.shared.notify_controller().clone(); let async_handle = self.shared.async_handle().clone(); - let (stop, mut stop_rx) = oneshot::channel::<()>(); + let stop_rx: CancellationToken = new_tokio_exit_rx(); let filter_data_builder = self.clone(); let build_filter_data = @@ -62,12 +62,14 @@ impl BlockFilter { block_in_place(|| self.build_filter_data()); new_block_watcher.borrow_and_update(); } - _ = &mut stop_rx => break, + _ = stop_rx.cancelled() => { + info!("BlockFilter received exit signal, exit now"); + break + }, else => break, } } }); - StopHandler::new(SignalSender::Tokio(stop), None, NAME.to_string()) } /// build block filter data to the latest block diff --git a/chain/src/chain.rs b/chain/src/chain.rs index b6291211d5..8f7e5d8e3c 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -12,7 +12,7 @@ use ckb_proposal_table::ProposalTable; #[cfg(debug_assertions)] use ckb_rust_unstable_port::IsSorted; use ckb_shared::shared::Shared; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction}; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ @@ -22,7 +22,7 @@ use ckb_types::{ ResolvedTransaction, }, hardfork::HardForks, - service::{Request, DEFAULT_CHANNEL_SIZE, SIGNAL_CHANNEL_SIZE}, + service::{Request, DEFAULT_CHANNEL_SIZE}, BlockExt, BlockNumber, BlockView, Cycle, HeaderView, }, packed::{Byte32, ProposalShortId}, @@ -50,13 +50,6 @@ type TruncateRequest = Request>; pub struct ChainController { process_block_sender: Sender, truncate_sender: Sender, // Used for testing only - stop: Option>, -} - -impl Drop for ChainController { - fn drop(&mut self) { - self.try_stop(); - } } #[cfg_attr(feature = "mock", faux::methods)] @@ -64,12 +57,10 @@ impl ChainController { pub fn new( process_block_sender: Sender, truncate_sender: Sender, - stop: StopHandler<()>, ) -> Self { ChainController { process_block_sender, truncate_sender, - stop: Some(stop), } } /// Inserts the block into database. @@ -109,17 +100,10 @@ impl ChainController { }) } - pub fn try_stop(&mut self) { - if let Some(ref mut stop) = self.stop { - stop.try_send(()); - } - } - /// Since a non-owning reference does not count towards ownership, /// it will not prevent the value stored in the allocation from being dropped pub fn non_owning_clone(&self) -> Self { ChainController { - stop: None, truncate_sender: self.truncate_sender.clone(), process_block_sender: self.process_block_sender.clone(), } @@ -245,7 +229,7 @@ impl ChainService { /// start background single-threaded service with specified thread_name. pub fn start(mut self, thread_name: Option) -> ChainController { - let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE); + let signal_receiver = new_crossbeam_exit_rx(); let (process_block_sender, process_block_receiver) = channel::bounded(DEFAULT_CHANNEL_SIZE); let (truncate_sender, truncate_receiver) = channel::bounded(1); @@ -256,10 +240,11 @@ impl ChainService { } let tx_control = self.shared.tx_pool_controller().clone(); - let thread = thread_builder + let chain_jh = thread_builder .spawn(move || loop { select! { recv(signal_receiver) -> _ => { + info!("ChainService received exit signal, stopped"); break; }, recv(process_block_receiver) -> msg => match msg { @@ -287,13 +272,10 @@ impl ChainService { } }) .expect("Start ChainService failed"); - let stop = StopHandler::new( - SignalSender::Crossbeam(signal_sender), - Some(thread), - "chain".to_string(), - ); - ChainController::new(process_block_sender, truncate_sender, stop) + register_thread("ChainService", chain_jh); + + ChainController::new(process_block_sender, truncate_sender) } fn make_fork_for_truncate(&self, target: &HeaderView, current_tip: &HeaderView) -> ForkChanges { diff --git a/chain/src/tests/util.rs b/chain/src/tests/util.rs index 3051dd775f..7ade19bc53 100644 --- a/chain/src/tests/util.rs +++ b/chain/src/tests/util.rs @@ -6,7 +6,7 @@ use ckb_dao::DaoCalculator; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; use ckb_launcher::SharedBuilder; -use ckb_network::{DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::shared::Shared; use ckb_store::ChainStore; pub use ckb_test_chain_utils::MockStore; @@ -314,7 +314,6 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/ckb-bin/Cargo.toml b/ckb-bin/Cargo.toml index bdd04dd5be..b0edf465ba 100644 --- a/ckb-bin/Cargo.toml +++ b/ckb-bin/Cargo.toml @@ -43,6 +43,7 @@ rayon = "1.0" sentry = { version = "0.26.0", optional = true } is-terminal = "0.4.7" fdlimit = "0.2.1" +ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.112.0-pre" } [features] deadlock_detection = ["ckb-util/deadlock_detection"] diff --git a/ckb-bin/src/helper.rs b/ckb-bin/src/helper.rs index 142d06226c..21c93732b8 100644 --- a/ckb-bin/src/helper.rs +++ b/ckb-bin/src/helper.rs @@ -1,4 +1,5 @@ use ckb_logger::info; + use std::io::{stdin, stdout, Write}; #[cfg(not(feature = "deadlock_detection"))] @@ -6,27 +7,42 @@ pub fn deadlock_detection() {} #[cfg(feature = "deadlock_detection")] pub fn deadlock_detection() { + use ckb_channel::select; use ckb_logger::warn; + use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_util::parking_lot::deadlock; use std::{thread, time::Duration}; info!("deadlock_detection enable"); - thread::spawn(move || loop { - thread::sleep(Duration::from_secs(10)); - let deadlocks = deadlock::check_deadlock(); - if deadlocks.is_empty() { - continue; - } + let dead_lock_jh = thread::spawn({ + let ticker = ckb_channel::tick(Duration::from_secs(10)); + let stop_rx = new_crossbeam_exit_rx(); + move || loop { + select! { + recv(ticker) -> _ => { + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + warn!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + warn!("Deadlock #{}", i); + for t in threads { + warn!("Thread Id {:#?}", t.thread_id()); + warn!("{:#?}", t.backtrace()); + } + } - warn!("{} deadlocks detected", deadlocks.len()); - for (i, threads) in deadlocks.iter().enumerate() { - warn!("Deadlock #{}", i); - for t in threads { - warn!("Thread Id {:#?}", t.thread_id()); - warn!("{:#?}", t.backtrace()); + }, + recv(stop_rx) -> _ =>{ + info!("deadlock_detection received exit signal, stopped"); + return; + } } } }); + register_thread("dead_lock_detect", dead_lock_jh); } pub fn prompt(msg: &str) -> String { diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index b047a14086..91c61b813d 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -8,13 +8,15 @@ mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; +use ckb_logger::info; +use ckb_network::tokio; +use ckb_stop_handler::broadcast_exit_signals; use helper::raise_fd_limit; use setup_guard::SetupGuard; -use std::time::Duration; +use std::sync::Arc; #[cfg(feature = "with_sentry")] pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; -const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); /// The executable main entry. /// @@ -58,25 +60,49 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { .expect("SubcommandRequiredElseHelp"); let is_silent_logging = is_silent_logging(cmd); - let (handle, runtime) = new_global_runtime(); + let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; raise_fd_limit(); + // indicate whether the process is terminated by an exit signal + let caught_exit_signal = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + ctrlc::set_handler({ + let caught_exit_signal = Arc::clone(&caught_exit_signal); + move || { + broadcast_exit_signals(); + caught_exit_signal.store(true, std::sync::atomic::Ordering::SeqCst); + } + }) + .expect("Error setting Ctrl-C handler"); + let ret = match cmd { - cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle), - cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle), - cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle), - cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle), - cli::CMD_IMPORT => subcommand::import(setup.import(matches)?, handle), - cli::CMD_STATS => subcommand::stats(setup.stats(matches)?, handle), + cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), + cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), + cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()), + cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()), + cli::CMD_IMPORT => subcommand::import(setup.import(matches)?, handle.clone()), + cli::CMD_STATS => subcommand::stats(setup.stats(matches)?, handle.clone()), cli::CMD_RESET_DATA => subcommand::reset_data(setup.reset_data(matches)?), cli::CMD_MIGRATE => subcommand::migrate(setup.migrate(matches)?), _ => unreachable!(), }; - runtime.shutdown_timeout(RUNTIME_SHUTDOWN_TIMEOUT); + if !caught_exit_signal.load(std::sync::atomic::Ordering::SeqCst) { + // if `subcommand` finish normally, and we didn't catch exit signal, broadcast exit signals + broadcast_exit_signals(); + } + + handle.drop_guard(); + + tokio::task::block_in_place(|| { + info!("waiting all tokio tasks done"); + handle_stop_rx.blocking_recv(); + info!("all tokio tasks have been stopped"); + }); + ret } diff --git a/ckb-bin/src/subcommand/miner.rs b/ckb-bin/src/subcommand/miner.rs index dace1cc9a7..fd9a892abc 100644 --- a/ckb-bin/src/subcommand/miner.rs +++ b/ckb-bin/src/subcommand/miner.rs @@ -2,16 +2,15 @@ use ckb_app_config::{ExitCode, MinerArgs, MinerConfig}; use ckb_async_runtime::Handle; use ckb_channel::unbounded; use ckb_miner::{Client, Miner}; -use ckb_network::{DefaultExitHandler, ExitHandler}; +use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread, wait_all_ckb_services_exit}; use std::thread; pub fn miner(args: MinerArgs, async_handle: Handle) -> Result<(), ExitCode> { let (new_work_tx, new_work_rx) = unbounded(); let MinerConfig { client, workers } = args.config; - let exit_handler = DefaultExitHandler::default(); let client = Client::new(new_work_tx, client, async_handle); - let (mut miner, miner_stop) = Miner::new( + let mut miner = Miner::new( args.pow_engine, client.clone(), new_work_rx, @@ -21,21 +20,17 @@ pub fn miner(args: MinerArgs, async_handle: Handle) -> Result<(), ExitCode> { ckb_memory_tracker::track_current_process_simple(args.memory_tracker.interval); - let client_stop = client.spawn_background(); + client.spawn_background(); - thread::Builder::new() - .name("client".to_string()) - .spawn(move || miner.run()) + let stop_rx = new_crossbeam_exit_rx(); + const THREAD_NAME: &str = "client"; + let miner_jh = thread::Builder::new() + .name(THREAD_NAME.into()) + .spawn(move || miner.run(stop_rx)) .expect("Start client failed!"); + register_thread(THREAD_NAME, miner_jh); - let exit_handler_clone = exit_handler.clone(); - ctrlc::set_handler(move || { - exit_handler_clone.notify_exit(); - }) - .expect("Error setting Ctrl-C handler"); - exit_handler.wait_for_exit(); + wait_all_ckb_services_exit(); - drop(client_stop); - drop(miner_stop); Ok(()) } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 7878b7b6de..cd7d8c6282 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -4,7 +4,8 @@ use ckb_async_runtime::Handle; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; -use ckb_network::{DefaultExitHandler, ExitHandler}; +use ckb_stop_handler::wait_all_ckb_services_exit; + use ckb_types::core::cell::setup_system_cell_cache; pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { @@ -16,7 +17,6 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), let block_assembler_config = launcher.sanitize_block_assembler_config()?; let miner_enable = block_assembler_config.is_some(); - let exit_handler = DefaultExitHandler::default(); let (shared, mut pack) = launcher.build_shared(block_assembler_config)?; @@ -43,12 +43,11 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), let chain_controller = launcher.start_chain_service(&shared, pack.take_proposal_table()); - let block_filter = launcher.start_block_filter(&shared); + launcher.start_block_filter(&shared); - let (network_controller, rpc_server) = launcher.start_network_and_rpc( + let (network_controller, _rpc_server) = launcher.start_network_and_rpc( &shared, chain_controller.non_owning_clone(), - &exit_handler, miner_enable, pack.take_relay_tx_receiver(), ); @@ -56,22 +55,7 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), let tx_pool_builder = pack.take_tx_pool_builder(); tx_pool_builder.start(network_controller.non_owning_clone()); - let exit_handler_clone = exit_handler.clone(); - ctrlc::set_handler(move || { - exit_handler_clone.notify_exit(); - }) - .expect("Error setting Ctrl-C handler"); - exit_handler.wait_for_exit(); - - info!("Finishing work, please wait..."); - shared.tx_pool_controller().save_pool().map_err(|err| { - eprintln!("TxPool Error: {err}"); - ExitCode::Failure - })?; - - drop(rpc_server); - drop(block_filter); - drop(network_controller); - drop(chain_controller); + wait_all_ckb_services_exit(); + Ok(()) } diff --git a/miner/src/client.rs b/miner/src/client.rs index 57d8e87cb0..dda47570a7 100644 --- a/miner/src/client.rs +++ b/miner/src/client.rs @@ -4,8 +4,8 @@ use ckb_app_config::MinerClientConfig; use ckb_async_runtime::Handle; use ckb_channel::Sender; use ckb_jsonrpc_types::{Block as JsonBlock, BlockTemplate}; -use ckb_logger::{debug, error}; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_logger::{debug, error, info}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::{ packed::{Block, Byte32}, H256, @@ -45,13 +45,12 @@ pub enum RpcError { #[derive(Debug, Clone)] pub struct Rpc { sender: mpsc::Sender, - stop: StopHandler<()>, } impl Rpc { pub fn new(url: Uri, handle: Handle) -> Rpc { let (sender, mut receiver) = mpsc::channel(65_535); - let (stop, mut stop_rx) = oneshot::channel::<()>(); + let stop_rx: CancellationToken = new_tokio_exit_rx(); let https = hyper_tls::HttpsConnector::new(); let client = HttpClient::builder().build(https); @@ -87,16 +86,16 @@ impl Rpc { } }); }, - _ = &mut stop_rx => break, + _ = stop_rx.cancelled() => { + info!("Rpc server received exit signal, exit now"); + break + }, else => break } } }); - Rpc { - sender, - stop: StopHandler::new(SignalSender::Tokio(stop), None, "miner-rpc".to_string()), - } + Rpc { sender } } pub fn request( @@ -128,12 +127,6 @@ impl Rpc { } } -impl Drop for Rpc { - fn drop(&mut self) { - self.stop.try_send(()); - } -} - pub enum Works { New(Work), FailSubmit(Byte32), @@ -200,8 +193,7 @@ impl Client { } /// spawn background update process - pub fn spawn_background(self) -> StopHandler<()> { - let (stop, stop_rx) = oneshot::channel::<()>(); + pub fn spawn_background(self) { let client = self.clone(); if let Some(addr) = self.config.listen { ckb_logger::info!("listen notify mode : {}", addr); @@ -220,19 +212,18 @@ Otherwise ckb-miner does not work properly and will behave as it stopped committ addr ); self.handle.spawn(async move { - client.listen_block_template_notify(addr, stop_rx).await; + client.listen_block_template_notify(addr).await; }); - self.blocking_fetch_block_template() + self.blocking_fetch_block_template(); } else { ckb_logger::info!("loop poll mode: interval {}ms", self.config.poll_interval); self.handle.spawn(async move { - client.poll_block_template(stop_rx).await; + client.poll_block_template().await; }); } - StopHandler::new(SignalSender::Tokio(stop), None, "miner-updater".to_string()) } - async fn listen_block_template_notify(&self, addr: SocketAddr, stop_rx: oneshot::Receiver<()>) { + async fn listen_block_template_notify(&self, addr: SocketAddr) { let client = self.clone(); let make_service = make_service_fn(move |_conn| { let client = client.clone(); @@ -241,8 +232,10 @@ Otherwise ckb-miner does not work properly and will behave as it stopped committ }); let server = Server::bind(&addr).serve(make_service); + let stop_rx: CancellationToken = new_tokio_exit_rx(); let graceful = server.with_graceful_shutdown(async move { - stop_rx.await.ok(); + stop_rx.cancelled().await; + info!("Miner client received exit signal, exit now"); }); if let Err(e) = graceful.await { @@ -250,17 +243,21 @@ Otherwise ckb-miner does not work properly and will behave as it stopped committ } } - async fn poll_block_template(&self, mut stop_rx: oneshot::Receiver<()>) { + async fn poll_block_template(&self) { let poll_interval = time::Duration::from_millis(self.config.poll_interval); let mut interval = tokio::time::interval(poll_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let stop_rx: CancellationToken = new_tokio_exit_rx(); loop { tokio::select! { _ = interval.tick() => { debug!("poll block template..."); self.fetch_block_template().await; } - _ = &mut stop_rx => break, + _ = stop_rx.cancelled() => { + info!("Miner client pool_block_template received exit signal, exit now"); + break + }, else => break, } } diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 8119a7bea9..e14ffef72f 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -5,7 +5,6 @@ use ckb_app_config::MinerWorkerConfig; use ckb_channel::{select, unbounded, Receiver}; use ckb_logger::{debug, error, info}; use ckb_pow::PowEngine; -use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_types::{ packed::{Byte32, Header}, prelude::*, @@ -27,7 +26,6 @@ pub struct Miner { pub(crate) worker_controllers: Vec, pub(crate) work_rx: Receiver, pub(crate) nonce_rx: Receiver<(Byte32, Work, u128)>, - pub(crate) stop_rx: Receiver<()>, pub(crate) pb: ProgressBar, pub(crate) nonces_found: u128, pub(crate) stderr_is_tty: bool, @@ -42,9 +40,8 @@ impl Miner { work_rx: Receiver, workers: &[MinerWorkerConfig], limit: u128, - ) -> (Miner, StopHandler<()>) { + ) -> Miner { let (nonce_tx, nonce_rx) = unbounded(); - let (stop, stop_rx) = unbounded(); let mp = MultiProgress::new(); let worker_controllers = workers @@ -61,9 +58,7 @@ impl Miner { mp.join().expect("MultiProgress join failed"); }); - let stop = StopHandler::new(SignalSender::Crossbeam(stop), None, "miner".to_string()); - - let miner = Miner { + Miner { legacy_work: LruCache::new(WORK_CACHE_SIZE), nonces_found: 0, _pow: pow, @@ -71,16 +66,14 @@ impl Miner { worker_controllers, work_rx, nonce_rx, - stop_rx, pb, stderr_is_tty, limit, - }; - (miner, stop) + } } /// TODO(doc): @quake - pub fn run(&mut self) { + pub fn run(&mut self, stop_rx: Receiver<()>) { loop { select! { recv(self.work_rx) -> msg => match msg { @@ -109,7 +102,8 @@ impl Miner { break; }, }, - recv(self.stop_rx) -> _msg => { + recv(stop_rx) -> _msg => { + info!("miner received exit signal, stopped"); break; } }; diff --git a/network/src/network.rs b/network/src/network.rs index cbe84e2cf2..c9436aa28d 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -21,7 +21,7 @@ use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl} use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtocol}; use ckb_logger::{debug, error, info, trace, warn}; use ckb_spawn::Spawn; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{channel::mpsc::Sender, Future}; use ipnetwork::IpNetwork; @@ -490,18 +490,14 @@ impl NetworkState { } /// Used to handle global events of tentacle, such as session open/close -pub struct EventHandler { +pub struct EventHandler { pub(crate) network_state: Arc, - pub(crate) exit_handler: T, } -impl EventHandler { +impl EventHandler { /// init an event handler - pub fn new(network_state: Arc, exit_handler: T) -> Self { - Self { - network_state, - exit_handler, - } + pub fn new(network_state: Arc) -> Self { + Self { network_state } } } @@ -531,7 +527,7 @@ impl ExitHandler for DefaultExitHandler { } } -impl EventHandler { +impl EventHandler { fn inbound_eviction(&self) -> Vec { if self.network_state.config.bootnode_mode { let status = self.network_state.connection_status(); @@ -560,7 +556,7 @@ impl EventHandler { } #[async_trait] -impl ServiceHandle for EventHandler { +impl ServiceHandle for EventHandler { async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) { match error { ServiceError::DialerError { address, error } => { @@ -763,8 +759,8 @@ impl ServiceHandle for EventHandler { } /// Ckb network service, use to start p2p network -pub struct NetworkService { - p2p_service: Service>, +pub struct NetworkService { + p2p_service: Service, network_state: Arc, ping_controller: Option>, // Background services @@ -772,7 +768,7 @@ pub struct NetworkService { version: String, } -impl NetworkService { +impl NetworkService { /// init with all config pub fn new( network_state: Arc, @@ -780,7 +776,6 @@ impl NetworkService { required_protocol_ids: Vec, // name, version, flags identify_announce: (String, String, Flags), - exit_handler: T, ) -> Self { let config = &network_state.config; @@ -891,7 +886,6 @@ impl NetworkService { } let event_handler = EventHandler { network_state: Arc::clone(&network_state), - exit_handler, }; service_builder = service_builder .key_pair(network_state.local_private_key.clone()) @@ -1098,7 +1092,7 @@ impl NetworkService { }) .unzip(); - let (sender, mut receiver) = oneshot::channel(); + let receiver: CancellationToken = new_tokio_exit_rx(); let (start_sender, start_receiver) = mpsc::channel(); { let network_state = Arc::clone(&network_state); @@ -1130,7 +1124,8 @@ impl NetworkService { tokio::spawn(async move { p2p_service.run().await }); loop { tokio::select! { - _ = &mut receiver => { + _ = receiver.cancelled() => { + info!("NetworkService receive exit signal, start shutdown..."); let _ = p2p_control.shutdown().await; // Drop senders to stop all corresponding background task drop(bg_signals); @@ -1163,13 +1158,11 @@ impl NetworkService { return Err(e); } - let stop = StopHandler::new(SignalSender::Tokio(sender), None, "network".to_string()); Ok(NetworkController { version, network_state, p2p_control, ping_controller, - stop: Some(stop), }) } } @@ -1181,7 +1174,6 @@ pub struct NetworkController { network_state: Arc, p2p_control: ServiceControl, ping_controller: Option>, - stop: Option>, } impl NetworkController { @@ -1397,7 +1389,6 @@ impl NetworkController { /// it will not prevent the value stored in the allocation from being dropped pub fn non_owning_clone(&self) -> Self { NetworkController { - stop: None, version: self.version.clone(), network_state: Arc::clone(&self.network_state), p2p_control: self.p2p_control.clone(), @@ -1406,14 +1397,6 @@ impl NetworkController { } } -impl Drop for NetworkController { - fn drop(&mut self) { - if let Some(ref mut stop) = self.stop { - stop.try_send(()); - } - } -} - // Send an optional message before disconnect a peer pub(crate) fn disconnect_with_message( control: &ServiceControl, diff --git a/network/src/protocols/tests/mod.rs b/network/src/protocols/tests/mod.rs index db72949dda..140e9625d8 100644 --- a/network/src/protocols/tests/mod.rs +++ b/network/src/protocols/tests/mod.rs @@ -7,8 +7,7 @@ use super::{ }; use crate::{ - network::{DefaultExitHandler, EventHandler}, - services::protocol_type_checker::ProtocolTypeCheckerService, + network::EventHandler, services::protocol_type_checker::ProtocolTypeCheckerService, NetworkState, PeerIdentifyInfo, SupportProtocols, }; @@ -224,7 +223,6 @@ fn net_service_start( .forever(true) .build(EventHandler { network_state: Arc::clone(&network_state), - exit_handler: DefaultExitHandler::default(), }); let peer_id = network_state.local_peer_id().clone(); diff --git a/notify/src/lib.rs b/notify/src/lib.rs index 9a95336a3a..2b9969ba96 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -1,8 +1,8 @@ //! TODO(doc): @quake use ckb_app_config::NotifyConfig; use ckb_async_runtime::Handle; -use ckb_logger::{debug, error, trace}; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_logger::{debug, error, info, trace}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::packed::Byte32; use ckb_types::{ core::{tx_pool::Reject, BlockView}, @@ -88,7 +88,6 @@ impl NotifyTimeout { /// TODO(doc): @quake #[derive(Clone)] pub struct NotifyController { - stop: StopHandler<()>, new_block_register: NotifyRegister, new_block_watcher: NotifyWatcher, new_block_notifier: Sender, @@ -103,12 +102,6 @@ pub struct NotifyController { handle: Handle, } -impl Drop for NotifyController { - fn drop(&mut self) { - self.stop.try_send(()); - } -} - /// TODO(doc): @quake pub struct NotifyService { config: NotifyConfig, @@ -142,7 +135,7 @@ impl NotifyService { /// start background tokio spawned task. pub fn start(mut self) -> NotifyController { - let (signal_sender, mut signal_receiver) = oneshot::channel(); + let signal_receiver: CancellationToken = new_tokio_exit_rx(); let handle = self.handle.clone(); let (new_block_register, mut new_block_register_receiver) = @@ -173,7 +166,8 @@ impl NotifyService { handle.spawn(async move { loop { tokio::select! { - _ = &mut signal_receiver => { + _ = signal_receiver.cancelled() => { + info!("NotifyService received exit signal, exit now"); break; } Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) }, @@ -204,11 +198,6 @@ impl NotifyService { reject_transaction_notifier: reject_transaction_sender, network_alert_register, network_alert_notifier: network_alert_sender, - stop: StopHandler::new( - SignalSender::Tokio(signal_sender), - None, - "notify".to_string(), - ), handle, } } diff --git a/rpc/src/module/subscription.rs b/rpc/src/module/subscription.rs index 69d7cc0c88..c5ca5e5160 100644 --- a/rpc/src/module/subscription.rs +++ b/rpc/src/module/subscription.rs @@ -1,5 +1,6 @@ use ckb_jsonrpc_types::Topic; use ckb_notify::NotifyController; + use jsonrpc_core::{Metadata, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{ diff --git a/rpc/src/tests/examples.rs b/rpc/src/tests/examples.rs index 1164278acd..f134f22e16 100644 --- a/rpc/src/tests/examples.rs +++ b/rpc/src/tests/examples.rs @@ -10,7 +10,7 @@ use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder}; use ckb_chain_spec::versionbits::{ActiveMode, Deployment, DeploymentPos}; use ckb_dao_utils::genesis_dao_data; use ckb_launcher::SharedBuilder; -use ckb_network::{DefaultExitHandler, Flags, NetworkService, NetworkState}; +use ckb_network::{Flags, NetworkService, NetworkState}; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_notify::NotifyService; use ckb_sync::SyncShared; @@ -151,7 +151,6 @@ fn setup_rpc_test_suite(height: u64) -> RpcTestSuite { "0.1.0".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/rpc/src/tests/mod.rs b/rpc/src/tests/mod.rs index b35c80844b..70ba66762c 100644 --- a/rpc/src/tests/mod.rs +++ b/rpc/src/tests/mod.rs @@ -4,7 +4,7 @@ use ckb_chain::chain::{ChainController, ChainService}; use ckb_dao::DaoCalculator; use ckb_jsonrpc_types::ScriptHashType; use ckb_launcher::SharedBuilder; -use ckb_network::{DefaultExitHandler, Flags, NetworkService, NetworkState}; +use ckb_network::{Flags, NetworkService, NetworkState}; use ckb_reward_calculator::RewardCalculator; use ckb_shared::{Shared, Snapshot}; use ckb_store::ChainStore; @@ -80,7 +80,7 @@ impl RpcTestResponse { #[allow(dead_code)] struct RpcTestSuite { - rpc_client: reqwest::blocking::Client, + rpc_client: Client, rpc_uri: String, shared: Shared, chain_controller: ChainController, @@ -249,7 +249,6 @@ fn setup() -> RpcTestSuite { "0.1.0".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 84a780771b..66ebf6a756 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -24,7 +24,6 @@ ckb-logger = { path = "../util/logger", version = "= 0.112.0-pre" } ckb-db-schema = { path = "../db-schema", version = "= 0.112.0-pre" } ckb-async-runtime = { path = "../util/runtime", version = "= 0.112.0-pre" } ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.112.0-pre" } -ckb-channel = { path = "../util/channel", version = "= 0.112.0-pre" } ckb-constant = { path = "../util/constant", version = "= 0.112.0-pre" } ckb-systemtime = { path = "../util/systemtime", version = "= 0.112.0-pre" } diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 0ec83eb888..377b941df7 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -10,12 +10,12 @@ use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH}; use ckb_error::{AnyError, Error}; use ckb_notify::NotifyController; use ckb_proposal_table::ProposalView; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_store::{ChainDB, ChainStore}; use ckb_systemtime::unix_time_as_millis; use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController}; use ckb_types::{ - core::{service, BlockNumber, EpochExt, EpochNumber, HeaderView, Version}, + core::{BlockNumber, EpochExt, EpochNumber, HeaderView, Version}, packed::{self, Byte32}, prelude::*, U256, @@ -35,13 +35,11 @@ const MAX_FREEZE_LIMIT: BlockNumber = 30_000; /// An owned permission to close on a freezer thread pub struct FreezerClose { stopped: Arc, - stop: StopHandler<()>, } impl Drop for FreezerClose { fn drop(&mut self) { self.stopped.store(true, Ordering::SeqCst); - self.stop.try_send(()); } } @@ -86,10 +84,9 @@ impl Shared { pub fn spawn_freeze(&self) -> Option { if let Some(freezer) = self.store.freezer() { ckb_logger::info!("Freezer enable"); - let (signal_sender, signal_receiver) = - ckb_channel::bounded::<()>(service::SIGNAL_CHANNEL_SIZE); + let signal_receiver = new_crossbeam_exit_rx(); let shared = self.clone(); - let thread = thread::Builder::new() + let freeze_jh = thread::Builder::new() .spawn(move || loop { match signal_receiver.recv_timeout(FREEZER_INTERVAL) { Err(_) => { @@ -106,14 +103,10 @@ impl Shared { }) .expect("Start FreezerService failed"); - let stop = StopHandler::new( - SignalSender::Crossbeam(signal_sender), - Some(thread), - "freezer".to_string(), - ); + register_thread("freeze", freeze_jh); + return Some(FreezerClose { stopped: Arc::clone(&freezer.stopped), - stop, }); } None diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs index 2f738f66f2..eb6b3c0a08 100644 --- a/sync/src/relayer/tests/helper.rs +++ b/sync/src/relayer/tests/helper.rs @@ -4,9 +4,9 @@ use ckb_chain::chain::ChainService; use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_launcher::SharedBuilder; use ckb_network::{ - async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, DefaultExitHandler, - Error, Flags, NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, - SupportProtocols, TargetSession, + async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, Error, Flags, + NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, SupportProtocols, + TargetSession, }; use ckb_shared::Shared; use ckb_store::ChainStore; @@ -122,7 +122,6 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 55e8edf394..a2ab6cf46b 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -37,6 +37,7 @@ use ckb_network::{ async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, SupportProtocols, }; +use ckb_stop_handler::register_thread; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ core::{self, BlockNumber}, @@ -625,8 +626,9 @@ impl Synchronizer { self.fetch_channel = Some(sender); let thread = ::std::thread::Builder::new(); let number = self.shared.state().shared_best_header_ref().number(); - thread - .name("BlockDownload".to_string()) + const THREAD_NAME: &str = "BlockDownload"; + let blockdownload_jh = thread + .name(THREAD_NAME.into()) .spawn(move || { BlockFetchCMD { sync, @@ -638,6 +640,7 @@ impl Synchronizer { .run(); }) .expect("download thread can't start"); + register_thread(THREAD_NAME, blockdownload_jh); } }, None => { diff --git a/sync/src/tests/net_time_checker.rs b/sync/src/tests/net_time_checker.rs index 890e0e218b..d849458754 100644 --- a/sync/src/tests/net_time_checker.rs +++ b/sync/src/tests/net_time_checker.rs @@ -2,8 +2,8 @@ use crate::net_time_checker::{NetTimeChecker, NetTimeProtocol, TOLERANT_OFFSET}; use ckb_app_config::NetworkConfig; use ckb_network::{ multiaddr::{Multiaddr, Protocol}, - CKBProtocol, DefaultExitHandler, EventHandler, NetworkState, ServiceBuilder, ServiceControl, - SessionId, SupportProtocols, TargetProtocol, + CKBProtocol, EventHandler, NetworkState, ServiceBuilder, ServiceControl, SessionId, + SupportProtocols, TargetProtocol, }; use std::{ borrow::Cow, @@ -102,10 +102,7 @@ fn net_service_start() -> Node { .key_pair(network_state.local_private_key().clone()) .upnp(config.upnp) .forever(true) - .build(EventHandler::new( - Arc::clone(&network_state), - DefaultExitHandler::default(), - )); + .build(EventHandler::new(Arc::clone(&network_state))); let peer_id = network_state.local_peer_id().clone(); diff --git a/sync/src/types/header_map/mod.rs b/sync/src/types/header_map/mod.rs index 5210e482f6..78939164b6 100644 --- a/sync/src/types/header_map/mod.rs +++ b/sync/src/types/header_map/mod.rs @@ -1,10 +1,11 @@ use ckb_async_runtime::Handle; -use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_logger::info; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::packed::Byte32; use std::sync::Arc; use std::time::Duration; use std::{mem::size_of, path}; -use tokio::sync::oneshot; + use tokio::time::MissedTickBehavior; mod backend; @@ -21,13 +22,6 @@ use super::HeaderIndexView; pub struct HeaderMap { inner: Arc>, - stop: StopHandler<()>, -} - -impl Drop for HeaderMap { - fn drop(&mut self) { - self.stop.try_send(()); - } } const INTERVAL: Duration = Duration::from_millis(500); @@ -51,7 +45,7 @@ impl HeaderMap { let size_limit = memory_limit / ITEM_BYTES_SIZE; let inner = Arc::new(HeaderMapKernel::new(tmpdir, size_limit)); let map = Arc::clone(&inner); - let (stop, mut stop_rx) = oneshot::channel::<()>(); + let stop_rx: CancellationToken = new_tokio_exit_rx(); async_handle.spawn(async move { let mut interval = tokio::time::interval(INTERVAL); @@ -61,15 +55,15 @@ impl HeaderMap { _ = interval.tick() => { map.limit_memory(); } - _ = &mut stop_rx => break, + _ = stop_rx.cancelled() => { + info!("HeaderMap limit_memory received exit signal, exit now"); + break + }, } } }); - Self { - inner, - stop: StopHandler::new(SignalSender::Tokio(stop), None, "HeaderMap".to_string()), - } + Self { inner } } pub(crate) fn contains_key(&self, hash: &Byte32) -> bool { diff --git a/test/src/net.rs b/test/src/net.rs index 5cf87930ad..56c4f5676e 100644 --- a/test/src/net.rs +++ b/test/src/net.rs @@ -7,8 +7,8 @@ use ckb_channel::{self as channel, unbounded, Receiver, RecvTimeoutError, Sender use ckb_logger::info; use ckb_network::{ async_trait, bytes::Bytes, extract_peer_id, CKBProtocol, CKBProtocolContext, - CKBProtocolHandler, DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState, - PeerIndex, ProtocolId, SupportProtocols, + CKBProtocolHandler, Flags, NetworkController, NetworkService, NetworkState, PeerIndex, + ProtocolId, SupportProtocols, }; use ckb_util::Mutex; use std::collections::HashMap; @@ -63,7 +63,7 @@ impl Net { ) }) .collect(); - let (async_handle, async_runtime) = new_global_runtime(); + let (async_handle, _handle_recv, async_runtime) = new_global_runtime(); let controller = NetworkService::new( Arc::clone(&network_state), ckb_protocols, @@ -73,7 +73,6 @@ impl Net { "0.1.0".to_string(), Flags::COMPATIBILITY, ), - DefaultExitHandler::default(), ) .start(&async_handle) .unwrap(); diff --git a/tx-pool/Cargo.toml b/tx-pool/Cargo.toml index 5fd3bf0503..bc425d696c 100644 --- a/tx-pool/Cargo.toml +++ b/tx-pool/Cargo.toml @@ -39,6 +39,7 @@ hyper = { version = "0.14", features = ["http1", "client", "tcp"] } multi_index_map = "0.5.0" slab = "0.4" rustc-hash = "1.1" +tokio-util = "0.7.8" [dev-dependencies] tempfile.workspace = true diff --git a/tx-pool/src/chunk_process.rs b/tx-pool/src/chunk_process.rs index c86a2966cb..b35e547a21 100644 --- a/tx-pool/src/chunk_process.rs +++ b/tx-pool/src/chunk_process.rs @@ -4,6 +4,7 @@ use crate::try_or_return_with_snapshot; use crate::{error::Reject, service::TxPoolService}; use ckb_chain_spec::consensus::Consensus; use ckb_error::Error; +use ckb_logger::info; use ckb_snapshot::Snapshot; use ckb_store::data_loader_wrapper::AsDataLoader; use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider}; @@ -21,6 +22,7 @@ use std::sync::Arc; use tokio::sync::watch; use tokio::sync::RwLock; use tokio::task::block_in_place; +use tokio_util::sync::CancellationToken; const MIN_STEP_CYCLE: Cycle = 10_000_000; @@ -41,15 +43,15 @@ enum State { pub(crate) struct ChunkProcess { service: TxPoolService, recv: watch::Receiver, - signal: watch::Receiver, current_state: ChunkCommand, + signal: CancellationToken, } impl ChunkProcess { pub fn new( service: TxPoolService, recv: watch::Receiver, - signal: watch::Receiver, + signal: CancellationToken, ) -> Self { ChunkProcess { service, @@ -73,7 +75,10 @@ impl ChunkProcess { } } }, - _ = self.signal.changed() => break, + _ = self.signal.cancelled() => { + info!("TxPool received exit signal, exit now"); + break + }, _ = interval.tick() => { if matches!(self.current_state, ChunkCommand::Resume) { let stop = self.try_process().await; @@ -136,7 +141,7 @@ impl ChunkProcess { let mut tmp_state: Option = None; let completed: Cycle = loop { - if self.signal.has_changed().unwrap_or(false) { + if self.signal.is_cancelled() { return Ok(State::Stopped); } if self.recv.has_changed().unwrap_or(false) { diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 678a9fbad5..0013d5cf33 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -916,7 +916,7 @@ impl TxPoolService { } } - pub(crate) async fn save_pool(&mut self) { + pub(crate) async fn save_pool(&self) { let mut tx_pool = self.tx_pool.write().await; if let Err(err) = tx_pool.save_into_file() { error!("failed to save pool, error: {:?}", err) diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index a4184809a1..f405129201 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -17,7 +17,7 @@ use ckb_logger::error; use ckb_logger::info; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::Snapshot; -use ckb_stop_handler::{SignalSender, StopHandler, WATCH_INIT}; +use ckb_stop_handler::new_tokio_exit_rx; use ckb_types::core::tx_pool::{TransactionWithStatus, TxStatus}; use ckb_types::{ core::{ @@ -37,6 +37,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::sync::{mpsc, RwLock}; use tokio::task::block_in_place; +use tokio_util::sync::CancellationToken; #[cfg(feature = "internal")] use crate::{component::entry::TxEntry, process::PlugTarget}; @@ -128,18 +129,9 @@ pub struct TxPoolController { reorg_sender: mpsc::Sender>, chunk_tx: Arc>, handle: Handle, - stop: StopHandler<()>, started: Arc, } -impl Drop for TxPoolController { - fn drop(&mut self) { - if self.service_started() { - self.stop.try_send(()); - } - } -} - macro_rules! send_message { ($self:ident, $msg_type:ident, $args:expr) => {{ let (responder, response) = oneshot::channel(); @@ -378,7 +370,7 @@ pub struct TxPoolServiceBuilder { pub(crate) callbacks: Callbacks, pub(crate) receiver: mpsc::Receiver, pub(crate) reorg_receiver: mpsc::Receiver>, - pub(crate) signal_receiver: watch::Receiver, + pub(crate) signal_receiver: CancellationToken, pub(crate) handle: Handle, pub(crate) tx_relay_sender: ckb_channel::Sender, pub(crate) chunk_rx: watch::Receiver, @@ -403,22 +395,16 @@ impl TxPoolServiceBuilder { let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let block_assembler_channel = mpsc::channel(BLOCK_ASSEMBLER_CHANNEL_SIZE); let (reorg_sender, reorg_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - let (signal_sender, signal_receiver) = watch::channel(WATCH_INIT); + let signal_receiver: CancellationToken = new_tokio_exit_rx(); let (chunk_tx, chunk_rx) = watch::channel(ChunkCommand::Resume); let chunk = Arc::new(RwLock::new(ChunkQueue::new())); let started = Arc::new(AtomicBool::new(false)); - let stop = StopHandler::new( - SignalSender::Watch(signal_sender), - None, - "tx-pool".to_string(), - ); let controller = TxPoolController { sender, reorg_sender, handle: handle.clone(), chunk_tx: Arc::new(chunk_tx), - stop, started: Arc::clone(&started), }; @@ -515,7 +501,7 @@ impl TxPoolServiceBuilder { let handle_clone = self.handle.clone(); let process_service = service.clone(); - let mut signal_receiver = self.signal_receiver.clone(); + let signal_receiver = self.signal_receiver.clone(); self.handle.spawn(async move { loop { tokio::select! { @@ -523,7 +509,11 @@ impl TxPoolServiceBuilder { let service_clone = process_service.clone(); handle_clone.spawn(process(service_clone, message)); }, - _ = signal_receiver.changed() => break, + _ = signal_receiver.cancelled() => { + info!("TxPool is saving, please wait..."); + process_service.save_pool().await; + break + }, else => break, } } @@ -531,7 +521,7 @@ impl TxPoolServiceBuilder { let process_service = service.clone(); if let Some(ref block_assembler) = service.block_assembler { - let mut signal_receiver = self.signal_receiver.clone(); + let signal_receiver = self.signal_receiver.clone(); let interval = Duration::from_millis(block_assembler.config.update_interval_millis); if interval.is_zero() { // block_assembler.update_interval_millis set zero interval should only be used for tests, @@ -547,7 +537,10 @@ impl TxPoolServiceBuilder { let service_clone = process_service.clone(); block_assembler::process(service_clone, &message).await; }, - _ = signal_receiver.changed() => break, + _ = signal_receiver.cancelled() => { + info!("TxPool received exit signal, exit now"); + break + }, else => break, } } @@ -579,7 +572,10 @@ impl TxPoolServiceBuilder { } queue.clear(); } - _ = signal_receiver.changed() => break, + _ = signal_receiver.cancelled() => { + info!("TxPool received exit signal, exit now"); + break + }, else => break, } } @@ -587,7 +583,7 @@ impl TxPoolServiceBuilder { } } - let mut signal_receiver = self.signal_receiver; + let signal_receiver = self.signal_receiver; self.handle.spawn(async move { loop { tokio::select! { @@ -614,7 +610,10 @@ impl TxPoolServiceBuilder { service.update_block_assembler_after_tx_pool_reorg().await; }, - _ = signal_receiver.changed() => break, + _ = signal_receiver.cancelled() => { + info!("TxPool received exit signal, exit now"); + break + }, else => break, } } diff --git a/util/channel/src/lib.rs b/util/channel/src/lib.rs index 90755a11f3..a250f5f104 100644 --- a/util/channel/src/lib.rs +++ b/util/channel/src/lib.rs @@ -1,7 +1,7 @@ //! Reexports `crossbeam_channel` to uniform the dependency version. pub use crossbeam_channel::{ - bounded, select, unbounded, Receiver, RecvError, RecvTimeoutError, Select, SendError, Sender, - TrySendError, + after, bounded, select, tick, unbounded, Receiver, RecvError, RecvTimeoutError, Select, + SendError, Sender, TrySendError, }; pub mod oneshot { diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index d104eb3646..ef8f876aee 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -7,7 +7,7 @@ use crate::store::{IteratorDirection, RocksdbStore, SecondaryDB, Store}; use crate::error::Error; use ckb_app_config::{DBConfig, IndexerConfig}; use ckb_async_runtime::{ - tokio::{self, sync::watch, time}, + tokio::{self, time}, Handle, }; use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_BLOCK_HEADER, COLUMN_INDEX, COLUMN_META}; @@ -18,7 +18,7 @@ use ckb_jsonrpc_types::{ }; use ckb_logger::{error, info}; use ckb_notify::NotifyController; -use ckb_stop_handler::{SignalSender, StopHandler, WATCH_INIT}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_store::ChainStore; use ckb_types::{core, packed, prelude::*, H256}; use rocksdb::{prelude::*, Direction, IteratorMode}; @@ -39,8 +39,6 @@ pub struct IndexerService { pool: Option>>, poll_interval: Duration, async_handle: Handle, - stop_handler: StopHandler<()>, - stop: watch::Receiver, block_filter: Option, cell_filter: Option, } @@ -48,13 +46,6 @@ pub struct IndexerService { impl IndexerService { /// Construct new Indexer service instance from DBConfig and IndexerConfig pub fn new(ckb_db_config: &DBConfig, config: &IndexerConfig, async_handle: Handle) -> Self { - let (stop_sender, stop) = watch::channel(WATCH_INIT); - let stop_handler = StopHandler::new( - SignalSender::Watch(stop_sender), - None, - "indexer".to_string(), - ); - let store_opts = Self::indexer_store_options(config); let store = RocksdbStore::new(&store_opts, &config.store); let pool = if config.index_tx_pool { @@ -82,8 +73,6 @@ impl IndexerService { secondary_db, pool, async_handle, - stop_handler, - stop, poll_interval: Duration::from_secs(config.poll_interval), block_filter: config.block_filter.clone(), cell_filter: config.cell_filter.clone(), @@ -98,14 +87,13 @@ impl IndexerService { IndexerHandle { store: self.store.clone(), pool: self.pool.clone(), - stop_handler: self.stop_handler.clone(), } } /// Processes that handle index pool transaction and expect to be spawned to run in tokio runtime pub fn index_tx_pool(&self, notify_controller: NotifyController) { let service = self.clone(); - let mut stop = self.stop.clone(); + let stop: CancellationToken = new_tokio_exit_rx(); self.async_handle.spawn(async move { let mut new_transaction_receiver = notify_controller @@ -129,7 +117,10 @@ impl IndexerService { .transaction_rejected(&tx_entry.transaction); } } - _ = stop.changed() => break, + _ = stop.cancelled() => { + info!("Indexer received exit signal, exit now"); + break + }, else => break, } } @@ -183,7 +174,7 @@ impl IndexerService { let initial_syncing = self .async_handle .spawn_blocking(move || initial_service.try_loop_sync()); - let mut stop = self.stop.clone(); + let stop: CancellationToken = new_tokio_exit_rx(); let async_handle = self.async_handle.clone(); let poll_service = self.clone(); self.async_handle.spawn(async move { @@ -212,7 +203,10 @@ impl IndexerService { error!("ckb indexer syncing join error {:?}", e); } } - _ = stop.changed() => break, + _ = stop.cancelled() => { + info!("Indexer received exit signal, exit now"); + break + }, } } }); @@ -262,13 +256,6 @@ impl IndexerService { pub struct IndexerHandle { pub(crate) store: RocksdbStore, pub(crate) pool: Option>>, - stop_handler: StopHandler<()>, -} - -impl Drop for IndexerHandle { - fn drop(&mut self) { - self.stop_handler.try_send(()); - } } impl IndexerHandle { @@ -984,11 +971,9 @@ mod tests { let store = new_store("rpc"); let pool = Arc::new(RwLock::new(Pool::default())); let indexer = Indexer::new(store.clone(), 10, 100, None, CustomFilters::new(None, None)); - let stop_handler = StopHandler::new(SignalSender::Dummy, None, "indexer-test".to_string()); let rpc = IndexerHandle { store, pool: Some(Arc::clone(&pool)), - stop_handler, }; // setup test data @@ -1573,12 +1558,7 @@ mod tests { fn script_search_mode_rpc() { let store = new_store("script_search_mode_rpc"); let indexer = Indexer::new(store.clone(), 10, 100, None, CustomFilters::new(None, None)); - let stop_handler = StopHandler::new(SignalSender::Dummy, None, "indexer-test".to_string()); - let rpc = IndexerHandle { - store, - pool: None, - stop_handler, - }; + let rpc = IndexerHandle { store, pool: None }; // setup test data let lock_script1 = ScriptBuilder::default() diff --git a/util/launcher/Cargo.toml b/util/launcher/Cargo.toml index 35aa2a90e3..a1ec81525d 100644 --- a/util/launcher/Cargo.toml +++ b/util/launcher/Cargo.toml @@ -39,7 +39,6 @@ ckb-freezer = { path = "../../freezer", version = "= 0.112.0-pre" } ckb-notify = { path = "../../notify", version = "= 0.112.0-pre" } ckb-snapshot = { path = "../snapshot", version = "= 0.112.0-pre" } ckb-tx-pool = { path = "../../tx-pool", version = "= 0.112.0-pre" } -ckb-stop-handler = { path = "../stop-handler", version = "= 0.112.0-pre" } ckb-light-client-protocol-server = { path = "../light-client-protocol-server", version = "= 0.112.0-pre" } ckb-block-filter = { path = "../../block-filter", version = "= 0.112.0-pre" } ckb-hash = { path = "../hash", version = "= 0.112.0-pre" } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 96259acb5b..ad56947f35 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -23,15 +23,15 @@ use ckb_jsonrpc_types::ScriptHashType; use ckb_light_client_protocol_server::LightClientProtocol; use ckb_logger::info; use ckb_network::{ - observe_listen_port_occupancy, CKBProtocol, DefaultExitHandler, Flags, NetworkController, - NetworkService, NetworkState, SupportProtocols, + observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkService, + NetworkState, SupportProtocols, }; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_proposal_table::ProposalTable; use ckb_resource::Resource; use ckb_rpc::{RpcServer, ServiceBuilder}; use ckb_shared::Shared; -use ckb_stop_handler::StopHandler; + use ckb_store::{ChainDB, ChainStore}; use ckb_sync::{BlockFilter, NetTimeProtocol, Relayer, SyncShared, Synchronizer}; use ckb_tx_pool::service::TxVerificationResult; @@ -250,7 +250,7 @@ impl Launcher { } /// start block filter service - pub fn start_block_filter(&self, shared: &Shared) -> Option> { + pub fn start_block_filter(&self, shared: &Shared) { if self .args .config @@ -258,9 +258,7 @@ impl Launcher { .support_protocols .contains(&SupportProtocol::Filter) { - Some(BlockFilterService::new(shared.clone()).start()) - } else { - None + BlockFilterService::new(shared.clone()).start(); } } @@ -269,7 +267,6 @@ impl Launcher { &self, shared: &Shared, chain_controller: ChainController, - exit_handler: &DefaultExitHandler, miner_enable: bool, relay_tx_receiver: Receiver, ) -> (NetworkController, RpcServer) { @@ -383,7 +380,6 @@ impl Launcher { self.version.to_string(), flags, ), - exit_handler.clone(), ) .start(shared.async_handle()) .expect("Start network service failed"); diff --git a/util/launcher/src/shared_builder.rs b/util/launcher/src/shared_builder.rs index 7c1a096953..09f9fd862c 100644 --- a/util/launcher/src/shared_builder.rs +++ b/util/launcher/src/shared_builder.rs @@ -19,7 +19,7 @@ use ckb_proposal_table::ProposalTable; use ckb_proposal_table::ProposalView; use ckb_shared::Shared; use ckb_snapshot::{Snapshot, SnapshotMgr}; -use ckb_stop_handler::StopHandler; + use ckb_store::ChainDB; use ckb_store::ChainStore; use ckb_tx_pool::{ @@ -151,7 +151,7 @@ impl SharedBuilder { thread_local! { // NOTICE:we can't put the runtime directly into thread_local here, // on windows the runtime in thread_local will get stuck when dropping - static RUNTIME_HANDLE: unsync::OnceCell<(Handle, StopHandler<()>)> = unsync::OnceCell::new(); + static RUNTIME_HANDLE: unsync::OnceCell = unsync::OnceCell::new(); } static DB_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -177,11 +177,7 @@ impl SharedBuilder { notify_config: None, store_config: None, block_assembler_config: None, - async_handle: runtime - .borrow() - .get_or_init(new_background_runtime) - .0 - .clone(), + async_handle: runtime.borrow().get_or_init(new_background_runtime).clone(), }) } } diff --git a/util/light-client-protocol-server/src/tests/utils/chain.rs b/util/light-client-protocol-server/src/tests/utils/chain.rs index a87805c486..fcfd483a86 100644 --- a/util/light-client-protocol-server/src/tests/utils/chain.rs +++ b/util/light-client-protocol-server/src/tests/utils/chain.rs @@ -9,7 +9,7 @@ use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; use ckb_launcher::SharedBuilder; -use ckb_network::{DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::Shared; use ckb_systemtime::unix_time_as_millis; use ckb_test_chain_utils::always_success_cell; @@ -242,7 +242,6 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::all(), ), - DefaultExitHandler::default(), ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/util/metrics-service/Cargo.toml b/util/metrics-service/Cargo.toml index 0bd9c7d9bd..20c6ecf941 100644 --- a/util/metrics-service/Cargo.toml +++ b/util/metrics-service/Cargo.toml @@ -16,3 +16,4 @@ ckb-async-runtime = { path = "../runtime", version = "= 0.112.0-pre" } ckb-util = { path = "..", version = "= 0.112.0-pre" } prometheus = "0.13.3" hyper = { version = "0.14", features = ["http1", "tcp", "server"] } +ckb-stop-handler = { path = "../stop-handler", version = "= 0.112.0-pre" } diff --git a/util/metrics-service/src/lib.rs b/util/metrics-service/src/lib.rs index 6d7fb3ebfc..1b88171de0 100644 --- a/util/metrics-service/src/lib.rs +++ b/util/metrics-service/src/lib.rs @@ -10,7 +10,9 @@ use hyper::{ use prometheus::Encoder as _; use ckb_async_runtime::Handle; +use ckb_logger::info; use ckb_metrics_config::{Config, Exporter, Target}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_util::strings; /// Ensures the metrics service can shutdown gracefully. @@ -59,7 +61,13 @@ fn run_exporter(exporter: Exporter, handle: &Handle) -> Result<(), String> { }); ckb_logger::info!("start prometheus exporter at {}", addr); handle.spawn(async move { - let server = Server::bind(&addr).serve(make_svc); + let server = Server::bind(&addr) + .serve(make_svc) + .with_graceful_shutdown(async { + let exit_rx: CancellationToken = new_tokio_exit_rx(); + exit_rx.cancelled().await; + info!("prometheus server received exit signal, exit now"); + }); if let Err(err) = server.await { ckb_logger::error!("prometheus server error: {}", err); } diff --git a/util/network-alert/Cargo.toml b/util/network-alert/Cargo.toml index 035d01fc3b..c42f154094 100644 --- a/util/network-alert/Cargo.toml +++ b/util/network-alert/Cargo.toml @@ -25,7 +25,6 @@ semver = "1.0" [dev-dependencies] ckb-crypto = { path = "../crypto", version = "= 0.112.0-pre" } ckb-async-runtime = { path = "../runtime", version = "= 0.112.0-pre" } -ckb-stop-handler = { path = "../stop-handler", version = "= 0.112.0-pre" } once_cell = "1.8.0" ckb-systemtime = {path = "../systemtime", version = "= 0.112.0-pre", features = ["enable_faketime"]} faster-hex = "0.6" diff --git a/util/network-alert/src/tests/test_notifier.rs b/util/network-alert/src/tests/test_notifier.rs index 6b6a1f9bd8..fc793dc5e8 100644 --- a/util/network-alert/src/tests/test_notifier.rs +++ b/util/network-alert/src/tests/test_notifier.rs @@ -1,7 +1,7 @@ use crate::notifier::Notifier; use ckb_async_runtime::{new_background_runtime, Handle}; use ckb_notify::NotifyService; -use ckb_stop_handler::StopHandler; + use ckb_types::{packed, prelude::*}; use once_cell::unsync; use std::borrow::Borrow; @@ -27,17 +27,13 @@ fn new_notifier(version: &str) -> Notifier { thread_local! { // NOTICE:we can't put the runtime directly into thread_local here, // on windows the runtime in thread_local will get stuck when dropping - static RUNTIME_HANDLE: unsync::OnceCell<(Handle, StopHandler<()>)> = unsync::OnceCell::new(); + static RUNTIME_HANDLE: unsync::OnceCell = unsync::OnceCell::new(); } let notify_controller = RUNTIME_HANDLE.with(|runtime| { NotifyService::new( Default::default(), - runtime - .borrow() - .get_or_init(new_background_runtime) - .0 - .clone(), + runtime.borrow().get_or_init(new_background_runtime).clone(), ) .start() }); diff --git a/util/runtime/Cargo.toml b/util/runtime/Cargo.toml index 0dd064eb1b..6e01a931dd 100644 --- a/util/runtime/Cargo.toml +++ b/util/runtime/Cargo.toml @@ -10,6 +10,5 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] tokio = { version = "1", features = ["full"] } -ckb-stop-handler = { path = "../stop-handler", version = "= 0.112.0-pre" } ckb-logger = { path = "../logger", version = "= 0.112.0-pre" } ckb-spawn = { path = "../spawn", version = "= 0.112.0-pre" } diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index 6984ded8e4..17a60a9d53 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -1,17 +1,17 @@ //! Utilities for tokio runtime. use ckb_spawn::Spawn; -use ckb_stop_handler::{SignalSender, StopHandler}; use core::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; -use std::thread; + use tokio::runtime::Builder; use tokio::runtime::Handle as TokioHandle; -use tokio::sync::oneshot; + use tokio::task::JoinHandle; pub use tokio; pub use tokio::runtime::Runtime; +use tokio::sync::mpsc::{Receiver, Sender}; // Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules. // We need `Handle` impl ckb spawn trait decouple tokio dependence @@ -20,6 +20,19 @@ pub use tokio::runtime::Runtime; #[derive(Debug, Clone)] pub struct Handle { pub(crate) inner: TokioHandle, + guard: Option>, +} + +impl Handle { + /// Create a new Handle + pub fn new(inner: TokioHandle, guard: Option>) -> Self { + Self { inner, guard } + } + + /// Drop the guard + pub fn drop_guard(&mut self) { + let _ = self.guard.take(); + } } impl Handle { @@ -42,7 +55,15 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - self.inner.spawn(future) + let tokio_task_guard = self.guard.clone(); + + self.inner.spawn(async move { + // move tokio_task_guard into the spawned future + // so that it will be dropped when the future is finished + let _guard = tokio_task_guard; + + future.await + }) } /// Run a future to completion on the Tokio runtime from a synchronous context. @@ -101,32 +122,31 @@ fn new_runtime() -> Runtime { } /// Create new threaded_scheduler tokio Runtime, return `Runtime` -pub fn new_global_runtime() -> (Handle, Runtime) { +pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) { let runtime = new_runtime(); let handle = runtime.handle().clone(); + let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); - (Handle { inner: handle }, runtime) + (Handle::new(handle, Some(guard)), handle_stop_rx, runtime) } /// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, /// NOTICE: This is only used in testing -pub fn new_background_runtime() -> (Handle, StopHandler<()>) { +pub fn new_background_runtime() -> Handle { let runtime = new_runtime(); let handle = runtime.handle().clone(); - let (tx, rx) = oneshot::channel(); - let thread = thread::Builder::new() + let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = + tokio::sync::mpsc::channel::<()>(1); + let _thread = std::thread::Builder::new() .name("GlobalRtBuilder".to_string()) .spawn(move || { - let ret = runtime.block_on(rx); + let ret = runtime.block_on(async move { handle_stop_rx.recv().await }); ckb_logger::debug!("global runtime finish {:?}", ret); }) .expect("tokio runtime started"); - ( - Handle { inner: handle }, - StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()), - ) + Handle::new(handle, Some(guard)) } impl Spawn for Handle { diff --git a/util/stop-handler/Cargo.toml b/util/stop-handler/Cargo.toml index 996648b6ba..4ff8f9d529 100644 --- a/util/stop-handler/Cargo.toml +++ b/util/stop-handler/Cargo.toml @@ -9,7 +9,16 @@ homepage = "https://github.com/nervosnetwork/ckb" repository = "https://github.com/nervosnetwork/ckb" [dependencies] -parking_lot = "0.12" ckb-logger = { path = "../logger", version = "= 0.112.0-pre" } tokio = { version = "1", features = ["sync", "rt-multi-thread"] } ckb-channel = { path = "../channel", version = "= 0.112.0-pre" } +ckb-util = { path = "..", version = "= 0.112.0-pre" } +once_cell = "1.8.0" +ckb-async-runtime = { path = "../runtime", version = "= 0.112.0-pre" } +tokio-util = "0.7.8" + + +[dev-dependencies] +ctrlc = { version = "3.1", features = ["termination"] } +libc = "0.2" +rand = "0.8.5" From 97f7766518676547cf9e8bbe76e2efab49d69ab3 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 20 Jun 2023 17:04:55 +0800 Subject: [PATCH 04/11] exit when `ProtocolHandleErrorKind::AbnormallyClosed` received Signed-off-by: Eval EXEC --- network/src/network.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/network/src/network.rs b/network/src/network.rs index c9436aa28d..eaf3f7576b 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -654,7 +654,9 @@ impl ServiceHandle for EventHandler { ) }, ); - self.exit_handler.notify_exit(); + error!("ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"); + + broadcast_exit_signals(); } } } From c13a089f66eda200123ea22ab6b2a262d521cf73 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Tue, 20 Jun 2023 17:55:56 +0800 Subject: [PATCH 05/11] Fix BlockDownload thread exit Signed-off-by: Eval EXEC --- Cargo.lock | 5 -- sync/src/synchronizer/mod.rs | 104 ++++++++++++++----------- util/stop-handler/src/stop_register.rs | 20 +++-- 3 files changed, 71 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb5351f95d..afdd099c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -804,7 +804,6 @@ dependencies = [ "ckb-rpc", "ckb-shared", "ckb-snapshot", - "ckb-stop-handler", "ckb-store", "ckb-sync", "ckb-systemtime", @@ -1038,7 +1037,6 @@ dependencies = [ "ckb-multisig", "ckb-network", "ckb-notify", - "ckb-stop-handler", "ckb-systemtime", "ckb-types", "ckb-util", @@ -1179,7 +1177,6 @@ dependencies = [ "ckb-pow", "ckb-reward-calculator", "ckb-shared", - "ckb-stop-handler", "ckb-store", "ckb-sync", "ckb-systemtime", @@ -1245,7 +1242,6 @@ dependencies = [ "arc-swap", "ckb-async-runtime", "ckb-chain-spec", - "ckb-channel", "ckb-constant", "ckb-db", "ckb-db-schema", @@ -1293,7 +1289,6 @@ dependencies = [ "ctrlc", "libc", "once_cell", - "parking_lot 0.12.1", "rand 0.8.5", "tokio", "tokio-util 0.7.8", diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index a2ab6cf46b..7341ce4bc8 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -27,6 +27,7 @@ use crate::{Status, StatusCode}; use ckb_chain::chain::ChainController; use ckb_channel as channel; +use ckb_channel::{select, Receiver}; use ckb_constant::sync::{ BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE, @@ -37,7 +38,7 @@ use ckb_network::{ async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, SupportProtocols, }; -use ckb_stop_handler::register_thread; +use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ core::{self, BlockNumber}, @@ -87,58 +88,70 @@ struct BlockFetchCMD { } impl BlockFetchCMD { - fn run(&mut self) { - while let Ok(cmd) = self.recv.recv() { - match cmd { - FetchCMD::Fetch((peers, state)) => match self.can_start() { - CanStart::Ready => { - for peer in peers { - if let Some(fetch) = BlockFetcher::new(&self.sync, peer, state).fetch() - { - for item in fetch { - BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer); - } + fn process_fetch_cmd(&mut self, cmd: FetchCMD) { + match cmd { + FetchCMD::Fetch((peers, state)) => match self.can_start() { + CanStart::Ready => { + for peer in peers { + if let Some(fetch) = BlockFetcher::new(&self.sync, peer, state).fetch() { + for item in fetch { + BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer); } } } - CanStart::MinWorkNotReach => { - let best_known = self.sync.shared.state().shared_best_header_ref(); - let number = best_known.number(); - if number != self.number && (number - self.number) % 10000 == 0 { - self.number = number; - info!( - "best known header number: {}, total difficulty: {:#x}, \ + } + CanStart::MinWorkNotReach => { + let best_known = self.sync.shared.state().shared_best_header_ref(); + let number = best_known.number(); + if number != self.number && (number - self.number) % 10000 == 0 { + self.number = number; + info!( + "best known header number: {}, total difficulty: {:#x}, \ require min header number on 500_000, min total difficulty: {:#x}, \ then start to download block", - number, - best_known.total_difficulty(), - self.sync.shared.state().min_chain_work() - ); - } + number, + best_known.total_difficulty(), + self.sync.shared.state().min_chain_work() + ); } - CanStart::AssumeValidNotFound => { - let state = self.sync.shared.state(); - let best_known = state.shared_best_header_ref(); - let number = best_known.number(); - let assume_valid_target: Byte32 = state - .assume_valid_target() - .as_ref() - .map(Pack::pack) - .expect("assume valid target must exist"); - - if number != self.number && (number - self.number) % 10000 == 0 { - self.number = number; - info!( - "best known header number: {}, hash: {:#?}, \ + } + CanStart::AssumeValidNotFound => { + let state = self.sync.shared.state(); + let best_known = state.shared_best_header_ref(); + let number = best_known.number(); + let assume_valid_target: Byte32 = state + .assume_valid_target() + .as_ref() + .map(Pack::pack) + .expect("assume valid target must exist"); + + if number != self.number && (number - self.number) % 10000 == 0 { + self.number = number; + info!( + "best known header number: {}, hash: {:#?}, \ can't find assume valid target temporarily, hash: {:#?} \ please wait", - number, - best_known.hash(), - assume_valid_target - ); - } + number, + best_known.hash(), + assume_valid_target + ); } - }, + } + }, + } + } + fn run(&mut self, stop_signal: Receiver<()>) { + loop { + select! { + recv(self.recv) -> msg => { + if let Ok(cmd) = msg { + self.process_fetch_cmd(cmd) + } + } + recv(stop_signal) -> _ => { + info!("thread BlockDownload received exit signal, exit now"); + return; + } } } } @@ -630,6 +643,7 @@ impl Synchronizer { let blockdownload_jh = thread .name(THREAD_NAME.into()) .spawn(move || { + let stop_signal = new_crossbeam_exit_rx(); BlockFetchCMD { sync, p2p_control, @@ -637,7 +651,7 @@ impl Synchronizer { number, can_start: CanStart::MinWorkNotReach, } - .run(); + .run(stop_signal); }) .expect("download thread can't start"); register_thread(THREAD_NAME, blockdownload_jh); diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index e329565ae8..e496866383 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -1,4 +1,5 @@ -use ckb_logger::{info, trace, warn}; +use ckb_channel::TrySendError; +use ckb_logger::{error, info, trace, warn}; use ckb_util::Mutex; use tokio_util::sync::CancellationToken; @@ -54,13 +55,16 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> { /// Broadcast exit signals to all threads and all tokio tasks pub fn broadcast_exit_signals() { TOKIO_EXIT.cancel(); - CROSSBEAM_EXIT_SENDERS.lock().iter().for_each(|tx| { - if let Err(e) = tx.try_send(()) { - println!("broadcast thread: ERROR: {:?}", e) - } else { - println!("send a crossbeam exit signal"); - } - }); + CROSSBEAM_EXIT_SENDERS + .lock() + .iter() + .for_each(|tx| match tx.try_send(()) { + Ok(_) => {} + Err(TrySendError::Full(_)) => error!("send exit signal to channel failed since the channel is full, this should not happen"), + Err(TrySendError::Disconnected(_)) => { + info!("broadcast thread: channel is disconnected") + } + }); } /// Register a thread `JoinHandle` to `CKB_HANDLES` From 7ad68101ef53a1c3369e75b3e1970ba9a11ba62b Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Mon, 26 Jun 2023 10:48:38 +0800 Subject: [PATCH 06/11] Re-order ckb workspace members --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 79cb81e4c3..4d0c0cd5fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,8 @@ members = [ "util/occupied-capacity/macros", "util/fixed-hash/macros", "util/logger-service", - "util/stop-handler", "util/runtime", + "util/stop-handler", "util/metrics", "util/metrics-service", "util/fixed-hash", From 4fcf72b177b6479d3d4cb66296c0595853a39c30 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Mon, 3 Jul 2023 15:34:48 +0800 Subject: [PATCH 07/11] Fix bats test for ckb run Signed-off-by: Eval EXEC --- util/app-config/src/tests/ckb_run_replay.bats | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/app-config/src/tests/ckb_run_replay.bats b/util/app-config/src/tests/ckb_run_replay.bats index 6ca4dd5405..dc9f87ef18 100644 --- a/util/app-config/src/tests/ckb_run_replay.bats +++ b/util/app-config/src/tests/ckb_run_replay.bats @@ -24,7 +24,7 @@ function ckb_run { #@test run _ckb_run [ "$status" -eq 0 ] # assert_output --regexp "ckb_chain::chain.*block number:.*, hash:.*, size:.*, cycles:.*" - assert_output --regexp "ckb_bin::subcommand::run Finishing work, please wait" + assert_output --regexp "ckb_bin all tokio tasks have been stopped" } function ckb_replay { #@test From 0bb4ff57737a741211559a6446ee01c6acca6ae4 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 12 Jul 2023 09:31:05 +0800 Subject: [PATCH 08/11] Only catch exit signal for `ckb run` --- ckb-bin/src/lib.rs | 33 ++++++++------------------------- ckb-bin/src/subcommand/run.rs | 7 ++++++- notify/src/lib.rs | 8 ++++---- 3 files changed, 18 insertions(+), 30 deletions(-) diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index 91c61b813d..c8373224d7 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -10,10 +10,8 @@ use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; use ckb_logger::info; use ckb_network::tokio; -use ckb_stop_handler::broadcast_exit_signals; use helper::raise_fd_limit; use setup_guard::SetupGuard; -use std::sync::Arc; #[cfg(feature = "with_sentry")] pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; @@ -66,18 +64,6 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { raise_fd_limit(); - // indicate whether the process is terminated by an exit signal - let caught_exit_signal = Arc::new(std::sync::atomic::AtomicBool::new(false)); - - ctrlc::set_handler({ - let caught_exit_signal = Arc::clone(&caught_exit_signal); - move || { - broadcast_exit_signals(); - caught_exit_signal.store(true, std::sync::atomic::Ordering::SeqCst); - } - }) - .expect("Error setting Ctrl-C handler"); - let ret = match cmd { cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), @@ -90,18 +76,15 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { _ => unreachable!(), }; - if !caught_exit_signal.load(std::sync::atomic::Ordering::SeqCst) { - // if `subcommand` finish normally, and we didn't catch exit signal, broadcast exit signals - broadcast_exit_signals(); - } - - handle.drop_guard(); + if matches!(cmd, cli::CMD_RUN) { + handle.drop_guard(); - tokio::task::block_in_place(|| { - info!("waiting all tokio tasks done"); - handle_stop_rx.blocking_recv(); - info!("all tokio tasks have been stopped"); - }); + tokio::task::block_in_place(|| { + info!("waiting all tokio tasks done"); + handle_stop_rx.blocking_recv(); + info!("all tokio tasks have been stopped"); + }); + } ret } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index cd7d8c6282..2ce70792b9 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -4,7 +4,7 @@ use ckb_async_runtime::Handle; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; -use ckb_stop_handler::wait_all_ckb_services_exit; +use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit}; use ckb_types::core::cell::setup_system_cell_cache; @@ -55,6 +55,11 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), let tx_pool_builder = pack.take_tx_pool_builder(); tx_pool_builder.start(network_controller.non_owning_clone()); + ctrlc::set_handler(|| { + broadcast_exit_signals(); + }) + .expect("Error setting Ctrl-C handler"); + wait_all_ckb_services_exit(); Ok(()) diff --git a/notify/src/lib.rs b/notify/src/lib.rs index 2b9969ba96..b6a0172a8a 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -166,10 +166,6 @@ impl NotifyService { handle.spawn(async move { loop { tokio::select! { - _ = signal_receiver.cancelled() => { - info!("NotifyService received exit signal, exit now"); - break; - } Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) }, Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) }, Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) }, @@ -181,6 +177,10 @@ impl NotifyService { Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) }, Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) }, Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) }, + _ = signal_receiver.cancelled() => { + info!("NotifyService received exit signal, exit now"); + break; + } else => break, } } From 6d00f61b765188679678a56c2eddaa48a5b3ba04 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 12 Jul 2023 09:54:28 +0800 Subject: [PATCH 09/11] Use debug level to print exit signal log --- block-filter/src/filter.rs | 4 ++-- chain/src/chain.rs | 8 ++++---- ckb-bin/src/helper.rs | 4 ++-- ckb-bin/src/lib.rs | 6 +++--- ckb-bin/src/subcommand/run.rs | 1 + miner/src/client.rs | 8 ++++---- miner/src/miner.rs | 2 +- network/src/network.rs | 2 +- notify/src/lib.rs | 4 ++-- sync/src/synchronizer/mod.rs | 2 +- sync/src/tests/synchronizer/functions.rs | 2 -- sync/src/types/header_map/mod.rs | 4 ++-- tx-pool/src/chunk_process.rs | 10 +++++----- tx-pool/src/process.rs | 2 ++ tx-pool/src/service.rs | 8 ++++---- util/indexer/src/service.rs | 6 +++--- util/metrics-service/src/lib.rs | 4 ++-- util/stop-handler/src/stop_register.rs | 8 ++++---- 18 files changed, 43 insertions(+), 42 deletions(-) diff --git a/block-filter/src/filter.rs b/block-filter/src/filter.rs index 56fcc92f6b..04e8a6566b 100644 --- a/block-filter/src/filter.rs +++ b/block-filter/src/filter.rs @@ -1,5 +1,5 @@ use ckb_async_runtime::tokio::{self, task::block_in_place}; -use ckb_logger::{debug, info, warn}; +use ckb_logger::{debug, warn}; use ckb_shared::Shared; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_store::{ChainDB, ChainStore}; @@ -63,7 +63,7 @@ impl BlockFilter { new_block_watcher.borrow_and_update(); } _ = stop_rx.cancelled() => { - info!("BlockFilter received exit signal, exit now"); + debug!("BlockFilter received exit signal, exit now"); break }, else => break, diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 8f7e5d8e3c..3323492032 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -243,10 +243,6 @@ impl ChainService { let chain_jh = thread_builder .spawn(move || loop { select! { - recv(signal_receiver) -> _ => { - info!("ChainService received exit signal, stopped"); - break; - }, recv(process_block_receiver) -> msg => match msg { Ok(Request { responder, arguments: (block, verify) }) => { let _ = tx_control.suspend_chunk_process(); @@ -268,6 +264,10 @@ impl ChainService { error!("truncate_receiver closed"); break; }, + }, + recv(signal_receiver) -> _ => { + debug!("ChainService received exit signal, exit now"); + break; } } }) diff --git a/ckb-bin/src/helper.rs b/ckb-bin/src/helper.rs index 21c93732b8..7dee9de15d 100644 --- a/ckb-bin/src/helper.rs +++ b/ckb-bin/src/helper.rs @@ -8,7 +8,7 @@ pub fn deadlock_detection() {} #[cfg(feature = "deadlock_detection")] pub fn deadlock_detection() { use ckb_channel::select; - use ckb_logger::warn; + use ckb_logger::{debug, warn}; use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_util::parking_lot::deadlock; use std::{thread, time::Duration}; @@ -36,7 +36,7 @@ pub fn deadlock_detection() { }, recv(stop_rx) -> _ =>{ - info!("deadlock_detection received exit signal, stopped"); + debug!("deadlock_detection received exit signal, stopped"); return; } } diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index c8373224d7..8596f504d0 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -8,7 +8,7 @@ mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; -use ckb_logger::info; +use ckb_logger::{debug, info}; use ckb_network::tokio; use helper::raise_fd_limit; use setup_guard::SetupGuard; @@ -80,9 +80,9 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { handle.drop_guard(); tokio::task::block_in_place(|| { - info!("waiting all tokio tasks done"); + debug!("waiting all tokio tasks done"); handle_stop_rx.blocking_recv(); - info!("all tokio tasks have been stopped"); + info!("ckb shutdown"); }); } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 2ce70792b9..726a415944 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -56,6 +56,7 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), tx_pool_builder.start(network_controller.non_owning_clone()); ctrlc::set_handler(|| { + info!("Trapped exit signal, exiting..."); broadcast_exit_signals(); }) .expect("Error setting Ctrl-C handler"); diff --git a/miner/src/client.rs b/miner/src/client.rs index dda47570a7..c598f5af9a 100644 --- a/miner/src/client.rs +++ b/miner/src/client.rs @@ -4,7 +4,7 @@ use ckb_app_config::MinerClientConfig; use ckb_async_runtime::Handle; use ckb_channel::Sender; use ckb_jsonrpc_types::{Block as JsonBlock, BlockTemplate}; -use ckb_logger::{debug, error, info}; +use ckb_logger::{debug, error}; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::{ packed::{Block, Byte32}, @@ -87,7 +87,7 @@ impl Rpc { }); }, _ = stop_rx.cancelled() => { - info!("Rpc server received exit signal, exit now"); + debug!("Rpc server received exit signal, exit now"); break }, else => break @@ -235,7 +235,7 @@ Otherwise ckb-miner does not work properly and will behave as it stopped committ let stop_rx: CancellationToken = new_tokio_exit_rx(); let graceful = server.with_graceful_shutdown(async move { stop_rx.cancelled().await; - info!("Miner client received exit signal, exit now"); + debug!("Miner client received exit signal, exit now"); }); if let Err(e) = graceful.await { @@ -255,7 +255,7 @@ Otherwise ckb-miner does not work properly and will behave as it stopped committ self.fetch_block_template().await; } _ = stop_rx.cancelled() => { - info!("Miner client pool_block_template received exit signal, exit now"); + debug!("Miner client pool_block_template received exit signal, exit now"); break }, else => break, diff --git a/miner/src/miner.rs b/miner/src/miner.rs index e14ffef72f..110fad5514 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -103,7 +103,7 @@ impl Miner { }, }, recv(stop_rx) -> _msg => { - info!("miner received exit signal, stopped"); + debug!("miner received exit signal, stopped"); break; } }; diff --git a/network/src/network.rs b/network/src/network.rs index eaf3f7576b..93bccc1a9f 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -1127,7 +1127,7 @@ impl NetworkService { loop { tokio::select! { _ = receiver.cancelled() => { - info!("NetworkService receive exit signal, start shutdown..."); + debug!("NetworkService receive exit signal, start shutdown..."); let _ = p2p_control.shutdown().await; // Drop senders to stop all corresponding background task drop(bg_signals); diff --git a/notify/src/lib.rs b/notify/src/lib.rs index b6a0172a8a..41d1451e04 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -1,7 +1,7 @@ //! TODO(doc): @quake use ckb_app_config::NotifyConfig; use ckb_async_runtime::Handle; -use ckb_logger::{debug, error, info, trace}; +use ckb_logger::{debug, error, trace}; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::packed::Byte32; use ckb_types::{ @@ -178,7 +178,7 @@ impl NotifyService { Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) }, Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) }, _ = signal_receiver.cancelled() => { - info!("NotifyService received exit signal, exit now"); + debug!("NotifyService received exit signal, exit now"); break; } else => break, diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 7341ce4bc8..b348226560 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -149,7 +149,7 @@ impl BlockFetchCMD { } } recv(stop_signal) -> _ => { - info!("thread BlockDownload received exit signal, exit now"); + debug!("thread BlockDownload received exit signal, exit now"); return; } } diff --git a/sync/src/tests/synchronizer/functions.rs b/sync/src/tests/synchronizer/functions.rs index bd0a55740c..bbdd902a3d 100644 --- a/sync/src/tests/synchronizer/functions.rs +++ b/sync/src/tests/synchronizer/functions.rs @@ -1226,8 +1226,6 @@ fn test_internal_db_error() { InternalErrorKind::Database.other("mocked db error").into(), )); - faux::when!(chain_controller.try_stop()).then_return(()); - let synchronizer = Synchronizer::new(chain_controller, sync_shared); let status = synchronizer diff --git a/sync/src/types/header_map/mod.rs b/sync/src/types/header_map/mod.rs index 78939164b6..975c7b9075 100644 --- a/sync/src/types/header_map/mod.rs +++ b/sync/src/types/header_map/mod.rs @@ -1,5 +1,5 @@ use ckb_async_runtime::Handle; -use ckb_logger::info; +use ckb_logger::debug; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::packed::Byte32; use std::sync::Arc; @@ -56,7 +56,7 @@ impl HeaderMap { map.limit_memory(); } _ = stop_rx.cancelled() => { - info!("HeaderMap limit_memory received exit signal, exit now"); + debug!("HeaderMap limit_memory received exit signal, exit now"); break }, } diff --git a/tx-pool/src/chunk_process.rs b/tx-pool/src/chunk_process.rs index b35e547a21..73e4f246eb 100644 --- a/tx-pool/src/chunk_process.rs +++ b/tx-pool/src/chunk_process.rs @@ -4,7 +4,7 @@ use crate::try_or_return_with_snapshot; use crate::{error::Reject, service::TxPoolService}; use ckb_chain_spec::consensus::Consensus; use ckb_error::Error; -use ckb_logger::info; +use ckb_logger::debug; use ckb_snapshot::Snapshot; use ckb_store::data_loader_wrapper::AsDataLoader; use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider}; @@ -75,10 +75,6 @@ impl ChunkProcess { } } }, - _ = self.signal.cancelled() => { - info!("TxPool received exit signal, exit now"); - break - }, _ = interval.tick() => { if matches!(self.current_state, ChunkCommand::Resume) { let stop = self.try_process().await; @@ -87,6 +83,10 @@ impl ChunkProcess { } } }, + _ = self.signal.cancelled() => { + debug!("TxPool received exit signal, exit now"); + break + }, else => break, } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 0013d5cf33..b5062b3b35 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -920,6 +920,8 @@ impl TxPoolService { let mut tx_pool = self.tx_pool.write().await; if let Err(err) = tx_pool.save_into_file() { error!("failed to save pool, error: {:?}", err) + } else { + info!("TxPool save successfully") } } diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index f405129201..b11564ea11 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -13,8 +13,8 @@ use ckb_chain_spec::consensus::Consensus; use ckb_channel::oneshot; use ckb_error::AnyError; use ckb_jsonrpc_types::BlockTemplate; -use ckb_logger::error; use ckb_logger::info; +use ckb_logger::{debug, error}; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::Snapshot; use ckb_stop_handler::new_tokio_exit_rx; @@ -538,7 +538,7 @@ impl TxPoolServiceBuilder { block_assembler::process(service_clone, &message).await; }, _ = signal_receiver.cancelled() => { - info!("TxPool received exit signal, exit now"); + debug!("TxPool received exit signal, exit now"); break }, else => break, @@ -573,7 +573,7 @@ impl TxPoolServiceBuilder { queue.clear(); } _ = signal_receiver.cancelled() => { - info!("TxPool received exit signal, exit now"); + debug!("TxPool received exit signal, exit now"); break }, else => break, @@ -611,7 +611,7 @@ impl TxPoolServiceBuilder { service.update_block_assembler_after_tx_pool_reorg().await; }, _ = signal_receiver.cancelled() => { - info!("TxPool received exit signal, exit now"); + debug!("TxPool received exit signal, exit now"); break }, else => break, diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index ef8f876aee..bcdc89c7ce 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -16,7 +16,7 @@ use ckb_jsonrpc_types::{ IndexerScriptSearchMode, IndexerScriptType, IndexerSearchKey, IndexerTip, IndexerTx, IndexerTxWithCell, IndexerTxWithCells, JsonBytes, Uint32, }; -use ckb_logger::{error, info}; +use ckb_logger::{debug, error, info}; use ckb_notify::NotifyController; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_store::ChainStore; @@ -118,7 +118,7 @@ impl IndexerService { } } _ = stop.cancelled() => { - info!("Indexer received exit signal, exit now"); + debug!("Indexer received exit signal, exit now"); break }, else => break, @@ -204,7 +204,7 @@ impl IndexerService { } } _ = stop.cancelled() => { - info!("Indexer received exit signal, exit now"); + debug!("Indexer received exit signal, exit now"); break }, } diff --git a/util/metrics-service/src/lib.rs b/util/metrics-service/src/lib.rs index 1b88171de0..4206a3d0aa 100644 --- a/util/metrics-service/src/lib.rs +++ b/util/metrics-service/src/lib.rs @@ -10,7 +10,7 @@ use hyper::{ use prometheus::Encoder as _; use ckb_async_runtime::Handle; -use ckb_logger::info; +use ckb_logger::debug; use ckb_metrics_config::{Config, Exporter, Target}; use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_util::strings; @@ -66,7 +66,7 @@ fn run_exporter(exporter: Exporter, handle: &Handle) -> Result<(), String> { .with_graceful_shutdown(async { let exit_rx: CancellationToken = new_tokio_exit_rx(); exit_rx.cancelled().await; - info!("prometheus server received exit signal, exit now"); + debug!("prometheus server received exit signal, exit now"); }); if let Err(err) = server.await { ckb_logger::error!("prometheus server error: {}", err); diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index e496866383..8948217c19 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -1,5 +1,5 @@ use ckb_channel::TrySendError; -use ckb_logger::{error, info, trace, warn}; +use ckb_logger::{debug, error, info, trace, warn}; use ckb_util::Mutex; use tokio_util::sync::CancellationToken; @@ -12,19 +12,19 @@ pub fn wait_all_ckb_services_exit() { info!("waiting exit signal..."); let exit_signal = new_crossbeam_exit_rx(); let _ = exit_signal.recv(); - info!("received exit signal, broadcasting exit signal to all threads"); + debug!("received exit signal, broadcasting exit signal to all threads"); let mut handles = CKB_HANDLES.lock(); for (name, join_handle) in handles.thread_handles.drain(..) { match join_handle.join() { Ok(_) => { - info!("wait thread {} done", name); + debug!("wait thread {} done", name); } Err(e) => { warn!("wait thread {}: ERROR: {:?}", name, e) } } } - info!("all ckb threads have been stopped"); + debug!("all ckb threads have been stopped"); } static CKB_HANDLES: once_cell::sync::Lazy> = From 71e4297d158826045350b34c80f8ed802970a415 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 12 Jul 2023 09:59:50 +0800 Subject: [PATCH 10/11] Add exit handler for `ckb miner` --- ckb-bin/src/subcommand/miner.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ckb-bin/src/subcommand/miner.rs b/ckb-bin/src/subcommand/miner.rs index fd9a892abc..0bc7312e42 100644 --- a/ckb-bin/src/subcommand/miner.rs +++ b/ckb-bin/src/subcommand/miner.rs @@ -1,8 +1,11 @@ use ckb_app_config::{ExitCode, MinerArgs, MinerConfig}; use ckb_async_runtime::Handle; use ckb_channel::unbounded; +use ckb_logger::info; use ckb_miner::{Client, Miner}; -use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread, wait_all_ckb_services_exit}; +use ckb_stop_handler::{ + broadcast_exit_signals, new_crossbeam_exit_rx, register_thread, wait_all_ckb_services_exit, +}; use std::thread; pub fn miner(args: MinerArgs, async_handle: Handle) -> Result<(), ExitCode> { @@ -30,6 +33,12 @@ pub fn miner(args: MinerArgs, async_handle: Handle) -> Result<(), ExitCode> { .expect("Start client failed!"); register_thread(THREAD_NAME, miner_jh); + ctrlc::set_handler(|| { + info!("Trapped exit signal, exiting..."); + broadcast_exit_signals(); + }) + .expect("Error setting Ctrl-C handler"); + wait_all_ckb_services_exit(); Ok(()) From fc9d3e6f17547c0d183c8d1cdfcca56a031c7d08 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Wed, 12 Jul 2023 10:45:47 +0800 Subject: [PATCH 11/11] Add bats test for graceful shutdown Signed-off-by: Eval EXEC --- util/app-config/src/tests/ckb_run_replay.bats | 2 +- util/app-config/src/tests/cli_test.sh | 4 +- .../src/tests/graceful_shutdown.bats | 45 +++++++++++++++++++ 3 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 util/app-config/src/tests/graceful_shutdown.bats diff --git a/util/app-config/src/tests/ckb_run_replay.bats b/util/app-config/src/tests/ckb_run_replay.bats index dc9f87ef18..c6943eeb5b 100644 --- a/util/app-config/src/tests/ckb_run_replay.bats +++ b/util/app-config/src/tests/ckb_run_replay.bats @@ -24,7 +24,7 @@ function ckb_run { #@test run _ckb_run [ "$status" -eq 0 ] # assert_output --regexp "ckb_chain::chain.*block number:.*, hash:.*, size:.*, cycles:.*" - assert_output --regexp "ckb_bin all tokio tasks have been stopped" + assert_output --regexp "ckb_bin ckb shutdown" } function ckb_replay { #@test diff --git a/util/app-config/src/tests/cli_test.sh b/util/app-config/src/tests/cli_test.sh index 56e30dbab8..707b2b1dea 100755 --- a/util/app-config/src/tests/cli_test.sh +++ b/util/app-config/src/tests/cli_test.sh @@ -38,7 +38,7 @@ bash ${CKB_BATS_CORE_DIR}/bats-assert/load.bash cd ${CKB_BATS_TESTBED} -./ckb init --force && ./ckb import ckb_mainnet_4000.json +./ckb init --force && sed -i 's/filter = "info"/filter = "debug"/g' ckb.toml && ./ckb import ckb_mainnet_4000.json export PATH=${CKB_BATS_TESTBED}:/tmp/ckb_bats_bin/tmp_install/bin:${PATH} export BATS_LIB_PATH=${CKB_BATS_CORE_DIR} @@ -47,7 +47,7 @@ export TMP_DIR=${CKB_BATS_TESTBED}/tmp_dir mkdir ${TMP_DIR} for bats_cases in *.bats; do - bats --trace "$bats_cases" + bats "$bats_cases" ret=$? if [ "$ret" -ne "0" ]; then exit "$ret" diff --git a/util/app-config/src/tests/graceful_shutdown.bats b/util/app-config/src/tests/graceful_shutdown.bats new file mode 100644 index 0000000000..eb906c3949 --- /dev/null +++ b/util/app-config/src/tests/graceful_shutdown.bats @@ -0,0 +1,45 @@ +#!/usr/bin/env bats +bats_load_library 'bats-assert' +bats_load_library 'bats-support' + +_ckb_graceful_shutdown() { + ckb run -C ${CKB_DIRNAME} &> ${TMP_DIR}/ckb_run.log & + PID=$! + sleep 10 + kill ${PID} + + while kill -0 ${PID}; do + echo "waiting for ckb to exit" + sleep 1 + done + + tail -n 500 ${TMP_DIR}/ckb_run.log +} + +function ckb_graceful_shutdown { #@test + run _ckb_graceful_shutdown + + [ "$status" -eq 0 ] + assert_output --regexp "INFO ckb_bin::subcommand::run Trapped exit signal, exiting..." + assert_output --regexp "DEBUG ckb_stop_handler::stop_register received exit signal, broadcasting exit signal to all threads" + assert_output --regexp "DEBUG ckb_tx_pool::chunk_process TxPool received exit signal, exit now" + assert_output --regexp "DEBUG ckb_sync::types::header_map HeaderMap limit_memory received exit signal, exit now" + assert_output --regexp "DEBUG ckb_chain::chain ChainService received exit signal, exit now" + assert_output --regexp "DEBUG ckb_sync::synchronizer thread BlockDownload received exit signal, exit now" + assert_output --regexp "DEBUG ckb_network::network NetworkService receive exit signal, start shutdown..." + assert_output --regexp "INFO ckb_tx_pool::service TxPool is saving, please wait..." + assert_output --regexp "DEBUG ckb_tx_pool::service TxPool received exit signal, exit now" + assert_output --regexp "DEBUG ckb_block_filter::filter BlockFilter received exit signal, exit now" + assert_output --regexp "DEBUG ckb_network::services::dump_peer_store dump peer store before exit" + assert_output --regexp "DEBUG ckb_notify NotifyService received exit signal, exit now" + assert_output --regexp "DEBUG ckb_stop_handler::stop_register wait thread ChainService done" + assert_output --regexp "DEBUG ckb_stop_handler::stop_register wait thread BlockDownload done" + assert_output --regexp "DEBUG ckb_stop_handler::stop_register all ckb threads have been stopped" + assert_output --regexp "DEBUG ckb_bin waiting all tokio tasks done" + assert_output --regexp "INFO ckb_tx_pool::process TxPool save successfully" + assert_output --regexp "INFO ckb_bin ckb shutdown" +} + +teardown_file() { + rm -f ${TMP_DIR}/ckb_run.log +}