Skip to content

Commit

Permalink
wip test
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <execvy@gmail.com>
  • Loading branch information
eval-exec committed Jun 2, 2023
1 parent c6fee18 commit 8c09ab7
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 38 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions util/channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Reexports `crossbeam_channel` to uniform the dependency version.
pub use crossbeam_channel::{
bounded, select, unbounded, Receiver, RecvError, RecvTimeoutError, Select, SendError, Sender,
TrySendError,
after, bounded, select, tick, unbounded, Receiver, RecvError, RecvTimeoutError, Select,
SendError, Sender, TrySendError,
};

pub mod oneshot {
Expand Down
2 changes: 1 addition & 1 deletion util/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ repository = "https://github.com/nervosnetwork/ckb"

[dependencies]
tokio = { version = "1", features = ["full"] }
ckb-stop-handler = { path = "../stop-handler", version = "= 0.111.0-pre" }
#ckb-stop-handler = { path = "../stop-handler", version = "= 0.111.0-pre" }
ckb-logger = { path = "../logger", version = "= 0.111.0-pre" }
ckb-spawn = { path = "../spawn", version = "= 0.111.0-pre" }
42 changes: 21 additions & 21 deletions util/runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Utilities for tokio runtime.
use ckb_spawn::Spawn;
use ckb_stop_handler::{SignalSender, StopHandler};
// use ckb_stop_handler::{SignalSender, StopHandler};
use core::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
Expand Down Expand Up @@ -108,26 +108,26 @@ pub fn new_global_runtime() -> (Handle, Runtime) {
(Handle { inner: handle }, runtime)
}

/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle,
/// NOTICE: This is only used in testing
pub fn new_background_runtime() -> (Handle, StopHandler<()>) {
let runtime = new_runtime();
let handle = runtime.handle().clone();

let (tx, rx) = oneshot::channel();
let thread = thread::Builder::new()
.name("GlobalRtBuilder".to_string())
.spawn(move || {
let ret = runtime.block_on(rx);
ckb_logger::debug!("global runtime finish {:?}", ret);
})
.expect("tokio runtime started");

(
Handle { inner: handle },
StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()),
)
}
// /// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle,
// /// NOTICE: This is only used in testing
// pub fn new_background_runtime() -> (Handle, StopHandler<()>) {
// let runtime = new_runtime();
// let handle = runtime.handle().clone();
//
// let (tx, rx) = oneshot::channel();
// let thread = thread::Builder::new()
// .name("GlobalRtBuilder".to_string())
// .spawn(move || {
// let ret = runtime.block_on(rx);
// ckb_logger::debug!("global runtime finish {:?}", ret);
// })
// .expect("tokio runtime started");
//
// (
// Handle { inner: handle },
// StopHandler::new(SignalSender::Tokio(tx), Some(thread), "GT".to_string()),
// )
// }

impl Spawn for Handle {
fn spawn_task<F>(&self, future: F)
Expand Down
7 changes: 7 additions & 0 deletions util/stop-handler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ tokio = { version = "1", features = ["sync", "rt-multi-thread"] }
ckb-channel = { path = "../channel", version = "= 0.111.0-pre" }
ckb-util = { path = "..", version = "= 0.111.0-pre" }
once_cell = "1.8.0"
ckb-async-runtime = { path = "../runtime", version = "= 0.111.0-pre" }


[dev-dependencies]
ctrlc = { version = "3.1", features = ["termination"] }
libc = "0.2"
rand = "0.8.5"
3 changes: 3 additions & 0 deletions util/stop-handler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pub use stop_register::{
broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread,
register_tokio, wait_all_ckb_services_exit,
};

#[cfg(test)]
mod tests;
35 changes: 22 additions & 13 deletions util/stop-handler/src/stop_register.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
use ckb_async_runtime::Handle;
use ckb_util::Mutex;

struct CkbServiceHandles {
thread_handles: Vec<std::thread::JoinHandle<()>>,
tokio_handles: Vec<tokio::task::JoinHandle<()>>,
}

pub fn wait_all_ckb_services_exit() {
pub fn wait_all_ckb_services_exit(handle: Handle) {
let mut handles = CKB_HANDLES.lock();
for handle in handles.thread_handles.drain(..) {
match handle.join() {
Ok(_) => {}
for join_handle in handles.thread_handles.drain(..) {
match join_handle.join() {
Ok(_) => {
println!("wait a thread done");
}
Err(e) => {
todo!("log error")
println!("wait thread: ERROR: {:?}", e)
}
}
}
for handle in handles.tokio_handles.drain(..) {
match tokio::runtime::Handle::current().block_on(handle) {
Ok(_) => {}
Err(e) => {
todo!("log error")
for join_handle in handles.tokio_handles.drain(..) {
handle.block_on(async move {
match join_handle.await {
Ok(_) => {
println!("wait a tokio task done");
}
Err(e) => {
println!("wait tokio: ERROR: {:?}", e)
}
}
}
});
}
}

Expand Down Expand Up @@ -55,10 +62,12 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
}

pub fn broadcast_exit_signals() {
TOKIO_EXIT.0.send_modify(|x| *x = true);
TOKIO_EXIT.0.send(true).expect("send tokio exit signal");
CROSSBEAM_EXIT_SENDERS.lock().iter().for_each(|tx| {
if let Err(e) = tx.try_send(()) {
todo!("log error")
println!("broadcast thread: ERROR: {:?}", e)
} else {
println!("send a crossbeam exit signal");
}
});
}
Expand Down
98 changes: 98 additions & 0 deletions util/stop-handler/src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::{
broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread,
register_tokio, wait_all_ckb_services_exit,
};
use ckb_async_runtime::{new_global_runtime, Handle};
use ckb_channel::select;
use rand::Rng;
use std::time::Duration;

fn send_ctrlc_later(duration: Duration) {
std::thread::spawn(move || {
std::thread::sleep(duration);
// send SIGINT to myself
unsafe {
libc::raise(libc::SIGINT);
println!("[ $$ sent SIGINT to myself $$ ]");
}
});
}

fn start_many_threads() {
for i in 0..5 {
let join = std::thread::spawn(move || {
let ticker = ckb_channel::tick(Duration::from_millis(500));
let deadline = ckb_channel::after(Duration::from_millis(
(rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64,
));

let stop = new_crossbeam_exit_rx();

loop {
select! {
recv(ticker) -> _ => {
println!("thread {} received tick signal", i);
},
recv(stop) -> _ => {
println!("thread {} received crossbeam exit signal", i);
return;
},
recv(deadline) -> _ =>{
println!("thread {} finish its job", i);
return
}
}
}
});
register_thread(join);
}
}

fn start_many_tokio_tasks(handle: Handle) {
for i in 0..5 {
let mut stop = new_tokio_exit_rx();

let join = handle.spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));

let duration =
Duration::from_millis((rand::thread_rng().gen_range(1.0..5.0) * 1000.0) as u64);
let deadline = tokio::time::sleep(duration);
tokio::pin!(deadline);

loop {
tokio::select! {
_ = &mut deadline =>{
println!("tokio task {} finish its job", i);
break;
}
_ = interval.tick()=> {
println!("tokio task {} received tick signal", i);
},
_ = stop.changed() => {
println!("tokio task {} receive exit signal", i);
break
},
else => break,
}
}
});
register_tokio(join);
}
}

#[test]
fn basic() {
let (handle, _runtime) = new_global_runtime();

ctrlc::set_handler(move || {
broadcast_exit_signals();
});

send_ctrlc_later(Duration::from_secs(3));

start_many_threads();
start_many_tokio_tasks(handle.clone());

wait_all_ckb_services_exit(handle);
}

0 comments on commit 8c09ab7

Please sign in to comment.