From 8c09ab7ce3e62b167298aef6fd6765007712c850 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Fri, 2 Jun 2023 13:02:26 +0800 Subject: [PATCH] wip test Signed-off-by: Eval EXEC --- Cargo.lock | 5 +- util/channel/src/lib.rs | 4 +- util/runtime/Cargo.toml | 2 +- util/runtime/src/lib.rs | 42 +++++------ util/stop-handler/Cargo.toml | 7 ++ util/stop-handler/src/lib.rs | 3 + util/stop-handler/src/stop_register.rs | 35 +++++---- util/stop-handler/src/tests.rs | 98 ++++++++++++++++++++++++++ 8 files changed, 158 insertions(+), 38 deletions(-) create mode 100644 util/stop-handler/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 2a8d7f4f4fa..292e138f1d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,7 +437,6 @@ version = "0.111.0-pre" dependencies = [ "ckb-logger", "ckb-spawn", - "ckb-stop-handler", "tokio", ] @@ -1286,11 +1285,15 @@ version = "0.111.0-pre" name = "ckb-stop-handler" version = "0.111.0-pre" dependencies = [ + "ckb-async-runtime", "ckb-channel", "ckb-logger", "ckb-util", + "ctrlc", + "libc", "once_cell", "parking_lot 0.12.1", + "rand 0.8.5", "tokio", ] diff --git a/util/channel/src/lib.rs b/util/channel/src/lib.rs index 90755a11f36..a250f5f1047 100644 --- a/util/channel/src/lib.rs +++ b/util/channel/src/lib.rs @@ -1,7 +1,7 @@ //! Reexports `crossbeam_channel` to uniform the dependency version. pub use crossbeam_channel::{ - bounded, select, unbounded, Receiver, RecvError, RecvTimeoutError, Select, SendError, Sender, - TrySendError, + after, bounded, select, tick, unbounded, Receiver, RecvError, RecvTimeoutError, Select, + SendError, Sender, TrySendError, }; pub mod oneshot { diff --git a/util/runtime/Cargo.toml b/util/runtime/Cargo.toml index 5d8d463738d..82ae7267c6c 100644 --- a/util/runtime/Cargo.toml +++ b/util/runtime/Cargo.toml @@ -10,6 +10,6 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] tokio = { version = "1", features = ["full"] } -ckb-stop-handler = { path = "../stop-handler", version = "= 0.111.0-pre" } +#ckb-stop-handler = { path = "../stop-handler", version = "= 0.111.0-pre" } ckb-logger = { path = "../logger", version = "= 0.111.0-pre" } ckb-spawn = { path = "../spawn", version = "= 0.111.0-pre" } diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index 6984ded8e42..e10edb71adc 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -1,7 +1,7 @@ //! Utilities for tokio runtime. use ckb_spawn::Spawn; -use ckb_stop_handler::{SignalSender, StopHandler}; +// use ckb_stop_handler::{SignalSender, StopHandler}; use core::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; use std::thread; @@ -108,26 +108,26 @@ pub fn new_global_runtime() -> (Handle, Runtime) { (Handle { inner: handle }, runtime) } -/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, -/// NOTICE: This is only used in testing -pub fn new_background_runtime() -> (Handle, StopHandler<()>) { - let runtime = new_runtime(); - let handle = runtime.handle().clone(); - - let (tx, rx) = oneshot::channel(); - let thread = thread::Builder::new() - .name("GlobalRtBuilder".to_string()) - .spawn(move || { - let ret = runtime.block_on(rx); - ckb_logger::debug!("global runtime finish {:?}", ret); - }) - .expect("tokio runtime started"); - - ( - Handle { inner: handle }, - StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()), - ) -} +// /// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, +// /// NOTICE: This is only used in testing +// pub fn new_background_runtime() -> (Handle, StopHandler<()>) { +// let runtime = new_runtime(); +// let handle = runtime.handle().clone(); +// +// let (tx, rx) = oneshot::channel(); +// let thread = thread::Builder::new() +// .name("GlobalRtBuilder".to_string()) +// .spawn(move || { +// let ret = runtime.block_on(rx); +// ckb_logger::debug!("global runtime finish {:?}", ret); +// }) +// .expect("tokio runtime started"); +// +// ( +// Handle { inner: handle }, +// StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()), +// ) +// } impl Spawn for Handle { fn spawn_task(&self, future: F) diff --git a/util/stop-handler/Cargo.toml b/util/stop-handler/Cargo.toml index 1e0c15d94e7..6509dd31cf0 100644 --- a/util/stop-handler/Cargo.toml +++ b/util/stop-handler/Cargo.toml @@ -15,3 +15,10 @@ tokio = { version = "1", features = ["sync", "rt-multi-thread"] } ckb-channel = { path = "../channel", version = "= 0.111.0-pre" } ckb-util = { path = "..", version = "= 0.111.0-pre" } once_cell = "1.8.0" +ckb-async-runtime = { path = "../runtime", version = "= 0.111.0-pre" } + + +[dev-dependencies] +ctrlc = { version = "3.1", features = ["termination"] } +libc = "0.2" +rand = "0.8.5" diff --git a/util/stop-handler/src/lib.rs b/util/stop-handler/src/lib.rs index 387ddd47ae4..abdb7728754 100644 --- a/util/stop-handler/src/lib.rs +++ b/util/stop-handler/src/lib.rs @@ -7,3 +7,6 @@ pub use stop_register::{ broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, register_tokio, wait_all_ckb_services_exit, }; + +#[cfg(test)] +mod tests; diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index 06ec21546ca..ca10171c1b9 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -1,3 +1,4 @@ +use ckb_async_runtime::Handle; use ckb_util::Mutex; struct CkbServiceHandles { @@ -5,23 +6,29 @@ struct CkbServiceHandles { tokio_handles: Vec>, } -pub fn wait_all_ckb_services_exit() { +pub fn wait_all_ckb_services_exit(handle: Handle) { let mut handles = CKB_HANDLES.lock(); - for handle in handles.thread_handles.drain(..) { - match handle.join() { - Ok(_) => {} + for join_handle in handles.thread_handles.drain(..) { + match join_handle.join() { + Ok(_) => { + println!("wait a thread done"); + } Err(e) => { - todo!("log error") + println!("wait thread: ERROR: {:?}", e) } } } - for handle in handles.tokio_handles.drain(..) { - match tokio::runtime::Handle::current().block_on(handle) { - Ok(_) => {} - Err(e) => { - todo!("log error") + for join_handle in handles.tokio_handles.drain(..) { + handle.block_on(async move { + match join_handle.await { + Ok(_) => { + println!("wait a tokio task done"); + } + Err(e) => { + println!("wait tokio: ERROR: {:?}", e) + } } - } + }); } } @@ -55,10 +62,12 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> { } pub fn broadcast_exit_signals() { - TOKIO_EXIT.0.send_modify(|x| *x = true); + TOKIO_EXIT.0.send(true).expect("send tokio exit signal"); CROSSBEAM_EXIT_SENDERS.lock().iter().for_each(|tx| { if let Err(e) = tx.try_send(()) { - todo!("log error") + println!("broadcast thread: ERROR: {:?}", e) + } else { + println!("send a crossbeam exit signal"); } }); } diff --git a/util/stop-handler/src/tests.rs b/util/stop-handler/src/tests.rs new file mode 100644 index 00000000000..0bb8b468503 --- /dev/null +++ b/util/stop-handler/src/tests.rs @@ -0,0 +1,98 @@ +use crate::{ + broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, + register_tokio, wait_all_ckb_services_exit, +}; +use ckb_async_runtime::{new_global_runtime, Handle}; +use ckb_channel::select; +use rand::Rng; +use std::time::Duration; + +fn send_ctrlc_later(duration: Duration) { + std::thread::spawn(move || { + std::thread::sleep(duration); + // send SIGINT to myself + unsafe { + libc::raise(libc::SIGINT); + println!("[ $$ sent SIGINT to myself $$ ]"); + } + }); +} + +fn start_many_threads() { + for i in 0..5 { + let join = std::thread::spawn(move || { + let ticker = ckb_channel::tick(Duration::from_millis(500)); + let deadline = ckb_channel::after(Duration::from_millis( + (rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64, + )); + + let stop = new_crossbeam_exit_rx(); + + loop { + select! { + recv(ticker) -> _ => { + println!("thread {} received tick signal", i); + }, + recv(stop) -> _ => { + println!("thread {} received crossbeam exit signal", i); + return; + }, + recv(deadline) -> _ =>{ + println!("thread {} finish its job", i); + return + } + } + } + }); + register_thread(join); + } +} + +fn start_many_tokio_tasks(handle: Handle) { + for i in 0..5 { + let mut stop = new_tokio_exit_rx(); + + let join = handle.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(500)); + + let duration = + Duration::from_millis((rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64); + let deadline = tokio::time::sleep(duration); + tokio::pin!(deadline); + + loop { + tokio::select! { + _ = &mut deadline =>{ + println!("tokio task {} finish its job", i); + break; + } + _ = interval.tick()=> { + println!("tokio task {} received tick signal", i); + }, + _ = stop.changed() => { + println!("tokio task {} receive exit signal", i); + break + }, + else => break, + } + } + }); + register_tokio(join); + } +} + +#[test] +fn basic() { + let (handle, _runtime) = new_global_runtime(); + + ctrlc::set_handler(move || { + broadcast_exit_signals(); + }); + + send_ctrlc_later(Duration::from_secs(3)); + + start_many_threads(); + start_many_tokio_tasks(handle.clone()); + + wait_all_ckb_services_exit(handle); +}