Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait tokio tasks finish before CKB process exit #3999

Merged
merged 11 commits into from
Jul 18, 2023
29 changes: 17 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ members = [
"util/occupied-capacity/macros",
"util/fixed-hash/macros",
"util/logger-service",
"util/stop-handler",
"util/runtime",
"util/stop-handler",
"util/metrics",
"util/metrics-service",
"util/fixed-hash",
Expand Down
3 changes: 1 addition & 2 deletions benches/benches/benchmarks/overall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::JsonBytes;
use ckb_launcher::SharedBuilder;
use ckb_network::{DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::Shared;
use ckb_store::ChainStore;
use ckb_types::{
Expand Down Expand Up @@ -77,7 +77,6 @@ fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
DefaultExitHandler::default(),
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
14 changes: 8 additions & 6 deletions block-filter/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ckb_async_runtime::tokio::{self, sync::oneshot, task::block_in_place};
use ckb_async_runtime::tokio::{self, task::block_in_place};
use ckb_logger::{debug, warn};
use ckb_shared::Shared;
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use ckb_store::{ChainDB, ChainStore};
use ckb_types::{
core::HeaderView,
Expand Down Expand Up @@ -43,10 +43,10 @@ impl BlockFilter {
}

/// start background single-threaded service to create block filter data
pub fn start(self) -> StopHandler<()> {
pub fn start(self) {
let notify_controller = self.shared.notify_controller().clone();
let async_handle = self.shared.async_handle().clone();
let (stop, mut stop_rx) = oneshot::channel::<()>();
let stop_rx: CancellationToken = new_tokio_exit_rx();
let filter_data_builder = self.clone();

let build_filter_data =
Expand All @@ -62,12 +62,14 @@ impl BlockFilter {
block_in_place(|| self.build_filter_data());
new_block_watcher.borrow_and_update();
}
_ = &mut stop_rx => break,
_ = stop_rx.cancelled() => {
debug!("BlockFilter received exit signal, exit now");
break
},
else => break,
}
}
});
StopHandler::new(SignalSender::Tokio(stop), None, NAME.to_string())
}

/// build block filter data to the latest block
Expand Down
40 changes: 11 additions & 29 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ckb_proposal_table::ProposalTable;
#[cfg(debug_assertions)]
use ckb_rust_unstable_port::IsSorted;
use ckb_shared::shared::Shared;
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction};
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{
Expand All @@ -22,7 +22,7 @@ use ckb_types::{
ResolvedTransaction,
},
hardfork::HardForks,
service::{Request, DEFAULT_CHANNEL_SIZE, SIGNAL_CHANNEL_SIZE},
service::{Request, DEFAULT_CHANNEL_SIZE},
BlockExt, BlockNumber, BlockView, Cycle, HeaderView,
},
packed::{Byte32, ProposalShortId},
Expand Down Expand Up @@ -50,26 +50,17 @@ type TruncateRequest = Request<Byte32, Result<(), Error>>;
pub struct ChainController {
process_block_sender: Sender<ProcessBlockRequest>,
truncate_sender: Sender<TruncateRequest>, // Used for testing only
stop: Option<StopHandler<()>>,
}

impl Drop for ChainController {
fn drop(&mut self) {
self.try_stop();
}
}

#[cfg_attr(feature = "mock", faux::methods)]
impl ChainController {
pub fn new(
process_block_sender: Sender<ProcessBlockRequest>,
truncate_sender: Sender<TruncateRequest>,
stop: StopHandler<()>,
) -> Self {
ChainController {
process_block_sender,
truncate_sender,
stop: Some(stop),
}
}
/// Inserts the block into database.
Expand Down Expand Up @@ -109,17 +100,10 @@ impl ChainController {
})
}

pub fn try_stop(&mut self) {
if let Some(ref mut stop) = self.stop {
stop.try_send(());
}
}

/// Since a non-owning reference does not count towards ownership,
/// it will not prevent the value stored in the allocation from being dropped
pub fn non_owning_clone(&self) -> Self {
ChainController {
stop: None,
truncate_sender: self.truncate_sender.clone(),
process_block_sender: self.process_block_sender.clone(),
}
Expand Down Expand Up @@ -245,7 +229,7 @@ impl ChainService {

/// start background single-threaded service with specified thread_name.
pub fn start<S: ToString>(mut self, thread_name: Option<S>) -> ChainController {
let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE);
let signal_receiver = new_crossbeam_exit_rx();
let (process_block_sender, process_block_receiver) = channel::bounded(DEFAULT_CHANNEL_SIZE);
let (truncate_sender, truncate_receiver) = channel::bounded(1);

Expand All @@ -256,12 +240,9 @@ impl ChainService {
}
let tx_control = self.shared.tx_pool_controller().clone();

let thread = thread_builder
let chain_jh = thread_builder
.spawn(move || loop {
select! {
recv(signal_receiver) -> _ => {
break;
},
recv(process_block_receiver) -> msg => match msg {
Ok(Request { responder, arguments: (block, verify) }) => {
let _ = tx_control.suspend_chunk_process();
Expand All @@ -283,17 +264,18 @@ impl ChainService {
error!("truncate_receiver closed");
break;
},
},
recv(signal_receiver) -> _ => {
debug!("ChainService received exit signal, exit now");
break;
}
}
})
.expect("Start ChainService failed");
let stop = StopHandler::new(
SignalSender::Crossbeam(signal_sender),
Some(thread),
"chain".to_string(),
);

ChainController::new(process_block_sender, truncate_sender, stop)
register_thread("ChainService", chain_jh);

ChainController::new(process_block_sender, truncate_sender)
}

fn make_fork_for_truncate(&self, target: &HeaderView, current_tip: &HeaderView) -> ForkChanges {
Expand Down
3 changes: 1 addition & 2 deletions chain/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_dao::DaoCalculator;
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_launcher::SharedBuilder;
use ckb_network::{DefaultExitHandler, Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::shared::Shared;
use ckb_store::ChainStore;
pub use ckb_test_chain_utils::MockStore;
Expand Down Expand Up @@ -314,7 +314,6 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
DefaultExitHandler::default(),
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
1 change: 1 addition & 0 deletions ckb-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rayon = "1.0"
sentry = { version = "0.26.0", optional = true }
is-terminal = "0.4.7"
fdlimit = "0.2.1"
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.112.0-pre" }

[features]
deadlock_detection = ["ckb-util/deadlock_detection"]
Expand Down
42 changes: 29 additions & 13 deletions ckb-bin/src/helper.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,48 @@
use ckb_logger::info;

use std::io::{stdin, stdout, Write};

#[cfg(not(feature = "deadlock_detection"))]
pub fn deadlock_detection() {}

#[cfg(feature = "deadlock_detection")]
pub fn deadlock_detection() {
use ckb_logger::warn;
use ckb_channel::select;
use ckb_logger::{debug, warn};
use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
use ckb_util::parking_lot::deadlock;
use std::{thread, time::Duration};

info!("deadlock_detection enable");
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
let dead_lock_jh = thread::spawn({
let ticker = ckb_channel::tick(Duration::from_secs(10));
let stop_rx = new_crossbeam_exit_rx();
move || loop {
select! {
recv(ticker) -> _ => {
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}

warn!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
warn!("Deadlock #{}", i);
for t in threads {
warn!("Thread Id {:#?}", t.thread_id());
warn!("{:#?}", t.backtrace());
}
}

warn!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
warn!("Deadlock #{}", i);
for t in threads {
warn!("Thread Id {:#?}", t.thread_id());
warn!("{:#?}", t.backtrace());
},
recv(stop_rx) -> _ =>{
debug!("deadlock_detection received exit signal, stopped");
return;
}
}
}
});
register_thread("dead_lock_detect", dead_lock_jh);
}

pub fn prompt(msg: &str) -> String {
Expand Down
Loading