From f8a125ad6a2726e53062f26cb93b5e00bc9c5a5d Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 4 Jul 2022 17:36:11 +0800 Subject: [PATCH 01/17] add bench action Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 26 +++++++++++++++++++------- Cargo.toml | 3 +++ proto/Cargo.toml | 3 +++ skiplist/Cargo.toml | 3 +++ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 2dc581b5..f3352152 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -2,9 +2,15 @@ on: workflow_dispatch: inputs: reason: - description: 'reason to trigger this build' + description: "reason to trigger this build" required: false - + +permissions: + # deployments permission to deploy GitHub pages website + deployments: write + # contents permission to update benchmark contents in gh-pages branch + contents: write + name: Bench jobs: @@ -26,11 +32,17 @@ jobs: with: toolchain: nightly default: true - - uses: actions-rs/cargo@v1 - name: Benchmark 🚀 + - name: Benchmark 🚀 + run: cargo +nightly bench --all-features --workspace -- --output-format bencher | tee output.txt + - uses: benchmark-action/github-action-benchmark@v1 + name: Store benchmark result with: - command: bench - args: --all-features --workspace + tool: "cargo" + output-file-path: output.txt + # Access token to deploy GitHub Pages branch + github-token: ${{ secrets.GITHUB_TOKEN }} + # Push and deploy GitHub pages branch automatically + auto-push: true sanitizer_bench: name: Bench with Sanitizer runs-on: ubuntu-latest @@ -77,4 +89,4 @@ jobs: command: bench args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu env: - RUSTFLAGS: "-Zsanitizer=thread" \ No newline at end of file + RUSTFLAGS: "-Zsanitizer=thread" diff --git a/Cargo.toml b/Cargo.toml index 3fec08a6..49ae17c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,3 +54,6 @@ incremental = false debug-assertions = false overflow-checks = false rpath = false + +[lib] +bench = false diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 1f55de7e..3f526b95 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -10,3 +10,6 @@ prost = "0.8" [build-dependencies] prost-build = { version = "0.8" } + +[lib] +bench = false diff --git a/skiplist/Cargo.toml b/skiplist/Cargo.toml index 3db51e64..aa85774e 100644 --- a/skiplist/Cargo.toml +++ b/skiplist/Cargo.toml @@ -18,3 +18,6 @@ tikv-jemallocator = "0.4.0" [[bench]] name = "bench" harness = false + +[lib] +bench = false From ff11cd034b203bcf6e2159b97d3b69ac88c2391a Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 4 Jul 2022 19:54:06 +0800 Subject: [PATCH 02/17] auto trigger bench action Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index f3352152..7124ba27 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -1,9 +1,12 @@ on: - workflow_dispatch: - inputs: - reason: - description: "reason to trigger this build" - required: false + push: + branches: + - master + - develop + pull_request: + branches: + - master + - develop permissions: # deployments permission to deploy GitHub pages website From 3ecde50173e1c9d79cf2289580be03e3d4984a9a Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 4 Jul 2022 22:12:34 +0800 Subject: [PATCH 03/17] remove sanitizer bench and only run bench on push to master Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 53 +------------------------------------ 1 file changed, 1 insertion(+), 52 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 7124ba27..c07c4be5 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -1,12 +1,8 @@ +# Do not run this workflow on pull request since this workflow has permission to modify contents. on: push: branches: - master - - develop - pull_request: - branches: - - master - - develop permissions: # deployments permission to deploy GitHub pages website @@ -46,50 +42,3 @@ jobs: github-token: ${{ secrets.GITHUB_TOKEN }} # Push and deploy GitHub pages branch automatically auto-push: true - sanitizer_bench: - name: Bench with Sanitizer - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - name: Checkout 🛎️ - - uses: actions/cache@v2 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-sanitizer-bench - - uses: actions-rs/toolchain@v1 - name: Setup Cargo Toolchain 🛎️ - with: - components: rust-src - toolchain: nightly - default: true - - uses: actions-rs/cargo@v1 - name: Bench with Address Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=address" - - uses: actions-rs/cargo@v1 - name: Bench with Leak Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=leak" - - uses: actions-rs/cargo@v1 - name: Bench with Memory Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=memory" - - uses: actions-rs/cargo@v1 - name: Bench with Thread Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=thread" From f84c122422f2d7f71016e8927168975c0781bc47 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Tue, 5 Jul 2022 18:20:16 +0800 Subject: [PATCH 04/17] update test Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 2 +- Cargo.toml | 9 ++ benches/benches_badger_rocks.rs | 197 ++++++++++++++++++++++++++++++++ benches/common.rs | 23 +++- 4 files changed, 228 insertions(+), 3 deletions(-) create mode 100644 benches/benches_badger_rocks.rs diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index c07c4be5..fc606455 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -32,7 +32,7 @@ jobs: toolchain: nightly default: true - name: Benchmark 🚀 - run: cargo +nightly bench --all-features --workspace -- --output-format bencher | tee output.txt + run: cargo +nightly bench --all-features --workspace --bench benches_badger_rocks -- --output-format bencher | tee output.txt - uses: benchmark-action/github-action-benchmark@v1 name: Store benchmark result with: diff --git a/Cargo.toml b/Cargo.toml index 96035fd0..6f3720e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Jay Lee "] edition = "2018" +[features] +default = [] +enable-rocksdb = ["rocksdb"] + [dependencies] bytes = "1.0" coarsetime = "0.1.22" @@ -20,6 +24,7 @@ parking_lot = "0.11" prost = "0.8" proto = { path = "proto" } rand = "0.7" +rocksdb = { version = "0.15", optional = true } skiplist = { path = "skiplist" } tempdir = "0.3" thiserror = "1.0" @@ -51,6 +56,10 @@ harness = false name = "bench_iterator" harness = false +[[bench]] +name = "benches_badger_rocks" +harness = false + [profile.bench] opt-level = 3 debug = false diff --git a/benches/benches_badger_rocks.rs b/benches/benches_badger_rocks.rs new file mode 100644 index 00000000..09a5adca --- /dev/null +++ b/benches/benches_badger_rocks.rs @@ -0,0 +1,197 @@ +#![cfg(feature = "enable-rocksdb")] +mod common; + +use agatedb::Agate; +use agatedb::AgateOptions; +use agatedb::IteratorOptions; +use common::{gen_kv_pair, unix_time}; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::prelude::ThreadRng; +use rand::Rng; +use rocksdb::DB; +use std::sync::Arc; +use tempdir::TempDir; + +fn badger_populate(agate: Arc, batch_size: u64, value_size: usize) { + let mut txn = agate.new_transaction_at(unix_time(), true); + + for i in 0..batch_size { + let (key, value) = gen_kv_pair(i, value_size); + txn.set(key, value).unwrap(); + } + + txn.commit_at(unix_time()).unwrap(); +} + +fn badger_randread(agate: Arc, batch_size: u64, value_size: usize, rng: &mut ThreadRng) { + let txn = agate.new_transaction_at(unix_time(), false); + + for _ in 0..batch_size { + let (key, value) = gen_kv_pair(rng.gen_range(0, batch_size), value_size); + + let item = txn.get(&key).unwrap(); + assert_eq!(item.value(), value); + } +} + +fn badger_iterate(agate: Arc, value_size: usize) { + let txn = agate.new_transaction_at(unix_time(), false); + let opts = IteratorOptions::default(); + let mut iter = txn.new_iterator(&opts); + iter.rewind(); + + while iter.valid() { + let item = iter.item(); + assert_eq!(item.value().len(), value_size); + + iter.next(); + } +} + +fn rocks_populate(db: Arc, batch_size: u64, value_size: usize) { + let mut write_options = rocksdb::WriteOptions::default(); + write_options.set_sync(true); + write_options.disable_wal(false); + + let mut batch = rocksdb::WriteBatch::default(); + + for i in 0..batch_size { + let (key, value) = gen_kv_pair(i, value_size); + batch.put(key, value); + } + + db.write_opt(batch, &write_options).unwrap(); +} + +fn rocks_randread(db: Arc, batch_size: u64, value_size: usize, rng: &mut ThreadRng) { + for _ in 0..batch_size { + let (key, value) = gen_kv_pair(rng.gen_range(0, batch_size), value_size); + + let find = db.get(key).unwrap(); + assert_eq!(find.unwrap(), value) + } +} + +fn rocks_iterate(db: Arc, value_size: usize) { + let iter = db.iterator(rocksdb::IteratorMode::Start); + + for (_, value) in iter { + assert_eq!(value.len(), value_size); + } +} + +fn bench_badger(c: &mut Criterion) { + let batch_size = 1000; + let mut rng = rand::thread_rng(); + + let dir = TempDir::new("agatedb-bench-small-value").unwrap(); + let mut opts = AgateOptions { + create_if_not_exists: true, + dir: dir.as_ref().to_path_buf(), + value_dir: dir.as_ref().to_path_buf(), + managed_txns: true, + ..Default::default() + }; + let agate = Arc::new(opts.open().unwrap()); + let value_size = 32; + + c.bench_function("badger populate small value", |b| { + b.iter(|| { + badger_populate(agate.clone(), batch_size, value_size); + }); + }); + + c.bench_function("badger randread small value", |b| { + b.iter(|| { + badger_randread(agate.clone(), batch_size, value_size, &mut rng); + }); + }); + + c.bench_function("badger iterate small value", |b| { + b.iter(|| { + badger_iterate(agate.clone(), value_size); + }); + }); + + dir.close().unwrap(); + let dir = TempDir::new("agatedb-bench-large-value").unwrap(); + opts.dir = dir.as_ref().to_path_buf(); + opts.value_dir = dir.as_ref().to_path_buf(); + let agate = Arc::new(opts.open().unwrap()); + let value_size = 102400; + + c.bench_function("badger populate large value", |b| { + b.iter(|| { + badger_populate(agate.clone(), batch_size, value_size); + }); + }); + + c.bench_function("badger randread large value", |b| { + b.iter(|| { + badger_randread(agate.clone(), batch_size, value_size, &mut rng); + }); + }); + + c.bench_function("badger iterate large value", |b| { + b.iter(|| { + badger_iterate(agate.clone(), value_size); + }); + }); + + dir.close().unwrap(); +} + +fn bench_rocks(c: &mut Criterion) { + let batch_size = 1000; + let mut rng = rand::thread_rng(); + + let dir = TempDir::new("rocks-bench-small-value").unwrap(); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_compression_type(rocksdb::DBCompressionType::None); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + let value_size = 32; + + c.bench_function("rocks populate small value", |b| { + b.iter(|| rocks_populate(db.clone(), batch_size, value_size)); + }); + + c.bench_function("rocks randread small value", |b| { + b.iter(|| { + rocks_randread(db.clone(), batch_size, value_size, &mut rng); + }); + }); + + c.bench_function("rocks iterate small value", |b| { + b.iter(|| rocks_iterate(db.clone(), value_size)); + }); + + dir.close().unwrap(); + let dir = TempDir::new("rocks-bench-large-value").unwrap(); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + let value_size = 102400; + + c.bench_function("rocks populate large value", |b| { + b.iter(|| rocks_populate(db.clone(), batch_size, value_size)); + }); + + c.bench_function("rocks randread large value", |b| { + b.iter(|| { + rocks_randread(db.clone(), batch_size, value_size, &mut rng); + }); + }); + + c.bench_function("rocks iterate large value", |b| { + b.iter(|| rocks_iterate(db.clone(), value_size)); + }); + + dir.close().unwrap(); +} + +criterion_group! { + name = benches_badger_rocks; + config = Criterion::default(); + targets = bench_badger, bench_rocks +} + +criterion_main!(benches_badger_rocks); diff --git a/benches/common.rs b/benches/common.rs index 6742ad2f..b4eea560 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -3,9 +3,12 @@ use agatedb::{ opt::build_table_options, AgateOptions, ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, }; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; -use std::ops::{Deref, DerefMut}; +use std::{ + ops::{Deref, DerefMut}, + time::UNIX_EPOCH, +}; use tempdir::TempDir; pub fn rand_value() -> String { @@ -64,3 +67,19 @@ pub fn get_table_for_benchmark(count: usize) -> TableGuard { _tmp_dir: tmp_dir, } } + +pub fn gen_kv_pair(key: u64, value_size: usize) -> (Bytes, Bytes) { + let key = Bytes::from(format!("vsz={:05}-k={:010}", value_size, key)); + + let mut value = BytesMut::with_capacity(value_size); + value.resize(value_size, 0); + + (key, value.freeze()) +} + +pub fn unix_time() -> u64 { + UNIX_EPOCH + .elapsed() + .expect("Time went backwards") + .as_millis() as u64 +} From a8d54c70b8786ce0f82bad547d474d124238bfa4 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 10:46:03 +0800 Subject: [PATCH 05/17] sync write Signed-off-by: GanZiheng --- agate_bench/src/main.rs | 1 + benches/benches_badger_rocks.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index 7853c576..100662ca 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -134,6 +134,7 @@ fn main() { let mut options = AgateOptions { create_if_not_exists: true, + sync_writes: true, dir: directory.clone(), value_dir: directory, managed_txns: true, diff --git a/benches/benches_badger_rocks.rs b/benches/benches_badger_rocks.rs index 09a5adca..b5cbe9ea 100644 --- a/benches/benches_badger_rocks.rs +++ b/benches/benches_badger_rocks.rs @@ -87,6 +87,7 @@ fn bench_badger(c: &mut Criterion) { let dir = TempDir::new("agatedb-bench-small-value").unwrap(); let mut opts = AgateOptions { create_if_not_exists: true, + sync_writes: true, dir: dir.as_ref().to_path_buf(), value_dir: dir.as_ref().to_path_buf(), managed_txns: true, From f4266d347cce155184f67a7b0980364d291ab0c1 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 11:41:32 +0800 Subject: [PATCH 06/17] update Signed-off-by: GanZiheng --- benches/benches_badger_rocks.rs | 120 +++++++++++++++++++++----------- benches/common.rs | 9 +++ 2 files changed, 88 insertions(+), 41 deletions(-) diff --git a/benches/benches_badger_rocks.rs b/benches/benches_badger_rocks.rs index b5cbe9ea..ae8218d9 100644 --- a/benches/benches_badger_rocks.rs +++ b/benches/benches_badger_rocks.rs @@ -4,7 +4,7 @@ mod common; use agatedb::Agate; use agatedb::AgateOptions; use agatedb::IteratorOptions; -use common::{gen_kv_pair, unix_time}; +use common::{gen_kv_pair, remove_files, unix_time}; use criterion::{criterion_group, criterion_main, Criterion}; use rand::prelude::ThreadRng; use rand::Rng; @@ -12,10 +12,14 @@ use rocksdb::DB; use std::sync::Arc; use tempdir::TempDir; -fn badger_populate(agate: Arc, batch_size: u64, value_size: usize) { +const BATCH_SIZE: u64 = 1000; +const SMALL_VALUE_SIZE: usize = 32; +const LARGE_VALUE_SIZE: usize = 102400; + +fn badger_populate(agate: Arc, value_size: usize) { let mut txn = agate.new_transaction_at(unix_time(), true); - for i in 0..batch_size { + for i in 0..BATCH_SIZE { let (key, value) = gen_kv_pair(i, value_size); txn.set(key, value).unwrap(); } @@ -23,11 +27,11 @@ fn badger_populate(agate: Arc, batch_size: u64, value_size: usize) { txn.commit_at(unix_time()).unwrap(); } -fn badger_randread(agate: Arc, batch_size: u64, value_size: usize, rng: &mut ThreadRng) { +fn badger_randread(agate: Arc, value_size: usize, rng: &mut ThreadRng) { let txn = agate.new_transaction_at(unix_time(), false); - for _ in 0..batch_size { - let (key, value) = gen_kv_pair(rng.gen_range(0, batch_size), value_size); + for _ in 0..BATCH_SIZE { + let (key, value) = gen_kv_pair(rng.gen_range(0, BATCH_SIZE), value_size); let item = txn.get(&key).unwrap(); assert_eq!(item.value(), value); @@ -48,14 +52,14 @@ fn badger_iterate(agate: Arc, value_size: usize) { } } -fn rocks_populate(db: Arc, batch_size: u64, value_size: usize) { +fn rocks_populate(db: Arc, value_size: usize) { let mut write_options = rocksdb::WriteOptions::default(); write_options.set_sync(true); write_options.disable_wal(false); let mut batch = rocksdb::WriteBatch::default(); - for i in 0..batch_size { + for i in 0..BATCH_SIZE { let (key, value) = gen_kv_pair(i, value_size); batch.put(key, value); } @@ -63,9 +67,9 @@ fn rocks_populate(db: Arc, batch_size: u64, value_size: usize) { db.write_opt(batch, &write_options).unwrap(); } -fn rocks_randread(db: Arc, batch_size: u64, value_size: usize, rng: &mut ThreadRng) { - for _ in 0..batch_size { - let (key, value) = gen_kv_pair(rng.gen_range(0, batch_size), value_size); +fn rocks_randread(db: Arc, value_size: usize, rng: &mut ThreadRng) { + for _ in 0..BATCH_SIZE { + let (key, value) = gen_kv_pair(rng.gen_range(0, BATCH_SIZE), value_size); let find = db.get(key).unwrap(); assert_eq!(find.unwrap(), value) @@ -81,61 +85,76 @@ fn rocks_iterate(db: Arc, value_size: usize) { } fn bench_badger(c: &mut Criterion) { - let batch_size = 1000; let mut rng = rand::thread_rng(); let dir = TempDir::new("agatedb-bench-small-value").unwrap(); + let dir_path = dir.path(); let mut opts = AgateOptions { create_if_not_exists: true, sync_writes: true, - dir: dir.as_ref().to_path_buf(), - value_dir: dir.as_ref().to_path_buf(), managed_txns: true, ..Default::default() }; - let agate = Arc::new(opts.open().unwrap()); - let value_size = 32; c.bench_function("badger populate small value", |b| { - b.iter(|| { - badger_populate(agate.clone(), batch_size, value_size); - }); + b.iter_batched( + || { + remove_files(dir_path); + opts.dir = dir_path.to_path_buf(); + opts.value_dir = dir_path.to_path_buf(); + Arc::new(opts.open().unwrap()) + }, + |agate| { + badger_populate(agate, SMALL_VALUE_SIZE); + }, + criterion::BatchSize::SmallInput, + ); }); + let agate = Arc::new(opts.open().unwrap()); + c.bench_function("badger randread small value", |b| { b.iter(|| { - badger_randread(agate.clone(), batch_size, value_size, &mut rng); + badger_randread(agate.clone(), SMALL_VALUE_SIZE, &mut rng); }); }); c.bench_function("badger iterate small value", |b| { b.iter(|| { - badger_iterate(agate.clone(), value_size); + badger_iterate(agate.clone(), SMALL_VALUE_SIZE); }); }); dir.close().unwrap(); let dir = TempDir::new("agatedb-bench-large-value").unwrap(); - opts.dir = dir.as_ref().to_path_buf(); - opts.value_dir = dir.as_ref().to_path_buf(); - let agate = Arc::new(opts.open().unwrap()); - let value_size = 102400; + let dir_path = dir.path(); c.bench_function("badger populate large value", |b| { - b.iter(|| { - badger_populate(agate.clone(), batch_size, value_size); - }); + b.iter_batched( + || { + remove_files(dir_path); + opts.dir = dir_path.to_path_buf(); + opts.value_dir = dir_path.to_path_buf(); + Arc::new(opts.open().unwrap()) + }, + |agate| { + badger_populate(agate, LARGE_VALUE_SIZE); + }, + criterion::BatchSize::SmallInput, + ); }); + let agate = Arc::new(opts.open().unwrap()); + c.bench_function("badger randread large value", |b| { b.iter(|| { - badger_randread(agate.clone(), batch_size, value_size, &mut rng); + badger_randread(agate.clone(), LARGE_VALUE_SIZE, &mut rng); }); }); c.bench_function("badger iterate large value", |b| { b.iter(|| { - badger_iterate(agate.clone(), value_size); + badger_iterate(agate.clone(), LARGE_VALUE_SIZE); }); }); @@ -143,47 +162,66 @@ fn bench_badger(c: &mut Criterion) { } fn bench_rocks(c: &mut Criterion) { - let batch_size = 1000; let mut rng = rand::thread_rng(); let dir = TempDir::new("rocks-bench-small-value").unwrap(); + let dir_path = dir.path(); let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); opts.set_compression_type(rocksdb::DBCompressionType::None); - let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); - let value_size = 32; c.bench_function("rocks populate small value", |b| { - b.iter(|| rocks_populate(db.clone(), batch_size, value_size)); + b.iter_batched( + || { + remove_files(dir_path); + Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()) + }, + |db| { + rocks_populate(db, SMALL_VALUE_SIZE); + }, + criterion::BatchSize::SmallInput, + ); }); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + c.bench_function("rocks randread small value", |b| { b.iter(|| { - rocks_randread(db.clone(), batch_size, value_size, &mut rng); + rocks_randread(db.clone(), SMALL_VALUE_SIZE, &mut rng); }); }); c.bench_function("rocks iterate small value", |b| { - b.iter(|| rocks_iterate(db.clone(), value_size)); + b.iter(|| rocks_iterate(db.clone(), SMALL_VALUE_SIZE)); }); dir.close().unwrap(); let dir = TempDir::new("rocks-bench-large-value").unwrap(); - let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); - let value_size = 102400; + let dir_path = dir.path(); c.bench_function("rocks populate large value", |b| { - b.iter(|| rocks_populate(db.clone(), batch_size, value_size)); + b.iter_batched( + || { + remove_files(dir_path); + Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()) + }, + |db| { + rocks_populate(db, LARGE_VALUE_SIZE); + }, + criterion::BatchSize::SmallInput, + ); }); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + c.bench_function("rocks randread large value", |b| { b.iter(|| { - rocks_randread(db.clone(), batch_size, value_size, &mut rng); + rocks_randread(db.clone(), LARGE_VALUE_SIZE, &mut rng); }); }); c.bench_function("rocks iterate large value", |b| { - b.iter(|| rocks_iterate(db.clone(), value_size)); + b.iter(|| rocks_iterate(db.clone(), LARGE_VALUE_SIZE)); }); dir.close().unwrap(); diff --git a/benches/common.rs b/benches/common.rs index b4eea560..09d75a89 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -6,7 +6,9 @@ use agatedb::{ use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; use std::{ + fs::{read_dir, remove_file}, ops::{Deref, DerefMut}, + path::Path, time::UNIX_EPOCH, }; use tempdir::TempDir; @@ -83,3 +85,10 @@ pub fn unix_time() -> u64 { .expect("Time went backwards") .as_millis() as u64 } + +pub fn remove_files(path: &Path) { + read_dir(path).unwrap().into_iter().for_each(|entry| { + let entry = entry.unwrap(); + remove_file(entry.path()).unwrap(); + }); +} From bc555b0cbf7b31444e366bf79488a0b7155fe43a Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 11:46:42 +0800 Subject: [PATCH 07/17] sync dir Signed-off-by: GanZiheng --- benches/common.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/benches/common.rs b/benches/common.rs index 09d75a89..fb503b96 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use agatedb::{ - opt::build_table_options, AgateOptions, ChecksumVerificationMode::NoVerification, Table, - TableBuilder, Value, + opt::build_table_options, util::sync_dir, AgateOptions, + ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, }; use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; @@ -91,4 +91,5 @@ pub fn remove_files(path: &Path) { let entry = entry.unwrap(); remove_file(entry.path()).unwrap(); }); + sync_dir(&path).unwrap(); } From fc0cdae441156b9b5c9c04f105dcdd4744458f8d Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 12:38:04 +0800 Subject: [PATCH 08/17] update Signed-off-by: GanZiheng --- benches/benches_badger_rocks.rs | 91 ++++++++++++++++++++------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/benches/benches_badger_rocks.rs b/benches/benches_badger_rocks.rs index ae8218d9..fca0192d 100644 --- a/benches/benches_badger_rocks.rs +++ b/benches/benches_badger_rocks.rs @@ -9,7 +9,10 @@ use criterion::{criterion_group, criterion_main, Criterion}; use rand::prelude::ThreadRng; use rand::Rng; use rocksdb::DB; +use std::ops::Add; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use tempdir::TempDir; const BATCH_SIZE: u64 = 1000; @@ -92,23 +95,27 @@ fn bench_badger(c: &mut Criterion) { let mut opts = AgateOptions { create_if_not_exists: true, sync_writes: true, + dir: dir_path.to_path_buf(), + value_dir: dir_path.to_path_buf(), managed_txns: true, ..Default::default() }; c.bench_function("badger populate small value", |b| { - b.iter_batched( - || { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { remove_files(dir_path); - opts.dir = dir_path.to_path_buf(); - opts.value_dir = dir_path.to_path_buf(); - Arc::new(opts.open().unwrap()) - }, - |agate| { + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); badger_populate(agate, SMALL_VALUE_SIZE); - }, - criterion::BatchSize::SmallInput, - ); + total = total.add(now.elapsed()); + }); + + total + }); }); let agate = Arc::new(opts.open().unwrap()); @@ -128,20 +135,24 @@ fn bench_badger(c: &mut Criterion) { dir.close().unwrap(); let dir = TempDir::new("agatedb-bench-large-value").unwrap(); let dir_path = dir.path(); + opts.dir = dir_path.to_path_buf(); + opts.value_dir = dir_path.to_path_buf(); c.bench_function("badger populate large value", |b| { - b.iter_batched( - || { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { remove_files(dir_path); - opts.dir = dir_path.to_path_buf(); - opts.value_dir = dir_path.to_path_buf(); - Arc::new(opts.open().unwrap()) - }, - |agate| { + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); badger_populate(agate, LARGE_VALUE_SIZE); - }, - criterion::BatchSize::SmallInput, - ); + total = total.add(now.elapsed()); + }); + + total + }); }); let agate = Arc::new(opts.open().unwrap()); @@ -171,16 +182,20 @@ fn bench_rocks(c: &mut Criterion) { opts.set_compression_type(rocksdb::DBCompressionType::None); c.bench_function("rocks populate small value", |b| { - b.iter_batched( - || { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { remove_files(dir_path); - Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()) - }, - |db| { + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); rocks_populate(db, SMALL_VALUE_SIZE); - }, - criterion::BatchSize::SmallInput, - ); + total = total.add(now.elapsed()); + }); + + total + }); }); let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); @@ -200,16 +215,20 @@ fn bench_rocks(c: &mut Criterion) { let dir_path = dir.path(); c.bench_function("rocks populate large value", |b| { - b.iter_batched( - || { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { remove_files(dir_path); - Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()) - }, - |db| { + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); rocks_populate(db, LARGE_VALUE_SIZE); - }, - criterion::BatchSize::SmallInput, - ); + total = total.add(now.elapsed()); + }); + + total + }); }); let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); From e9a8a3e45bdbc9c4c5d76f72e608d90cd6f4b2c6 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 15:36:58 +0800 Subject: [PATCH 09/17] update Signed-off-by: GanZiheng --- Cargo.toml | 3 --- agate_bench/src/main.rs | 11 ++++++----- benches/bench_iterator.rs | 10 +++------- benches/benches_badger_rocks.rs | 17 ++++++++--------- benches/common.rs | 13 +++++++------ proto/Cargo.toml | 3 --- skiplist/Cargo.toml | 3 --- src/checksum.rs | 3 ++- src/db/opt.rs | 6 +++--- src/lib.rs | 8 ++++---- src/managed_db.rs | 6 ++---- src/ops/transaction_test.rs | 22 ++++++++++------------ src/table/merge_iterator.rs | 6 ++++-- 13 files changed, 49 insertions(+), 62 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6f3720e4..b187632d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,3 @@ incremental = false debug-assertions = false overflow-checks = false rpath = false - -[lib] -bench = false diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index 100662ca..178f7a48 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -1,13 +1,14 @@ +use std::{ + path::PathBuf, + sync::{mpsc::channel, Arc}, + time::{Duration, UNIX_EPOCH}, +}; + use agatedb::{AgateOptions, IteratorOptions}; use bytes::{Bytes, BytesMut}; use clap::clap_app; use indicatif::{ProgressBar, ProgressStyle}; use rand::Rng; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::UNIX_EPOCH; -use std::{sync::mpsc::channel, time::Duration}; - #[cfg(not(target_env = "msvc"))] use tikv_jemallocator::Jemalloc; diff --git a/benches/bench_iterator.rs b/benches/bench_iterator.rs index 692c5483..bdc702e7 100644 --- a/benches/bench_iterator.rs +++ b/benches/bench_iterator.rs @@ -1,12 +1,8 @@ mod common; -use agatedb::util::unix_time; -use agatedb::AgateIterator; -use agatedb::AgateOptions; -use agatedb::ConcatIterator; -use agatedb::Iterators; -use agatedb::MergeIterator; - +use agatedb::{ + util::unix_time, AgateIterator, AgateOptions, ConcatIterator, Iterators, MergeIterator, +}; use bytes::Bytes; use common::get_table_for_benchmark; use criterion::{criterion_group, criterion_main, Criterion}; diff --git a/benches/benches_badger_rocks.rs b/benches/benches_badger_rocks.rs index fca0192d..90c47b78 100644 --- a/benches/benches_badger_rocks.rs +++ b/benches/benches_badger_rocks.rs @@ -1,18 +1,17 @@ #![cfg(feature = "enable-rocksdb")] mod common; -use agatedb::Agate; -use agatedb::AgateOptions; -use agatedb::IteratorOptions; +use std::{ + ops::Add, + sync::Arc, + time::{Duration, Instant}, +}; + +use agatedb::{Agate, AgateOptions, IteratorOptions}; use common::{gen_kv_pair, remove_files, unix_time}; use criterion::{criterion_group, criterion_main, Criterion}; -use rand::prelude::ThreadRng; -use rand::Rng; +use rand::{prelude::ThreadRng, Rng}; use rocksdb::DB; -use std::ops::Add; -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; use tempdir::TempDir; const BATCH_SIZE: u64 = 1000; diff --git a/benches/common.rs b/benches/common.rs index fb503b96..1feb6aef 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,16 +1,17 @@ #![allow(dead_code)] -use agatedb::{ - opt::build_table_options, util::sync_dir, AgateOptions, - ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, -}; -use bytes::{Bytes, BytesMut}; -use rand::{distributions::Alphanumeric, Rng}; use std::{ fs::{read_dir, remove_file}, ops::{Deref, DerefMut}, path::Path, time::UNIX_EPOCH, }; + +use agatedb::{ + opt::build_table_options, util::sync_dir, AgateOptions, + ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, +}; +use bytes::{Bytes, BytesMut}; +use rand::{distributions::Alphanumeric, Rng}; use tempdir::TempDir; pub fn rand_value() -> String { diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 3f526b95..1f55de7e 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -10,6 +10,3 @@ prost = "0.8" [build-dependencies] prost-build = { version = "0.8" } - -[lib] -bench = false diff --git a/skiplist/Cargo.toml b/skiplist/Cargo.toml index aa85774e..3db51e64 100644 --- a/skiplist/Cargo.toml +++ b/skiplist/Cargo.toml @@ -18,6 +18,3 @@ tikv-jemallocator = "0.4.0" [[bench]] name = "bench" harness = false - -[lib] -bench = false diff --git a/src/checksum.rs b/src/checksum.rs index b6126b82..98768845 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -1,6 +1,7 @@ -use crate::{Error, Result}; use proto::meta::{checksum::Algorithm as ChecksumAlgorithm, Checksum}; +use crate::{Error, Result}; + pub fn calculate_checksum(data: &[u8], algo: ChecksumAlgorithm) -> u64 { match algo { ChecksumAlgorithm::Crc32c => { diff --git a/src/db/opt.rs b/src/db/opt.rs index c785c3c9..edd3bef3 100644 --- a/src/db/opt.rs +++ b/src/db/opt.rs @@ -1,10 +1,10 @@ -use skiplist::MAX_NODE_SIZE; +use std::cmp; -use super::*; use getset::Setters; +use skiplist::MAX_NODE_SIZE; +use super::*; use crate::{entry::Entry, opt}; -use std::cmp; #[derive(Clone, Setters)] pub struct AgateOptions { diff --git a/src/lib.rs b/src/lib.rs index 7f01a66e..73f37d74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,8 +29,8 @@ pub use iterator::IteratorOptions; pub use iterator_trait::AgateIterator; pub use opt::{ChecksumVerificationMode, Options as TableOptions}; pub use skiplist::Skiplist; -pub use table::merge_iterator::Iterators; -pub use table::ConcatIterator; -pub use table::MergeIterator; -pub use table::{builder::Builder as TableBuilder, Table}; +pub use table::{ + builder::Builder as TableBuilder, merge_iterator::Iterators, ConcatIterator, MergeIterator, + Table, +}; pub use value::Value; diff --git a/src/managed_db.rs b/src/managed_db.rs index 8fe4d1ce..b617ba47 100644 --- a/src/managed_db.rs +++ b/src/managed_db.rs @@ -1,9 +1,7 @@ -use crate::db::Agate; -use crate::ops::transaction::Transaction; -use crate::Result; - use std::sync::Arc; +use crate::{db::Agate, ops::transaction::Transaction, Result}; + impl crate::db::Core { /// Follows the same logic as `new_transaction`, but uses the provided read timestamp. pub fn new_transaction_at(self: &Arc, read_ts: u64, update: bool) -> Transaction { diff --git a/src/ops/transaction_test.rs b/src/ops/transaction_test.rs index 01869769..e5cab415 100644 --- a/src/ops/transaction_test.rs +++ b/src/ops/transaction_test.rs @@ -1,17 +1,15 @@ -use crate::Error; - -use crate::assert_bytes_eq; +use crate::{assert_bytes_eq, Error}; /// Tests in managed mode. mod managed_db { + use bytes::{Bytes, BytesMut}; + + use super::*; use crate::{ db::tests::{generate_test_agate_options, run_agate_test, with_payload}, entry::Entry, AgateOptions, }; - use bytes::{Bytes, BytesMut}; - - use super::*; fn default_test_managed_opts() -> AgateOptions { let mut opts = generate_test_agate_options(); @@ -136,6 +134,12 @@ mod normal_db { Arc, }; + use bytes::{Bytes, BytesMut}; + use crossbeam_channel::select; + use rand::Rng; + use yatp::{task::callback::Handle, Builder}; + + use super::*; use crate::{ closer::Closer, db::tests::{generate_test_agate_options, run_agate_test}, @@ -146,12 +150,6 @@ mod normal_db { value::VALUE_DELETE, Agate, AgateOptions, }; - use bytes::{Bytes, BytesMut}; - use crossbeam_channel::select; - use rand::Rng; - use yatp::{task::callback::Handle, Builder}; - - use super::*; #[test] fn test_txn_simple() { diff --git a/src/table/merge_iterator.rs b/src/table/merge_iterator.rs index 612398f4..dd08460d 100644 --- a/src/table/merge_iterator.rs +++ b/src/table/merge_iterator.rs @@ -235,8 +235,10 @@ impl AgateIterator for MergeIterator { #[cfg(test)] mod tests { use super::*; - use crate::assert_bytes_eq; - use crate::format::{key_with_ts, user_key}; + use crate::{ + assert_bytes_eq, + format::{key_with_ts, user_key}, + }; pub struct VecIterator { vec: Vec, From 2e3376a18cbf90a50bdf5440832d3ccb78492c59 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 19:47:48 +0800 Subject: [PATCH 10/17] update Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 4 +- Cargo.toml | 2 +- ...badger_rocks.rs => benches_agate_rocks.rs} | 38 +++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) rename benches/{benches_badger_rocks.rs => benches_agate_rocks.rs} (85%) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index fc606455..ff39d90e 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -31,8 +31,8 @@ jobs: with: toolchain: nightly default: true - - name: Benchmark 🚀 - run: cargo +nightly bench --all-features --workspace --bench benches_badger_rocks -- --output-format bencher | tee output.txt + - name: Benchmark with RocksDB + run: cargo +nightly bench --all-features --workspace --bench benches_agate_rocks -- --output-format bencher | tee output.txt - uses: benchmark-action/github-action-benchmark@v1 name: Store benchmark result with: diff --git a/Cargo.toml b/Cargo.toml index b187632d..b147ba90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ name = "bench_iterator" harness = false [[bench]] -name = "benches_badger_rocks" +name = "benches_agate_rocks" harness = false [profile.bench] diff --git a/benches/benches_badger_rocks.rs b/benches/benches_agate_rocks.rs similarity index 85% rename from benches/benches_badger_rocks.rs rename to benches/benches_agate_rocks.rs index 90c47b78..7cb486bd 100644 --- a/benches/benches_badger_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -18,7 +18,7 @@ const BATCH_SIZE: u64 = 1000; const SMALL_VALUE_SIZE: usize = 32; const LARGE_VALUE_SIZE: usize = 102400; -fn badger_populate(agate: Arc, value_size: usize) { +fn agate_populate(agate: Arc, value_size: usize) { let mut txn = agate.new_transaction_at(unix_time(), true); for i in 0..BATCH_SIZE { @@ -29,7 +29,7 @@ fn badger_populate(agate: Arc, value_size: usize) { txn.commit_at(unix_time()).unwrap(); } -fn badger_randread(agate: Arc, value_size: usize, rng: &mut ThreadRng) { +fn agate_randread(agate: Arc, value_size: usize, rng: &mut ThreadRng) { let txn = agate.new_transaction_at(unix_time(), false); for _ in 0..BATCH_SIZE { @@ -40,7 +40,7 @@ fn badger_randread(agate: Arc, value_size: usize, rng: &mut ThreadRng) { } } -fn badger_iterate(agate: Arc, value_size: usize) { +fn agate_iterate(agate: Arc, value_size: usize) { let txn = agate.new_transaction_at(unix_time(), false); let opts = IteratorOptions::default(); let mut iter = txn.new_iterator(&opts); @@ -86,7 +86,7 @@ fn rocks_iterate(db: Arc, value_size: usize) { } } -fn bench_badger(c: &mut Criterion) { +fn bench_agate(c: &mut Criterion) { let mut rng = rand::thread_rng(); let dir = TempDir::new("agatedb-bench-small-value").unwrap(); @@ -100,7 +100,7 @@ fn bench_badger(c: &mut Criterion) { ..Default::default() }; - c.bench_function("badger populate small value", |b| { + c.bench_function("agate populate small value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -109,7 +109,7 @@ fn bench_badger(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - badger_populate(agate, SMALL_VALUE_SIZE); + agate_populate(agate, SMALL_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -119,15 +119,15 @@ fn bench_badger(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); - c.bench_function("badger randread small value", |b| { + c.bench_function("agate randread small value", |b| { b.iter(|| { - badger_randread(agate.clone(), SMALL_VALUE_SIZE, &mut rng); + agate_randread(agate.clone(), SMALL_VALUE_SIZE, &mut rng); }); }); - c.bench_function("badger iterate small value", |b| { + c.bench_function("agate iterate small value", |b| { b.iter(|| { - badger_iterate(agate.clone(), SMALL_VALUE_SIZE); + agate_iterate(agate.clone(), SMALL_VALUE_SIZE); }); }); @@ -137,7 +137,7 @@ fn bench_badger(c: &mut Criterion) { opts.dir = dir_path.to_path_buf(); opts.value_dir = dir_path.to_path_buf(); - c.bench_function("badger populate large value", |b| { + c.bench_function("agate populate large value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -146,7 +146,7 @@ fn bench_badger(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - badger_populate(agate, LARGE_VALUE_SIZE); + agate_populate(agate, LARGE_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -156,15 +156,15 @@ fn bench_badger(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); - c.bench_function("badger randread large value", |b| { + c.bench_function("agate randread large value", |b| { b.iter(|| { - badger_randread(agate.clone(), LARGE_VALUE_SIZE, &mut rng); + agate_randread(agate.clone(), LARGE_VALUE_SIZE, &mut rng); }); }); - c.bench_function("badger iterate large value", |b| { + c.bench_function("agate iterate large value", |b| { b.iter(|| { - badger_iterate(agate.clone(), LARGE_VALUE_SIZE); + agate_iterate(agate.clone(), LARGE_VALUE_SIZE); }); }); @@ -246,9 +246,9 @@ fn bench_rocks(c: &mut Criterion) { } criterion_group! { - name = benches_badger_rocks; + name = benches_agate_rocks; config = Criterion::default(); - targets = bench_badger, bench_rocks + targets = bench_agate, bench_rocks } -criterion_main!(benches_badger_rocks); +criterion_main!(benches_agate_rocks); From 70e2e68461366da921769132877a683115d64a4b Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Wed, 6 Jul 2022 20:19:50 +0800 Subject: [PATCH 11/17] update Signed-off-by: GanZiheng --- .github/workflows/bench.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index ff39d90e..b365187a 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -31,11 +31,12 @@ jobs: with: toolchain: nightly default: true - - name: Benchmark with RocksDB + - name: Benchmark 🚀 run: cargo +nightly bench --all-features --workspace --bench benches_agate_rocks -- --output-format bencher | tee output.txt - uses: benchmark-action/github-action-benchmark@v1 name: Store benchmark result with: + name: Benchmark with RocksDB tool: "cargo" output-file-path: output.txt # Access token to deploy GitHub Pages branch From 1e5564a0e75d06158296c355d1c7762d63920d7c Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Thu, 7 Jul 2022 12:57:39 +0800 Subject: [PATCH 12/17] dont use create_if_not_exists Signed-off-by: GanZiheng --- benches/benches_agate_rocks.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs index 7cb486bd..1719f07d 100644 --- a/benches/benches_agate_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -92,10 +92,9 @@ fn bench_agate(c: &mut Criterion) { let dir = TempDir::new("agatedb-bench-small-value").unwrap(); let dir_path = dir.path(); let mut opts = AgateOptions { - create_if_not_exists: true, - sync_writes: true, dir: dir_path.to_path_buf(), value_dir: dir_path.to_path_buf(), + sync_writes: true, managed_txns: true, ..Default::default() }; From 81eee966982bba0566a6f0285eecaff32a4f41a5 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Thu, 7 Jul 2022 19:19:43 +0800 Subject: [PATCH 13/17] update Signed-off-by: GanZiheng --- benches/benches_agate_rocks.rs | 234 ++++++++++++++++++++++++--------- 1 file changed, 175 insertions(+), 59 deletions(-) diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs index 1719f07d..8842ab00 100644 --- a/benches/benches_agate_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -10,85 +10,203 @@ use std::{ use agatedb::{Agate, AgateOptions, IteratorOptions}; use common::{gen_kv_pair, remove_files, unix_time}; use criterion::{criterion_group, criterion_main, Criterion}; -use rand::{prelude::ThreadRng, Rng}; +use rand::Rng; use rocksdb::DB; use tempdir::TempDir; -const BATCH_SIZE: u64 = 1000; +// We will process `CHUNK_SIZE` items in a thread, and in one certain thread, +// we will process `BATCH_SIZE` items in a transaction or write batch. +const KEY_NUMS: u64 = 160_000; +const CHUNK_SIZE: u64 = 10_000; +const BATCH_SIZE: u64 = 100; + const SMALL_VALUE_SIZE: usize = 32; -const LARGE_VALUE_SIZE: usize = 102400; +const LARGE_VALUE_SIZE: usize = 4096; + +pub fn agate_populate( + agate: Arc, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, +) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let range = chunk_start..chunk_start + chunk_size; + + for batch_start in range.step_by(batch_size as usize) { + let mut txn = agate.new_transaction_at(unix_time(), true); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = gen_kv_pair(key, value_size); + txn.set(key, value).unwrap(); + }); + + txn.commit_at(unix_time()).unwrap(); + } + })); + } -fn agate_populate(agate: Arc, value_size: usize) { - let mut txn = agate.new_transaction_at(unix_time(), true); + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} - for i in 0..BATCH_SIZE { - let (key, value) = gen_kv_pair(i, value_size); - txn.set(key, value).unwrap(); +pub fn agate_randread(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + let txn = agate.new_transaction_at(unix_time(), false); + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match txn.get(&key) { + Ok(item) => { + assert_eq!(item.value().len(), value_size); + } + Err(err) => { + if matches!(err, agatedb::Error::KeyNotFound(_)) { + continue; + } else { + panic!("{:?}", err); + } + } + } + } + })); } - txn.commit_at(unix_time()).unwrap(); + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); } -fn agate_randread(agate: Arc, value_size: usize, rng: &mut ThreadRng) { - let txn = agate.new_transaction_at(unix_time(), false); +pub fn agate_iterate(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; - for _ in 0..BATCH_SIZE { - let (key, value) = gen_kv_pair(rng.gen_range(0, BATCH_SIZE), value_size); + for _ in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); - let item = txn.get(&key).unwrap(); - assert_eq!(item.value(), value); - } -} + handles.push(std::thread::spawn(move || { + let txn = agate.new_transaction_at(unix_time(), false); + let opts = IteratorOptions::default(); + let mut iter = txn.new_iterator(&opts); + iter.rewind(); -fn agate_iterate(agate: Arc, value_size: usize) { - let txn = agate.new_transaction_at(unix_time(), false); - let opts = IteratorOptions::default(); - let mut iter = txn.new_iterator(&opts); - iter.rewind(); + while iter.valid() { + let item = iter.item(); + assert_eq!(item.value().len(), value_size); - while iter.valid() { - let item = iter.item(); - assert_eq!(item.value().len(), value_size); - - iter.next(); + iter.next(); + } + })); } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); } -fn rocks_populate(db: Arc, value_size: usize) { +pub fn rocks_populate( + db: Arc, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, +) { let mut write_options = rocksdb::WriteOptions::default(); write_options.set_sync(true); write_options.disable_wal(false); + let write_options = Arc::new(write_options); + + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + let write_options = write_options.clone(); - let mut batch = rocksdb::WriteBatch::default(); + handles.push(std::thread::spawn(move || { + let range = chunk_start..chunk_start + chunk_size; - for i in 0..BATCH_SIZE { - let (key, value) = gen_kv_pair(i, value_size); - batch.put(key, value); + for batch_start in range.step_by(batch_size as usize) { + let mut batch = rocksdb::WriteBatch::default(); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = gen_kv_pair(key, value_size); + batch.put(key, value); + }); + + db.write_opt(batch, &write_options).unwrap(); + } + })); } - db.write_opt(batch, &write_options).unwrap(); + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); } -fn rocks_randread(db: Arc, value_size: usize, rng: &mut ThreadRng) { - for _ in 0..BATCH_SIZE { - let (key, value) = gen_kv_pair(rng.gen_range(0, BATCH_SIZE), value_size); - - let find = db.get(key).unwrap(); - assert_eq!(find.unwrap(), value) +pub fn rocks_randread(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match db.get(key) { + Ok(item) => { + if item.is_some() { + assert_eq!(item.unwrap().len(), value_size); + } + } + Err(err) => { + panic!("{:?}", err); + } + } + } + })); } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); } -fn rocks_iterate(db: Arc, value_size: usize) { - let iter = db.iterator(rocksdb::IteratorMode::Start); +pub fn rocks_iterate(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for _ in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); - for (_, value) in iter { - assert_eq!(value.len(), value_size); + handles.push(std::thread::spawn(move || { + let iter = db.iterator(rocksdb::IteratorMode::Start); + + for (_, value) in iter { + assert_eq!(value.len(), value_size); + } + })); } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); } fn bench_agate(c: &mut Criterion) { - let mut rng = rand::thread_rng(); - let dir = TempDir::new("agatedb-bench-small-value").unwrap(); let dir_path = dir.path(); let mut opts = AgateOptions { @@ -108,7 +226,7 @@ fn bench_agate(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - agate_populate(agate, SMALL_VALUE_SIZE); + agate_populate(agate, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -120,13 +238,13 @@ fn bench_agate(c: &mut Criterion) { c.bench_function("agate randread small value", |b| { b.iter(|| { - agate_randread(agate.clone(), SMALL_VALUE_SIZE, &mut rng); + agate_randread(agate.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); }); }); c.bench_function("agate iterate small value", |b| { b.iter(|| { - agate_iterate(agate.clone(), SMALL_VALUE_SIZE); + agate_iterate(agate.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); }); }); @@ -145,7 +263,7 @@ fn bench_agate(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - agate_populate(agate, LARGE_VALUE_SIZE); + agate_populate(agate, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -157,13 +275,13 @@ fn bench_agate(c: &mut Criterion) { c.bench_function("agate randread large value", |b| { b.iter(|| { - agate_randread(agate.clone(), LARGE_VALUE_SIZE, &mut rng); + agate_randread(agate.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); }); }); c.bench_function("agate iterate large value", |b| { b.iter(|| { - agate_iterate(agate.clone(), LARGE_VALUE_SIZE); + agate_iterate(agate.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); }); }); @@ -171,8 +289,6 @@ fn bench_agate(c: &mut Criterion) { } fn bench_rocks(c: &mut Criterion) { - let mut rng = rand::thread_rng(); - let dir = TempDir::new("rocks-bench-small-value").unwrap(); let dir_path = dir.path(); let mut opts = rocksdb::Options::default(); @@ -188,7 +304,7 @@ fn bench_rocks(c: &mut Criterion) { let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); let now = Instant::now(); - rocks_populate(db, SMALL_VALUE_SIZE); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -200,12 +316,12 @@ fn bench_rocks(c: &mut Criterion) { c.bench_function("rocks randread small value", |b| { b.iter(|| { - rocks_randread(db.clone(), SMALL_VALUE_SIZE, &mut rng); + rocks_randread(db.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); }); }); c.bench_function("rocks iterate small value", |b| { - b.iter(|| rocks_iterate(db.clone(), SMALL_VALUE_SIZE)); + b.iter(|| rocks_iterate(db.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE)); }); dir.close().unwrap(); @@ -221,7 +337,7 @@ fn bench_rocks(c: &mut Criterion) { let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); let now = Instant::now(); - rocks_populate(db, LARGE_VALUE_SIZE); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE); total = total.add(now.elapsed()); }); @@ -233,12 +349,12 @@ fn bench_rocks(c: &mut Criterion) { c.bench_function("rocks randread large value", |b| { b.iter(|| { - rocks_randread(db.clone(), LARGE_VALUE_SIZE, &mut rng); + rocks_randread(db.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); }); }); c.bench_function("rocks iterate large value", |b| { - b.iter(|| rocks_iterate(db.clone(), LARGE_VALUE_SIZE)); + b.iter(|| rocks_iterate(db.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE)); }); dir.close().unwrap(); From eabca6b228ef533b21ab4fb8587fc5cdec00b7d8 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 11 Jul 2022 17:01:43 +0800 Subject: [PATCH 14/17] update agate bench Signed-off-by: GanZiheng --- Cargo.toml | 3 +- agate_bench/Cargo.toml | 4 +- agate_bench/src/main.rs | 462 ++++++++------------------------- benches/benches_agate_rocks.rs | 193 +------------- benches/common.rs | 206 ++++++++++++++- 5 files changed, 325 insertions(+), 543 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b147ba90..e4a37afd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [features] default = [] -enable-rocksdb = ["rocksdb"] [dependencies] bytes = "1.0" @@ -24,7 +23,7 @@ parking_lot = "0.11" prost = "0.8" proto = { path = "proto" } rand = "0.7" -rocksdb = { version = "0.15", optional = true } +rocksdb = "0.15" skiplist = { path = "skiplist" } tempdir = "0.3" thiserror = "1.0" diff --git a/agate_bench/Cargo.toml b/agate_bench/Cargo.toml index 4b3faea8..2b9ca30d 100644 --- a/agate_bench/Cargo.toml +++ b/agate_bench/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [features] default = [] -enable-rocksdb = ["rocksdb"] [dependencies] agatedb = { path = "../" } @@ -14,7 +13,8 @@ bytes = "1.0" clap = "2.33" indicatif = "0.15" rand = "0.7" -rocksdb = { version = "0.15", optional = true } +rocksdb = "0.15" +tempdir = "0.3" threadpool = "1.8" yatp = { git = "https://github.com/tikv/yatp.git" } diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index 178f7a48..b4f1c537 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -1,36 +1,15 @@ -use std::{ - path::PathBuf, - sync::{mpsc::channel, Arc}, - time::{Duration, UNIX_EPOCH}, -}; +use std::{path::PathBuf, sync::Arc, time::Instant}; -use agatedb::{AgateOptions, IteratorOptions}; -use bytes::{Bytes, BytesMut}; +use agatedb::AgateOptions; use clap::clap_app; use indicatif::{ProgressBar, ProgressStyle}; -use rand::Rng; -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - -fn gen_kv_pair(key: u64, value_size: usize) -> (Bytes, Bytes) { - let key = Bytes::from(format!("vsz={:05}-k={:010}", value_size, key)); - let mut value = BytesMut::with_capacity(value_size); - value.resize(value_size, 0); - - (key, value.freeze()) -} +#[path = "../../benches/common.rs"] +mod common; -pub fn unix_time() -> u64 { - UNIX_EPOCH - .elapsed() - .expect("Time went backwards") - .as_millis() as u64 -} +use common::{ + agate_iterate, agate_populate, agate_randread, rocks_iterate, rocks_populate, rocks_randread, +}; pub struct Rate { pub data: std::sync::Arc, @@ -74,24 +53,23 @@ fn main() { (version: "1.0") (author: "TiKV authors") (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") - (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand randread => (about: "randomly read from database") (version: "1.0") (author: "TiKV authors") - (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand iterate => (about: "iterate database") (version: "1.0") (author: "TiKV authors") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") ) (@subcommand rocks_populate => @@ -99,123 +77,68 @@ fn main() { (version: "1.0") (author: "TiKV authors") (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") - (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand rocks_randread => (about: "randomly read from database") (version: "1.0") (author: "TiKV authors") - (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand rocks_iterate => (about: "iterate database") (version: "1.0") (author: "TiKV authors") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") ) ) .get_matches(); let directory = PathBuf::from(matches.value_of("directory").unwrap()); - let threads: usize = matches.value_of("threads").unwrap().parse().unwrap(); - let pool = threadpool::ThreadPool::new(threads); - let (tx, rx) = channel(); + let threads: u64 = matches.value_of("threads").unwrap().parse().unwrap(); + + let mut agate_opts = AgateOptions { + dir: directory.clone(), + value_dir: directory.clone(), + sync_writes: true, + managed_txns: true, + ..Default::default() + }; + + let mut rocks_opts = rocksdb::Options::default(); + rocks_opts.create_if_missing(true); + rocks_opts.set_compression_type(rocksdb::DBCompressionType::None); match matches.subcommand() { ("populate", Some(sub_matches)) => { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - - let mut options = AgateOptions { - create_if_not_exists: true, - sync_writes: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); - let mut expected = 0; - let pb = ProgressBar::hidden(); - pb.set_style(ProgressStyle::default_bar() - .template( - "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", - ) - .progress_chars("=> ")); - let mut write = Rate::new(); - let mut last_report = std::time::Instant::now(); + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; - let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + let begin = Instant::now(); - if seq { - println!("writing sequentially"); - } + agate_populate(agate, key_nums, chunk_size, batch_size, value_size); - for i in 0..key_nums / chunk_size { - let agate = agate.clone(); - let tx = tx.clone(); - let write = write.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut txn = agate.new_transaction_at(unix_time(), true); - let mut rng = rand::thread_rng(); - for j in range { - let (key, value) = if seq { - gen_kv_pair(j, value_size) - } else { - gen_kv_pair(rng.gen_range(0, key_nums), value_size) - }; - txn.set(key, value).unwrap(); - write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - txn.commit_at(unix_time()).unwrap(); - tx.send(()).unwrap(); - }); - expected += 1; - } + let cost = begin.elapsed(); - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - pb.inc(chunk_size); - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - if delta > std::time::Duration::from_secs(1) { - last_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - write.rate() as f64 / delta.as_secs_f64(), - write.now() - ); - } - } - pb.finish_with_message("done"); + println!("populate {} keys in {} ms", key_nums, cost.as_millis()); } ("randread", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); - let mut options = AgateOptions { - create_if_not_exists: true, - sync_writes: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); - let mut expected = 0; + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( @@ -223,195 +146,79 @@ fn main() { ) .progress_chars("=> ")); - let mut missing = Rate::new(); - let mut found = Rate::new(); - let mut last_report = std::time::Instant::now(); + let begin = Instant::now(); for _ in 0..times { - for i in 0..key_nums / chunk_size { - let agate = agate.clone(); - let tx = tx.clone(); - let missing = missing.data.clone(); - let found = found.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let txn = agate.new_transaction_at(unix_time(), false); - let mut rng = rand::thread_rng(); - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match txn.get(&key) { - Ok(item) => { - assert_eq!(item.value().len(), value_size); - found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Err(err) => { - if matches!(err, agatedb::Error::KeyNotFound(_)) { - missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - continue; - } else { - panic!("{:?}", err); - } - } - } - } - tx.send(()).unwrap(); - }); - expected += 1; - } + agate_randread(agate.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - last_report = now; - if delta > std::time::Duration::from_secs(1) { - println!( - "{}, rate: {}, found: {}, missing: {}", - now.duration_since(begin).as_secs_f64(), - (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), - found.now(), - missing.now() - ); - } - } pb.finish_with_message("done"); + + let cost = begin.elapsed(); + println!( + "randread {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } ("iterate", Some(sub_matches)) => { let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let mut options = AgateOptions { - create_if_not_exists: true, - sync_writes: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; + + let pb = ProgressBar::new(times); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); - let begin = std::time::Instant::now(); - let mut lst_report = std::time::Instant::now(); - let mut total = Rate::new(); + let begin = Instant::now(); for _ in 0..times { - let agate = agate.clone(); - let txn = agate.new_transaction_at(unix_time(), false); - let opts = IteratorOptions::default(); - let mut iter = txn.new_iterator(&opts); - iter.rewind(); - while iter.valid() { - let item = iter.item(); - assert_eq!(item.value().len(), value_size); - total - .data - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - iter.next(); - let now = std::time::Instant::now(); - if now.duration_since(lst_report) >= Duration::from_secs(1) { - lst_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - total.rate(), - total.now() - ); - } - } + agate_iterate(agate.clone(), key_nums, chunk_size, value_size); + pb.inc(1); } + pb.finish_with_message("done"); + + let cost = begin.elapsed(); println!( - "read total {} keys in {}", - total.now(), - begin.elapsed().as_secs_f64() - ) + "iterate {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } - #[cfg(feature = "enable-rocksdb")] ("rocks_populate", Some(sub_matches)) => { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); - let mut expected = 0; - let pb = ProgressBar::hidden(); - pb.set_style(ProgressStyle::default_bar() - .template( - "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", - ) - .progress_chars("=> ")); - let mut write = Rate::new(); - let mut last_report = std::time::Instant::now(); + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / threads; - let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + let begin = Instant::now(); - if seq { - println!("writing sequentially"); - } + rocks_populate(db, key_nums, chunk_size, batch_size, value_size); - for i in 0..key_nums / chunk_size { - let db = db.clone(); - let tx = tx.clone(); - let write = write.data.clone(); - pool.execute(move || { - let mut write_options = rocksdb::WriteOptions::default(); - write_options.set_sync(true); - write_options.disable_wal(false); - - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut batch = rocksdb::WriteBatch::default(); - let mut rng = rand::thread_rng(); - for j in range { - let (key, value) = if seq { - gen_kv_pair(j, value_size) - } else { - gen_kv_pair(rng.gen_range(0, key_nums), value_size) - }; - batch.put(key, value); - write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - db.write_opt(batch, &write_options).unwrap(); - tx.send(()).unwrap(); - }); - expected += 1; - } + let cost = begin.elapsed(); - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - pb.inc(chunk_size); - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - if delta > std::time::Duration::from_secs(1) { - last_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - write.rate() as f64 / delta.as_secs_f64(), - write.now() - ); - } - } - pb.finish_with_message("done"); + println!("populate {} keys in {} ms", key_nums, cost.as_millis()); } - #[cfg(feature = "enable-rocksdb")] ("rocks_randread", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); - let mut expected = 0; + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / threads; + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( @@ -419,99 +226,56 @@ fn main() { ) .progress_chars("=> ")); - let mut missing = Rate::new(); - let mut found = Rate::new(); - let mut last_report = std::time::Instant::now(); + let begin = Instant::now(); for _ in 0..times { - for i in 0..key_nums / chunk_size { - let db = db.clone(); - let tx = tx.clone(); - let missing = missing.data.clone(); - let found = found.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut rng = rand::thread_rng(); - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match db.get(&key) { - Ok(Some(value)) => { - assert_eq!(value.len(), value_size); - found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Ok(None) => { - missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - continue; - } - Err(err) => { - panic!("{:?}", err); - } - } - } - tx.send(()).unwrap(); - }); - expected += 1; - } + rocks_randread(db.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - last_report = now; - if delta > std::time::Duration::from_secs(1) { - println!( - "{}, rate: {}, found: {}, missing: {}", - now.duration_since(begin).as_secs_f64(), - (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), - found.now(), - missing.now() - ); - } - } pb.finish_with_message("done"); + + let cost = begin.elapsed(); + println!( + "randread {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } - #[cfg(feature = "enable-rocksdb")] ("rocks_iterate", Some(sub_matches)) => { let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / threads; + + let pb = ProgressBar::new(times); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); - let begin = std::time::Instant::now(); - let mut lst_report = std::time::Instant::now(); - let mut total = Rate::new(); + let begin = Instant::now(); for _ in 0..times { - let iter = db.iterator(rocksdb::IteratorMode::Start); - for (_, value) in iter { - assert_eq!(value.len(), value_size); - total - .data - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let now = std::time::Instant::now(); - if now.duration_since(lst_report) >= Duration::from_secs(1) { - lst_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - total.rate(), - total.now() - ); - } - } + rocks_iterate(db.clone(), key_nums, chunk_size, value_size); + pb.inc(1); } + pb.finish_with_message("done"); + + let cost = begin.elapsed(); println!( - "read total {} keys in {}", - total.now(), - begin.elapsed().as_secs_f64() - ) + "iterate {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } + _ => panic!("unsupported command"), } } diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs index 8842ab00..29a48705 100644 --- a/benches/benches_agate_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "enable-rocksdb")] mod common; use std::{ @@ -7,11 +6,12 @@ use std::{ time::{Duration, Instant}, }; -use agatedb::{Agate, AgateOptions, IteratorOptions}; -use common::{gen_kv_pair, remove_files, unix_time}; +use agatedb::AgateOptions; +use common::{ + agate_iterate, agate_populate, agate_randread, remove_files, rocks_iterate, rocks_populate, + rocks_randread, +}; use criterion::{criterion_group, criterion_main, Criterion}; -use rand::Rng; -use rocksdb::DB; use tempdir::TempDir; // We will process `CHUNK_SIZE` items in a thread, and in one certain thread, @@ -23,189 +23,6 @@ const BATCH_SIZE: u64 = 100; const SMALL_VALUE_SIZE: usize = 32; const LARGE_VALUE_SIZE: usize = 4096; -pub fn agate_populate( - agate: Arc, - key_nums: u64, - chunk_size: u64, - batch_size: u64, - value_size: usize, -) { - let mut handles = vec![]; - - for chunk_start in (0..key_nums).step_by(chunk_size as usize) { - let agate = agate.clone(); - - handles.push(std::thread::spawn(move || { - let range = chunk_start..chunk_start + chunk_size; - - for batch_start in range.step_by(batch_size as usize) { - let mut txn = agate.new_transaction_at(unix_time(), true); - - (batch_start..batch_start + batch_size).for_each(|key| { - let (key, value) = gen_kv_pair(key, value_size); - txn.set(key, value).unwrap(); - }); - - txn.commit_at(unix_time()).unwrap(); - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - -pub fn agate_randread(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { - let mut handles = vec![]; - - for chunk_start in (0..key_nums).step_by(chunk_size as usize) { - let agate = agate.clone(); - - handles.push(std::thread::spawn(move || { - let mut rng = rand::thread_rng(); - let range = chunk_start..chunk_start + chunk_size; - let txn = agate.new_transaction_at(unix_time(), false); - - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match txn.get(&key) { - Ok(item) => { - assert_eq!(item.value().len(), value_size); - } - Err(err) => { - if matches!(err, agatedb::Error::KeyNotFound(_)) { - continue; - } else { - panic!("{:?}", err); - } - } - } - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - -pub fn agate_iterate(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { - let mut handles = vec![]; - - for _ in (0..key_nums).step_by(chunk_size as usize) { - let agate = agate.clone(); - - handles.push(std::thread::spawn(move || { - let txn = agate.new_transaction_at(unix_time(), false); - let opts = IteratorOptions::default(); - let mut iter = txn.new_iterator(&opts); - iter.rewind(); - - while iter.valid() { - let item = iter.item(); - assert_eq!(item.value().len(), value_size); - - iter.next(); - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - -pub fn rocks_populate( - db: Arc, - key_nums: u64, - chunk_size: u64, - batch_size: u64, - value_size: usize, -) { - let mut write_options = rocksdb::WriteOptions::default(); - write_options.set_sync(true); - write_options.disable_wal(false); - let write_options = Arc::new(write_options); - - let mut handles = vec![]; - - for chunk_start in (0..key_nums).step_by(chunk_size as usize) { - let db = db.clone(); - let write_options = write_options.clone(); - - handles.push(std::thread::spawn(move || { - let range = chunk_start..chunk_start + chunk_size; - - for batch_start in range.step_by(batch_size as usize) { - let mut batch = rocksdb::WriteBatch::default(); - - (batch_start..batch_start + batch_size).for_each(|key| { - let (key, value) = gen_kv_pair(key, value_size); - batch.put(key, value); - }); - - db.write_opt(batch, &write_options).unwrap(); - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - -pub fn rocks_randread(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { - let mut handles = vec![]; - - for chunk_start in (0..key_nums).step_by(chunk_size as usize) { - let db = db.clone(); - - handles.push(std::thread::spawn(move || { - let mut rng = rand::thread_rng(); - let range = chunk_start..chunk_start + chunk_size; - - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match db.get(key) { - Ok(item) => { - if item.is_some() { - assert_eq!(item.unwrap().len(), value_size); - } - } - Err(err) => { - panic!("{:?}", err); - } - } - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - -pub fn rocks_iterate(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { - let mut handles = vec![]; - - for _ in (0..key_nums).step_by(chunk_size as usize) { - let db = db.clone(); - - handles.push(std::thread::spawn(move || { - let iter = db.iterator(rocksdb::IteratorMode::Start); - - for (_, value) in iter { - assert_eq!(value.len(), value_size); - } - })); - } - - handles - .into_iter() - .for_each(|handle| handle.join().unwrap()); -} - fn bench_agate(c: &mut Criterion) { let dir = TempDir::new("agatedb-bench-small-value").unwrap(); let dir_path = dir.path(); diff --git a/benches/common.rs b/benches/common.rs index 1feb6aef..df25b742 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -3,15 +3,17 @@ use std::{ fs::{read_dir, remove_file}, ops::{Deref, DerefMut}, path::Path, + sync::Arc, time::UNIX_EPOCH, }; use agatedb::{ - opt::build_table_options, util::sync_dir, AgateOptions, - ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, + opt::build_table_options, util::sync_dir, Agate, AgateOptions, + ChecksumVerificationMode::NoVerification, IteratorOptions, Table, TableBuilder, Value, }; use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; +use rocksdb::DB; use tempdir::TempDir; pub fn rand_value() -> String { @@ -94,3 +96,203 @@ pub fn remove_files(path: &Path) { }); sync_dir(&path).unwrap(); } + +pub fn agate_populate( + agate: Arc, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, +) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let range = chunk_start..chunk_start + chunk_size; + + for batch_start in range.step_by(batch_size as usize) { + let mut txn = agate.new_transaction_at(unix_time(), true); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = gen_kv_pair(key, value_size); + txn.set(key, value).unwrap(); + }); + + txn.commit_at(unix_time()).unwrap(); + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn agate_randread(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + let txn = agate.new_transaction_at(unix_time(), false); + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match txn.get(&key) { + Ok(item) => { + assert_eq!(item.value().len(), value_size); + } + Err(err) => { + if matches!(err, agatedb::Error::KeyNotFound(_)) { + continue; + } else { + panic!("{:?}", err); + } + } + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn agate_iterate(agate: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + let (key, _) = gen_kv_pair(chunk_start, value_size); + + handles.push(std::thread::spawn(move || { + let txn = agate.new_transaction_at(unix_time(), false); + let opts = IteratorOptions::default(); + let mut iter = txn.new_iterator(&opts); + iter.seek(&key); + let mut count = 0; + + while iter.valid() { + let item = iter.item(); + assert_eq!(item.value().len(), value_size); + + iter.next(); + + count += 1; + if count > chunk_size { + break; + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn rocks_populate( + db: Arc, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, +) { + let mut write_options = rocksdb::WriteOptions::default(); + write_options.set_sync(true); + write_options.disable_wal(false); + let write_options = Arc::new(write_options); + + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + let write_options = write_options.clone(); + + handles.push(std::thread::spawn(move || { + let range = chunk_start..chunk_start + chunk_size; + + for batch_start in range.step_by(batch_size as usize) { + let mut batch = rocksdb::WriteBatch::default(); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = gen_kv_pair(key, value_size); + batch.put(key, value); + }); + + db.write_opt(batch, &write_options).unwrap(); + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn rocks_randread(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match db.get(key) { + Ok(item) => { + if item.is_some() { + assert_eq!(item.unwrap().len(), value_size); + } + } + Err(err) => { + panic!("{:?}", err); + } + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn rocks_iterate(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + let (key, _) = gen_kv_pair(chunk_start, value_size); + + handles.push(std::thread::spawn(move || { + let mut iter = db.raw_iterator(); + iter.seek(&key); + let mut count = 0; + + while iter.valid() { + assert_eq!(iter.value().unwrap().len(), value_size); + + iter.next(); + + count += 1; + if count > chunk_size { + break; + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} From 514d5cfa507c93fb0aad88a928e9e987044a53bb Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 11 Jul 2022 17:12:13 +0800 Subject: [PATCH 15/17] update feature Signed-off-by: GanZiheng --- Cargo.toml | 3 ++- agate_bench/Cargo.toml | 3 ++- benches/benches_agate_rocks.rs | 1 + benches/common.rs | 4 ++++ 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e4a37afd..b147ba90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [features] default = [] +enable-rocksdb = ["rocksdb"] [dependencies] bytes = "1.0" @@ -23,7 +24,7 @@ parking_lot = "0.11" prost = "0.8" proto = { path = "proto" } rand = "0.7" -rocksdb = "0.15" +rocksdb = { version = "0.15", optional = true } skiplist = { path = "skiplist" } tempdir = "0.3" thiserror = "1.0" diff --git a/agate_bench/Cargo.toml b/agate_bench/Cargo.toml index 2b9ca30d..a2f23434 100644 --- a/agate_bench/Cargo.toml +++ b/agate_bench/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [features] default = [] +enable-rocksdb = ["rocksdb"] [dependencies] agatedb = { path = "../" } @@ -13,7 +14,7 @@ bytes = "1.0" clap = "2.33" indicatif = "0.15" rand = "0.7" -rocksdb = "0.15" +rocksdb = { version = "0.15", optional = true } tempdir = "0.3" threadpool = "1.8" yatp = { git = "https://github.com/tikv/yatp.git" } diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs index 29a48705..b5ee3089 100644 --- a/benches/benches_agate_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "enable-rocksdb")] mod common; use std::{ diff --git a/benches/common.rs b/benches/common.rs index df25b742..cf60ac45 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -13,6 +13,7 @@ use agatedb::{ }; use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; +#[cfg(feature = "enable-rocksdb")] use rocksdb::DB; use tempdir::TempDir; @@ -197,6 +198,7 @@ pub fn agate_iterate(agate: Arc, key_nums: u64, chunk_size: u64, value_si .for_each(|handle| handle.join().unwrap()); } +#[cfg(feature = "enable-rocksdb")] pub fn rocks_populate( db: Arc, key_nums: u64, @@ -236,6 +238,7 @@ pub fn rocks_populate( .for_each(|handle| handle.join().unwrap()); } +#[cfg(feature = "enable-rocksdb")] pub fn rocks_randread(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { let mut handles = vec![]; @@ -267,6 +270,7 @@ pub fn rocks_randread(db: Arc, key_nums: u64, chunk_size: u64, value_size: u .for_each(|handle| handle.join().unwrap()); } +#[cfg(feature = "enable-rocksdb")] pub fn rocks_iterate(db: Arc, key_nums: u64, chunk_size: u64, value_size: usize) { let mut handles = vec![]; From 9a234cfcba1fd3ee530b08153d5cf31cd9cf74e4 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Mon, 11 Jul 2022 17:19:57 +0800 Subject: [PATCH 16/17] update Signed-off-by: GanZiheng --- agate_bench/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index b4f1c537..cf83f042 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -171,7 +171,7 @@ fn main() { let agate = Arc::new(agate_opts.open().unwrap()); let chunk_size = key_nums / threads; - let pb = ProgressBar::new(times); + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", @@ -182,7 +182,7 @@ fn main() { for _ in 0..times { agate_iterate(agate.clone(), key_nums, chunk_size, value_size); - pb.inc(1); + pb.inc(key_nums); } pb.finish_with_message("done"); @@ -251,7 +251,7 @@ fn main() { let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); let chunk_size = key_nums / threads; - let pb = ProgressBar::new(times); + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", @@ -262,7 +262,7 @@ fn main() { for _ in 0..times { rocks_iterate(db.clone(), key_nums, chunk_size, value_size); - pb.inc(1); + pb.inc(key_nums); } pb.finish_with_message("done"); From 9e816ec9895f384e87edac1209d9320204878fe9 Mon Sep 17 00:00:00 2001 From: GanZiheng Date: Tue, 12 Jul 2022 14:44:42 +0800 Subject: [PATCH 17/17] support randomly populate Signed-off-by: GanZiheng --- agate_bench/src/main.rs | 8 ++- benches/benches_agate_rocks.rs | 126 ++++++++++++++++++++++++++++++--- benches/common.rs | 16 ++++- 3 files changed, 138 insertions(+), 12 deletions(-) diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index cf83f042..17e3b4d9 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -55,6 +55,7 @@ fn main() { (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") ) (@subcommand randread => (about: "randomly read from database") @@ -79,6 +80,7 @@ fn main() { (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") ) (@subcommand rocks_randread => (about: "randomly read from database") @@ -119,13 +121,14 @@ fn main() { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); let agate = Arc::new(agate_opts.open().unwrap()); let chunk_size = key_nums / threads; let begin = Instant::now(); - agate_populate(agate, key_nums, chunk_size, batch_size, value_size); + agate_populate(agate, key_nums, chunk_size, batch_size, value_size, seq); let cost = begin.elapsed(); @@ -199,13 +202,14 @@ fn main() { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); let chunk_size = key_nums / threads; let begin = Instant::now(); - rocks_populate(db, key_nums, chunk_size, batch_size, value_size); + rocks_populate(db, key_nums, chunk_size, batch_size, value_size, seq); let cost = begin.elapsed(); diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs index b5ee3089..4abd3424 100644 --- a/benches/benches_agate_rocks.rs +++ b/benches/benches_agate_rocks.rs @@ -35,7 +35,7 @@ fn bench_agate(c: &mut Criterion) { ..Default::default() }; - c.bench_function("agate populate small value", |b| { + c.bench_function("agate sequentially populate small value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -44,7 +44,38 @@ fn bench_agate(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - agate_populate(agate, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + true, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("agate randomly populate small value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + false, + ); total = total.add(now.elapsed()); }); @@ -72,7 +103,31 @@ fn bench_agate(c: &mut Criterion) { opts.dir = dir_path.to_path_buf(); opts.value_dir = dir_path.to_path_buf(); - c.bench_function("agate populate large value", |b| { + c.bench_function("agate sequentially populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + true, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("agate randomly populate large value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -81,7 +136,14 @@ fn bench_agate(c: &mut Criterion) { let agate = Arc::new(opts.open().unwrap()); let now = Instant::now(); - agate_populate(agate, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + false, + ); total = total.add(now.elapsed()); }); @@ -113,7 +175,7 @@ fn bench_rocks(c: &mut Criterion) { opts.create_if_missing(true); opts.set_compression_type(rocksdb::DBCompressionType::None); - c.bench_function("rocks populate small value", |b| { + c.bench_function("rocks sequentially populate small value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -122,7 +184,31 @@ fn bench_rocks(c: &mut Criterion) { let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); let now = Instant::now(); - rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE, true); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("rocks randomly populate small value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate( + db, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + false, + ); total = total.add(now.elapsed()); }); @@ -146,7 +232,24 @@ fn bench_rocks(c: &mut Criterion) { let dir = TempDir::new("rocks-bench-large-value").unwrap(); let dir_path = dir.path(); - c.bench_function("rocks populate large value", |b| { + c.bench_function("rocks sequentially populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE, true); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("rocks randomly populate large value", |b| { b.iter_custom(|iters| { let mut total = Duration::new(0, 0); @@ -155,7 +258,14 @@ fn bench_rocks(c: &mut Criterion) { let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); let now = Instant::now(); - rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE); + rocks_populate( + db, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + false, + ); total = total.add(now.elapsed()); }); diff --git a/benches/common.rs b/benches/common.rs index cf60ac45..44e83e22 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -104,6 +104,7 @@ pub fn agate_populate( chunk_size: u64, batch_size: u64, value_size: usize, + seq: bool, ) { let mut handles = vec![]; @@ -111,13 +112,18 @@ pub fn agate_populate( let agate = agate.clone(); handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); let range = chunk_start..chunk_start + chunk_size; for batch_start in range.step_by(batch_size as usize) { let mut txn = agate.new_transaction_at(unix_time(), true); (batch_start..batch_start + batch_size).for_each(|key| { - let (key, value) = gen_kv_pair(key, value_size); + let (key, value) = if seq { + gen_kv_pair(key, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; txn.set(key, value).unwrap(); }); @@ -205,6 +211,7 @@ pub fn rocks_populate( chunk_size: u64, batch_size: u64, value_size: usize, + seq: bool, ) { let mut write_options = rocksdb::WriteOptions::default(); write_options.set_sync(true); @@ -221,10 +228,15 @@ pub fn rocks_populate( let range = chunk_start..chunk_start + chunk_size; for batch_start in range.step_by(batch_size as usize) { + let mut rng = rand::thread_rng(); let mut batch = rocksdb::WriteBatch::default(); (batch_start..batch_start + batch_size).for_each(|key| { - let (key, value) = gen_kv_pair(key, value_size); + let (key, value) = if seq { + gen_kv_pair(key, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; batch.put(key, value); });