From 113323f5fdf628e392616bdf761246a8b5381b75 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Mar 2024 16:45:15 +0800
Subject: [PATCH] tmp

---
 Cargo.lock                               |  1 +
 benchmarks/Cargo.toml                    |  1 +
 benchmarks/config/wal_bench.example.toml | 21 ++++++++++++++++++
 benchmarks/src/bin/wal.rs                | 27 ++++++++++++------------
 benchmarks/src/wal.rs                    | 16 +++++++++++---
 5 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 58badb94ae47..c444979a081f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -886,6 +886,7 @@ dependencies = [
  "futures",
  "futures-util",
  "humantime",
+ "humantime-serde",
  "indicatif",
  "itertools 0.10.5",
  "lazy_static",
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 534c64119393..18b44e944858 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -20,6 +20,7 @@ dotenv.workspace = true
 futures.workspace = true
 futures-util.workspace = true
 humantime.workspace = true
+humantime-serde.workspace = true
 indicatif = "0.17.1"
 itertools.workspace = true
 lazy_static.workspace = true
diff --git a/benchmarks/config/wal_bench.example.toml b/benchmarks/config/wal_bench.example.toml
index e69de29bb2d1..72faed0d7410 100644
--- a/benchmarks/config/wal_bench.example.toml
+++ b/benchmarks/config/wal_bench.example.toml
@@ -0,0 +1,21 @@
+# Refer to the documentation of `Args` in `benchmarks/src/wal.rs` for the meaning of each option.
+wal_provider = "kafka"
+bootstrap_brokers = ["localhost:9092"]
+num_workers = 10
+num_topics = 32
+num_regions = 1000
+num_scrapes = 1000
+num_rows = 5
+col_types = "ifs"
+max_batch_size = "512KB"
+linger = "1ms"
+backoff_init = "10ms"
+backoff_max = "1ms"
+backoff_base = 2
+backoff_deadline = "3s"
+compression = "zstd"
+rng_seed = 42
+skip_read = false
+skip_write = false
+random_topics = true
+report_metrics = false
diff --git a/benchmarks/src/bin/wal.rs b/benchmarks/src/bin/wal.rs
index ef048f40faf6..d71100635d3d 100644
--- a/benchmarks/src/bin/wal.rs
+++ b/benchmarks/src/bin/wal.rs
@@ -37,7 +37,6 @@ use rskafka::client::partition::Compression;
 use rskafka::client::ClientBuilder;
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
-use tokio::sync::Barrier;
 
 struct Benchmarker;
 
@@ -64,16 +63,12 @@ impl Benchmarker {
 
         if !cfg.skip_write {
             info!("Benchmarking write ...");
-            let barrier = Arc::new(Barrier::new(cfg.num_workers as usize));
             let num_scrapes = cfg.num_scrapes;
             let timer = Instant::now();
-
             futures::future::join_all((0..cfg.num_workers).map(|i| {
-                let barrier = barrier.clone();
                 let wal = wal.clone();
                 let regions = region_chunks[i as usize].clone();
                 tokio::spawn(async move {
-                    barrier.wait().await;
                     for _ in 0..num_scrapes {
                         let mut wal_writer = wal.writer();
                         regions
@@ -84,29 +79,23 @@ impl Benchmarker {
                 })
             }))
             .await;
-
             write_elapsed += timer.elapsed().as_millis();
         }
 
         if !cfg.skip_read {
             info!("Benchmarking read ...");
-            let barrier = Arc::new(Barrier::new(cfg.num_workers as usize));
             let timer = Instant::now();
-
             futures::future::join_all((0..cfg.num_workers).map(|i| {
-                let barrier = barrier.clone();
                 let wal = wal.clone();
                 let regions = region_chunks[i as usize].clone();
                 tokio::spawn(async move {
-                    barrier.wait().await;
                     for region in regions.iter() {
                         region.replay(&wal).await;
                     }
                 })
             }))
             .await;
-
             read_elapsed = timer.elapsed().as_millis();
         }
 
@@ -239,7 +228,16 @@ fn parse_compression(comp: &str) -> Compression {
 }
 
 fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> {
-    col_types
+    let parts = col_types.split('x').collect::<Vec<_>>();
+    assert!(parts.len() <= 2);
+
+    let pattern = parts[0];
+    let repeat = parts
+        .get(1)
+        .map(|r| r.parse::<usize>().unwrap())
+        .unwrap_or(1);
+
+    pattern
         .chars()
         .map(|c| match c {
             'i' | 'I' => ColumnDataType::Int64,
@@ -247,10 +245,13 @@ fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> {
             's' | 'S' => ColumnDataType::String,
             other => unreachable!("Cannot parse {other} as a column data type"),
         })
+        .cycle()
+        .take(pattern.len() * repeat)
         .collect()
 }
 
 fn main() {
+    // Sets the global log level to INFO and suppresses rskafka logs below ERROR.
     std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error");
     common_telemetry::init_default_ut_logging();
 
@@ -272,7 +273,7 @@ fn main() {
         .min(cfg.num_scrapes)
         .min(cfg.max_batch_size.as_bytes() as u32)
         .min(cfg.bootstrap_brokers.len() as u32)
-        > 0
+        == 0
     {
         panic!("Invalid arguments");
     }
diff --git a/benchmarks/src/wal.rs b/benchmarks/src/wal.rs
index 9addf2af7f78..10e88f99f37c 100644
--- a/benchmarks/src/wal.rs
+++ b/benchmarks/src/wal.rs
@@ -35,6 +35,7 @@ use crate::metrics;
 
 /// The wal provider.
 #[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
 pub enum WalProvider {
     #[default]
     RaftEngine,
@@ -44,7 +45,8 @@ pub enum WalProvider {
 #[derive(Parser)]
 pub struct Args {
     /// The provided configuration file.
-    #[clap(long, default_value = "")]
+    /// An example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`.
+    #[clap(long, short = 'c')]
     pub cfg_file: String,
 
     /// The wal provider.
@@ -83,17 +85,21 @@ pub struct Args {
     /// - f = ColumnDataType::Float64
     /// - s = ColumnDataType::String
     /// For e.g., "ifs" will be parsed as three columns: i64, f64, and string.
+    ///
+    /// Additionally, an "x" sign can be appended to repeat the column types a given number of times.
+    /// For example, "iix2" will be parsed as 4 columns: i64, i64, i64, and i64.
+    /// This feature is useful if you want to specify many columns.
     #[clap(long, default_value = "ifs")]
     pub col_types: String,
 
     /// The maximum size of a batch of kafka records.
     /// The default value is 1mb.
-    #[clap(long, default_value = "1MB")]
+    #[clap(long, default_value = "512KB")]
     pub max_batch_size: ReadableSize,
 
     /// The minimum latency the kafka client issues a batch of kafka records.
     /// However, a batch of kafka records would be immediately issued if a record cannot be fit into the batch.
-    #[clap(long, default_value = "20ms")]
+    #[clap(long, default_value = "1ms")]
     pub linger: String,
 
     /// The initial backoff delay of the kafka consumer.
@@ -150,10 +156,14 @@ pub struct Config {
     pub num_rows: u32,
     pub col_types: String,
     pub max_batch_size: ReadableSize,
+    #[serde(with = "humantime_serde")]
     pub linger: Duration,
+    #[serde(with = "humantime_serde")]
     pub backoff_init: Duration,
+    #[serde(with = "humantime_serde")]
     pub backoff_max: Duration,
     pub backoff_base: u32,
+    #[serde(with = "humantime_serde")]
     pub backoff_deadline: Duration,
     pub compression: String,
     pub rng_seed: u64,
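
The `#[serde(with = "humantime_serde")]` attributes added to `Config` above are what let the humantime strings in `wal_bench.example.toml` (for example `linger = "1ms"` and `backoff_deadline = "3s"`) deserialize into `std::time::Duration` fields. The following is a minimal standalone sketch of that mechanism, not part of the patch; the `toml` crate dependency and the `Durations` struct and field names are assumptions chosen only for illustration.

// Sketch only: shows how `#[serde(with = "humantime_serde")]` parses
// humantime strings from a TOML config into `Duration` values.
use std::time::Duration;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Durations {
    // "1ms", "3s", etc. are parsed by humantime and handed to serde.
    #[serde(with = "humantime_serde")]
    linger: Duration,
    #[serde(with = "humantime_serde")]
    backoff_deadline: Duration,
}

fn main() {
    let cfg: Durations = toml::from_str(
        r#"
            linger = "1ms"
            backoff_deadline = "3s"
        "#,
    )
    .unwrap();
    assert_eq!(cfg.linger, Duration::from_millis(1));
    assert_eq!(cfg.backoff_deadline, Duration::from_secs(3));
}

Without the attribute, serde's default `Duration` representation (a struct of seconds and nanoseconds) would reject the humantime strings used in the example config.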