diff --git a/Cargo.lock b/Cargo.lock index ed7cb7fa196..3d54178e535 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,6 +176,7 @@ dependencies = [ name = "batch-system" version = "0.1.0" dependencies = [ + "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/batch-system/Cargo.toml b/components/batch-system/Cargo.toml index a76b4bc5aeb..5d22c9961d5 100644 --- a/components/batch-system/Cargo.toml +++ b/components/batch-system/Cargo.toml @@ -8,10 +8,22 @@ crossbeam = "0.7" tikv_util = { path = "../tikv_util" } slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" } +derive_more = "0.15.0" [dev-dependencies] -derive_more = "0.15.0" +criterion = "0.3" [[test]] name = "tests" -path = "tests/cases/mod.rs" \ No newline at end of file +path = "tests/cases/mod.rs" +required-features = ["test-runner"] + +[[bench]] +name = "router" +harness = false +required-features = ["test-runner"] + +[[bench]] +name = "batch-system" +harness = false +required-features = ["test-runner"] \ No newline at end of file diff --git a/components/batch-system/benches/batch-system.rs b/components/batch-system/benches/batch-system.rs new file mode 100644 index 00000000000..0c317d7d89b --- /dev/null +++ b/components/batch-system/benches/batch-system.rs @@ -0,0 +1,142 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +#![feature(test)] + +extern crate test; + +use batch_system::test_runner::*; +use batch_system::*; +use criterion::*; +use std::sync::atomic::*; +use std::sync::Arc; + +fn end_hook(tx: &std::sync::mpsc::Sender<()>) -> Message { + let tx = tx.clone(); + Message::Callback(Box::new(move |_| { + tx.send(()).unwrap(); + })) +} + +/// Benches how it performs when many messages are sent to the bench system. +/// +/// A better router and lightweight batch scheduling can lead to better result. +fn bench_spawn_many(c: &mut Criterion) { + let (control_tx, control_fsm) = Runner::new(100000); + let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); + system.spawn("test".to_owned(), Builder::new()); + const ID_LIMIT: u64 = 32; + const MESSAGE_LIMIT: usize = 256; + for id in 0..ID_LIMIT { + let (normal_tx, normal_fsm) = Runner::new(100000); + let normal_box = BasicMailbox::new(normal_tx, normal_fsm); + router.register(id, normal_box); + } + + let (tx, rx) = std::sync::mpsc::channel(); + c.bench_function("spawn_many", |b| { + b.iter(|| { + for id in 0..ID_LIMIT { + for i in 0..MESSAGE_LIMIT { + router.send(id, Message::Loop(i)).unwrap(); + } + router.send(id, end_hook(&tx)).unwrap(); + } + for _ in 0..ID_LIMIT { + rx.recv().unwrap(); + } + }) + }); + system.shutdown(); +} + +/// Bench how it performs if two hot FSMs are shown up at the same time. +/// +/// A good scheduling algorithm should be able to spread the hot FSMs to +/// all available threads as soon as possible. +fn bench_imbalance(c: &mut Criterion) { + let (control_tx, control_fsm) = Runner::new(100000); + let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); + system.spawn("test".to_owned(), Builder::new()); + const ID_LIMIT: u64 = 10; + const MESSAGE_LIMIT: usize = 512; + for id in 0..ID_LIMIT { + let (normal_tx, normal_fsm) = Runner::new(100000); + let normal_box = BasicMailbox::new(normal_tx, normal_fsm); + router.register(id, normal_box); + } + + let (tx, rx) = std::sync::mpsc::channel(); + c.bench_function("imbalance", |b| { + b.iter(|| { + for i in 0..MESSAGE_LIMIT { + for id in 0..2 { + router.send(id, Message::Loop(i)).unwrap(); + } + } + for id in 0..2 { + router.send(id, end_hook(&tx)).unwrap(); + } + for _ in 0..2 { + rx.recv().unwrap(); + } + }) + }); + system.shutdown(); +} + +/// Bench how it performs when scheduling a lot of quick tasks during an long-polling +/// tasks. +/// +/// A good scheduling algorithm should not starve the quick tasks. +fn bench_fairness(c: &mut Criterion) { + let (control_tx, control_fsm) = Runner::new(100000); + let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); + system.spawn("test".to_owned(), Builder::new()); + for id in 0..10 { + let (normal_tx, normal_fsm) = Runner::new(100000); + let normal_box = BasicMailbox::new(normal_tx, normal_fsm); + router.register(id, normal_box); + } + + let (tx, _rx) = std::sync::mpsc::channel(); + let running = Arc::new(AtomicBool::new(true)); + let router1 = router.clone(); + let running1 = running.clone(); + let handle = std::thread::spawn(move || { + while running1.load(Ordering::SeqCst) { + // Using 4 to ensure all worker threads are busy spinning. + for id in 0..4 { + let _ = router1.send(id, Message::Loop(16)); + } + } + tx.send(()).unwrap(); + }); + + let (tx2, rx2) = std::sync::mpsc::channel(); + c.bench_function("fairness", |b| { + b.iter(|| { + for _ in 0..10 { + for id in 4..6 { + router.send(id, Message::Loop(10)).unwrap(); + } + } + for id in 4..6 { + router.send(id, end_hook(&tx2)).unwrap(); + } + for _ in 4..6 { + rx2.recv().unwrap(); + } + }) + }); + running.store(false, Ordering::SeqCst); + system.shutdown(); + let _ = handle.join(); +} + +criterion_group!(fair, bench_fairness); +criterion_group!( + name = load; + config = Criterion::default().sample_size(30); + targets = bench_imbalance, bench_spawn_many +); +criterion_main!(fair, load); diff --git a/components/batch-system/benches/router.rs b/components/batch-system/benches/router.rs index 2e2ab2e31d3..03c01e0dd58 100644 --- a/components/batch-system/benches/router.rs +++ b/components/batch-system/benches/router.rs @@ -1,85 +1,24 @@ -#![feature(test)] - -extern crate test; +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +use batch_system::test_runner::*; use batch_system::*; -use std::borrow::Cow; -use test::*; -use tikv_util::mpsc; - -pub type Message = (); - -pub struct Runner { - is_stopped: bool, - recv: mpsc::Receiver, - mailbox: Option>, -} - -impl Fsm for Runner { - type Message = Message; - - fn is_stopped(&self) -> bool { - self.is_stopped - } - - fn set_mailbox(&mut self, mailbox: Cow<'_, BasicMailbox>) { - self.mailbox = Some(mailbox.into_owned()); - } - - fn take_mailbox(&mut self) -> Option> { - self.mailbox.take() - } -} - -pub fn new_runner(cap: usize) -> (mpsc::LooseBoundedSender, Box) { - let (tx, rx) = mpsc::loose_bounded(cap); - let fsm = Runner { - is_stopped: false, - recv: rx, - mailbox: None, - }; - (tx, Box::new(fsm)) -} +use criterion::*; -pub struct Handler; - -impl PollHandler for Handler { - fn begin(&mut self, _batch_size: usize) {} - - fn handle_control(&mut self, control: &mut Runner) -> Option { - while let Ok(_) = control.recv.try_recv() {} - Some(0) - } - - fn handle_normal(&mut self, normal: &mut Runner) -> Option { - while let Ok(_) = normal.recv.try_recv() {} - Some(0) - } - - fn end(&mut self, _normals: &mut [Box]) {} -} - -pub struct Builder; - -impl HandlerBuilder for Builder { - type Handler = Handler; - - fn build(&mut self) -> Handler { - Handler - } -} - -#[bench] -fn bench_send(b: &mut Bencher) { - let (control_tx, control_fsm) = new_runner(100000); +fn bench_send(c: &mut Criterion) { + let (control_tx, control_fsm) = Runner::new(100000); let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); - system.spawn("test".to_owned(), Builder); - let (normal_tx, normal_fsm) = new_runner(100000); + system.spawn("test".to_owned(), Builder::new()); + let (normal_tx, normal_fsm) = Runner::new(100000); let normal_box = BasicMailbox::new(normal_tx, normal_fsm); router.register(1, normal_box); - b.iter(|| { - router.send(1, ()).unwrap(); + c.bench_function("router::send", |b| { + b.iter(|| { + router.send(1, Message::Loop(0)).unwrap(); + }) }); system.shutdown(); } + +criterion_group!(benches, bench_send); +criterion_main!(benches); diff --git a/components/batch-system/src/lib.rs b/components/batch-system/src/lib.rs index 01bbd9a70e2..ff784fa7ebc 100644 --- a/components/batch-system/src/lib.rs +++ b/components/batch-system/src/lib.rs @@ -15,12 +15,18 @@ extern crate slog; extern crate slog_global; #[macro_use] extern crate tikv_util; +#[cfg(feature = "test-runner")] +#[macro_use] +extern crate derive_more; mod batch; mod fsm; mod mailbox; mod router; +#[cfg(feature = "test-runner")] +pub mod test_runner; + pub use self::batch::{create_system, BatchRouter, BatchSystem, HandlerBuilder, PollHandler}; pub use self::fsm::Fsm; pub use self::mailbox::{BasicMailbox, Mailbox}; diff --git a/components/batch-system/src/test_runner.rs b/components/batch-system/src/test_runner.rs new file mode 100644 index 00000000000..d6a7609f9ab --- /dev/null +++ b/components/batch-system/src/test_runner.rs @@ -0,0 +1,133 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +//! A sample Handler for test and micro-benchmark purpose. + +use crate::*; +use std::borrow::Cow; +use std::sync::{Arc, Mutex}; +use tikv_util::mpsc; + +/// Message `Runner` can accepts. +pub enum Message { + /// `Runner` will do simple calculation for the given times. + Loop(usize), + /// `Runner` will call the callback directly. + Callback(Box), +} + +/// A simple runner used for benchmarking only. +pub struct Runner { + is_stopped: bool, + recv: mpsc::Receiver, + mailbox: Option>, + pub sender: Option>, + /// Result of the calculation triggered by `Message::Loop`. + /// Stores it inside `Runner` to avoid accidental optimization. + res: usize, +} + +impl Fsm for Runner { + type Message = Message; + + fn is_stopped(&self) -> bool { + self.is_stopped + } + + fn set_mailbox(&mut self, mailbox: Cow<'_, BasicMailbox>) { + self.mailbox = Some(mailbox.into_owned()); + } + + fn take_mailbox(&mut self) -> Option> { + self.mailbox.take() + } +} + +impl Runner { + pub fn new(cap: usize) -> (mpsc::LooseBoundedSender, Box) { + let (tx, rx) = mpsc::loose_bounded(cap); + let fsm = Box::new(Runner { + is_stopped: false, + recv: rx, + mailbox: None, + sender: None, + res: 0, + }); + (tx, fsm) + } +} + +#[derive(Add, PartialEq, Debug, Default, AddAssign, Clone, Copy)] +pub struct HandleMetrics { + pub begin: usize, + pub control: usize, + pub normal: usize, +} + +pub struct Handler { + local: HandleMetrics, + metrics: Arc>, +} + +impl Handler { + fn handle(&mut self, r: &mut Runner) -> Option { + for _ in 0..16 { + match r.recv.try_recv() { + Ok(Message::Loop(count)) => { + // Some calculation to represent a CPU consuming work + for _ in 0..count { + r.res *= count; + r.res %= count + 1; + } + } + Ok(Message::Callback(cb)) => cb(r), + Err(_) => break, + } + } + Some(0) + } +} + +impl PollHandler for Handler { + fn begin(&mut self, _batch_size: usize) { + self.local.begin += 1; + } + + fn handle_control(&mut self, control: &mut Runner) -> Option { + self.local.control += 1; + self.handle(control) + } + + fn handle_normal(&mut self, normal: &mut Runner) -> Option { + self.local.normal += 1; + self.handle(normal) + } + + fn end(&mut self, _normals: &mut [Box]) { + let mut c = self.metrics.lock().unwrap(); + *c += self.local; + self.local = HandleMetrics::default(); + } +} + +pub struct Builder { + pub metrics: Arc>, +} + +impl Builder { + pub fn new() -> Builder { + Builder { + metrics: Arc::default(), + } + } +} + +impl HandlerBuilder for Builder { + type Handler = Handler; + + fn build(&mut self) -> Handler { + Handler { + local: HandleMetrics::default(), + metrics: self.metrics.clone(), + } + } +} diff --git a/components/batch-system/tests/cases/batch.rs b/components/batch-system/tests/cases/batch.rs index 5e456de9574..71569640f62 100644 --- a/components/batch-system/tests/cases/batch.rs +++ b/components/batch-system/tests/cases/batch.rs @@ -1,13 +1,13 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use crate::*; +use batch_system::test_runner::*; use batch_system::*; use std::time::Duration; use tikv_util::mpsc; #[test] fn test_batch() { - let (control_tx, control_fsm) = new_runner(10); + let (control_tx, control_fsm) = Runner::new(10); let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); let builder = Builder::new(); let metrics = builder.metrics.clone(); @@ -18,8 +18,8 @@ fn test_batch() { let tx_ = tx.clone(); let r = router.clone(); router - .send_control(Some(Box::new(move |_: &mut Runner| { - let (tx, runner) = new_runner(10); + .send_control(Message::Callback(Box::new(move |_: &mut Runner| { + let (tx, runner) = Runner::new(10); let mailbox = BasicMailbox::new(tx, runner); r.register(1, mailbox); tx_.send(1).unwrap(); @@ -29,7 +29,7 @@ fn test_batch() { router .send( 1, - Some(Box::new(move |_: &mut Runner| { + Message::Callback(Box::new(move |_: &mut Runner| { tx.send(2).unwrap(); })), ) diff --git a/components/batch-system/tests/cases/mod.rs b/components/batch-system/tests/cases/mod.rs index 5dbc31c2bac..22eda9f01ab 100644 --- a/components/batch-system/tests/cases/mod.rs +++ b/components/batch-system/tests/cases/mod.rs @@ -1,115 +1,4 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -#[macro_use] -extern crate derive_more; - mod batch; mod router; - -use batch_system::*; -use std::borrow::Cow; -use std::sync::{Arc, Mutex}; -use tikv_util::mpsc; - -pub type Message = Option>; - -pub struct Runner { - is_stopped: bool, - recv: mpsc::Receiver, - mailbox: Option>, - pub sender: Option>, -} - -impl Fsm for Runner { - type Message = Message; - - fn is_stopped(&self) -> bool { - self.is_stopped - } - - fn set_mailbox(&mut self, mailbox: Cow<'_, BasicMailbox>) { - self.mailbox = Some(mailbox.into_owned()); - } - - fn take_mailbox(&mut self) -> Option> { - self.mailbox.take() - } -} - -pub fn new_runner(cap: usize) -> (mpsc::LooseBoundedSender, Box) { - let (tx, rx) = mpsc::loose_bounded(cap); - let fsm = Runner { - is_stopped: false, - recv: rx, - mailbox: None, - sender: None, - }; - (tx, Box::new(fsm)) -} - -#[derive(Add, PartialEq, Debug, Default, AddAssign, Clone, Copy)] -struct HandleMetrics { - begin: usize, - control: usize, - normal: usize, -} - -pub struct Handler { - local: HandleMetrics, - metrics: Arc>, -} - -impl PollHandler for Handler { - fn begin(&mut self, _batch_size: usize) { - self.local.begin += 1; - } - - fn handle_control(&mut self, control: &mut Runner) -> Option { - self.local.control += 1; - while let Ok(r) = control.recv.try_recv() { - if let Some(r) = r { - r(control); - } - } - Some(0) - } - - fn handle_normal(&mut self, normal: &mut Runner) -> Option { - self.local.normal += 1; - while let Ok(r) = normal.recv.try_recv() { - if let Some(r) = r { - r(normal); - } - } - Some(0) - } - - fn end(&mut self, _normals: &mut [Box]) { - let mut c = self.metrics.lock().unwrap(); - *c += self.local; - self.local = HandleMetrics::default(); - } -} - -pub struct Builder { - metrics: Arc>, -} - -impl Builder { - pub fn new() -> Builder { - Builder { - metrics: Arc::default(), - } - } -} - -impl HandlerBuilder for Builder { - type Handler = Handler; - - fn build(&mut self) -> Handler { - Handler { - local: HandleMetrics::default(), - metrics: self.metrics.clone(), - } - } -} diff --git a/components/batch-system/tests/cases/router.rs b/components/batch-system/tests/cases/router.rs index ac2654ec7b8..41080f415e2 100644 --- a/components/batch-system/tests/cases/router.rs +++ b/components/batch-system/tests/cases/router.rs @@ -1,6 +1,6 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use crate::*; +use batch_system::test_runner::*; use batch_system::*; use crossbeam::channel::*; use std::sync::atomic::*; @@ -10,22 +10,22 @@ use tikv_util::mpsc; fn counter_closure(counter: &Arc) -> Message { let c = counter.clone(); - Some(Box::new(move |_: &mut Runner| { + Message::Callback(Box::new(move |_: &mut Runner| { c.fetch_add(1, Ordering::SeqCst); })) } fn noop() -> Message { - None + Message::Callback(Box::new(|_| ())) } fn unreachable() -> Message { - Some(Box::new(|_: &mut Runner| unreachable!())) + Message::Callback(Box::new(|_: &mut Runner| unreachable!())) } #[test] fn test_basic() { - let (control_tx, mut control_fsm) = new_runner(10); + let (control_tx, mut control_fsm) = Runner::new(10); let (control_drop_tx, control_drop_rx) = mpsc::unbounded(); control_fsm.sender = Some(control_drop_tx); let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm); @@ -47,8 +47,8 @@ fn test_basic() { let router_ = router.clone(); // Control mailbox should be connected. router - .send_control(Some(Box::new(move |_: &mut Runner| { - let (sender, mut runner) = new_runner(10); + .send_control(Message::Callback(Box::new(move |_: &mut Runner| { + let (sender, mut runner) = Runner::new(10); let (tx1, rx1) = mpsc::unbounded(); runner.sender = Some(tx1); let mailbox = BasicMailbox::new(sender, runner); @@ -67,7 +67,7 @@ fn test_basic() { router .send( 1, - Some(Box::new(move |_: &mut Runner| { + Message::Callback(Box::new(move |_: &mut Runner| { rx.recv_timeout(Duration::from_secs(100)).unwrap(); })), ) @@ -88,7 +88,7 @@ fn test_basic() { router .force_send( 1, - Some(Box::new(move |_: &mut Runner| { + Message::Callback(Box::new(move |_: &mut Runner| { tx.send(1).unwrap(); })), )