Skip to content

Commit

Permalink
batch-system: add benchmark (tikv#6477)
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Feb 8, 2020
1 parent c6c8b19 commit ca57531
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 202 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions components/batch-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,22 @@ crossbeam = "0.7"
tikv_util = { path = "../tikv_util" }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" }
derive_more = "0.15.0"

[dev-dependencies]
derive_more = "0.15.0"
criterion = "0.3"

[[test]]
name = "tests"
path = "tests/cases/mod.rs"
path = "tests/cases/mod.rs"
required-features = ["test-runner"]

[[bench]]
name = "router"
harness = false
required-features = ["test-runner"]

[[bench]]
name = "batch-system"
harness = false
required-features = ["test-runner"]
142 changes: 142 additions & 0 deletions components/batch-system/benches/batch-system.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(test)]

extern crate test;

use batch_system::test_runner::*;
use batch_system::*;
use criterion::*;
use std::sync::atomic::*;
use std::sync::Arc;

fn end_hook(tx: &std::sync::mpsc::Sender<()>) -> Message {
let tx = tx.clone();
Message::Callback(Box::new(move |_| {
tx.send(()).unwrap();
}))
}

/// Benches how it performs when many messages are sent to the bench system.
///
/// A better router and lightweight batch scheduling can lead to better result.
fn bench_spawn_many(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
const ID_LIMIT: u64 = 32;
const MESSAGE_LIMIT: usize = 256;
for id in 0..ID_LIMIT {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, rx) = std::sync::mpsc::channel();
c.bench_function("spawn_many", |b| {
b.iter(|| {
for id in 0..ID_LIMIT {
for i in 0..MESSAGE_LIMIT {
router.send(id, Message::Loop(i)).unwrap();
}
router.send(id, end_hook(&tx)).unwrap();
}
for _ in 0..ID_LIMIT {
rx.recv().unwrap();
}
})
});
system.shutdown();
}

/// Bench how it performs if two hot FSMs are shown up at the same time.
///
/// A good scheduling algorithm should be able to spread the hot FSMs to
/// all available threads as soon as possible.
fn bench_imbalance(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
const ID_LIMIT: u64 = 10;
const MESSAGE_LIMIT: usize = 512;
for id in 0..ID_LIMIT {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, rx) = std::sync::mpsc::channel();
c.bench_function("imbalance", |b| {
b.iter(|| {
for i in 0..MESSAGE_LIMIT {
for id in 0..2 {
router.send(id, Message::Loop(i)).unwrap();
}
}
for id in 0..2 {
router.send(id, end_hook(&tx)).unwrap();
}
for _ in 0..2 {
rx.recv().unwrap();
}
})
});
system.shutdown();
}

/// Bench how it performs when scheduling a lot of quick tasks during an long-polling
/// tasks.
///
/// A good scheduling algorithm should not starve the quick tasks.
fn bench_fairness(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
for id in 0..10 {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, _rx) = std::sync::mpsc::channel();
let running = Arc::new(AtomicBool::new(true));
let router1 = router.clone();
let running1 = running.clone();
let handle = std::thread::spawn(move || {
while running1.load(Ordering::SeqCst) {
// Using 4 to ensure all worker threads are busy spinning.
for id in 0..4 {
let _ = router1.send(id, Message::Loop(16));
}
}
tx.send(()).unwrap();
});

let (tx2, rx2) = std::sync::mpsc::channel();
c.bench_function("fairness", |b| {
b.iter(|| {
for _ in 0..10 {
for id in 4..6 {
router.send(id, Message::Loop(10)).unwrap();
}
}
for id in 4..6 {
router.send(id, end_hook(&tx2)).unwrap();
}
for _ in 4..6 {
rx2.recv().unwrap();
}
})
});
running.store(false, Ordering::SeqCst);
system.shutdown();
let _ = handle.join();
}

criterion_group!(fair, bench_fairness);
criterion_group!(
name = load;
config = Criterion::default().sample_size(30);
targets = bench_imbalance, bench_spawn_many
);
criterion_main!(fair, load);
89 changes: 14 additions & 75 deletions components/batch-system/benches/router.rs
Original file line number Diff line number Diff line change
@@ -1,85 +1,24 @@
#![feature(test)]

extern crate test;
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use batch_system::test_runner::*;
use batch_system::*;
use std::borrow::Cow;
use test::*;
use tikv_util::mpsc;

pub type Message = ();

pub struct Runner {
is_stopped: bool,
recv: mpsc::Receiver<Message>,
mailbox: Option<BasicMailbox<Runner>>,
}

impl Fsm for Runner {
type Message = Message;

fn is_stopped(&self) -> bool {
self.is_stopped
}

fn set_mailbox(&mut self, mailbox: Cow<'_, BasicMailbox<Self>>) {
self.mailbox = Some(mailbox.into_owned());
}

fn take_mailbox(&mut self) -> Option<BasicMailbox<Self>> {
self.mailbox.take()
}
}

pub fn new_runner(cap: usize) -> (mpsc::LooseBoundedSender<Message>, Box<Runner>) {
let (tx, rx) = mpsc::loose_bounded(cap);
let fsm = Runner {
is_stopped: false,
recv: rx,
mailbox: None,
};
(tx, Box::new(fsm))
}
use criterion::*;

pub struct Handler;

impl PollHandler<Runner, Runner> for Handler {
fn begin(&mut self, _batch_size: usize) {}

fn handle_control(&mut self, control: &mut Runner) -> Option<usize> {
while let Ok(_) = control.recv.try_recv() {}
Some(0)
}

fn handle_normal(&mut self, normal: &mut Runner) -> Option<usize> {
while let Ok(_) = normal.recv.try_recv() {}
Some(0)
}

fn end(&mut self, _normals: &mut [Box<Runner>]) {}
}

pub struct Builder;

impl HandlerBuilder<Runner, Runner> for Builder {
type Handler = Handler;

fn build(&mut self) -> Handler {
Handler
}
}

#[bench]
fn bench_send(b: &mut Bencher) {
let (control_tx, control_fsm) = new_runner(100000);
fn bench_send(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder);
let (normal_tx, normal_fsm) = new_runner(100000);
system.spawn("test".to_owned(), Builder::new());
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(1, normal_box);

b.iter(|| {
router.send(1, ()).unwrap();
c.bench_function("router::send", |b| {
b.iter(|| {
router.send(1, Message::Loop(0)).unwrap();
})
});
system.shutdown();
}

criterion_group!(benches, bench_send);
criterion_main!(benches);
6 changes: 6 additions & 0 deletions components/batch-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ extern crate slog;
extern crate slog_global;
#[macro_use]
extern crate tikv_util;
#[cfg(feature = "test-runner")]
#[macro_use]
extern crate derive_more;

mod batch;
mod fsm;
mod mailbox;
mod router;

#[cfg(feature = "test-runner")]
pub mod test_runner;

pub use self::batch::{create_system, BatchRouter, BatchSystem, HandlerBuilder, PollHandler};
pub use self::fsm::Fsm;
pub use self::mailbox::{BasicMailbox, Mailbox};
Expand Down
Loading

0 comments on commit ca57531

Please sign in to comment.