Commit 113323f

tmp

niebayes committed Mar 29, 2024
1 parent f4e7126 commit 113323f
Showing 5 changed files with 50 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -20,6 +20,7 @@ dotenv.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
indicatif = "0.17.1"
itertools.workspace = true
lazy_static.workspace = true
21 changes: 21 additions & 0 deletions benchmarks/config/wal_bench.example.toml
@@ -0,0 +1,21 @@
# Refer to the documentation of `Args` in `benchmarks/src/wal.rs`.
wal_provider = "kafka"
bootstrap_brokers = ["localhost:9092"]
num_workers = 10
num_topics = 32
num_regions = 1000
num_scrapes = 1000
num_rows = 5
col_types = "ifs"
max_batch_size = "512KB"
linger = "1ms"
backoff_init = "10ms"
backoff_max = "1ms"
backoff_base = 2
backoff_deadline = "3s"
compression = "zstd"
rng_seed = 42
skip_read = false
skip_write = false
random_topics = true
report_metrics = false
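Note (not part of the commit): given the `#[clap(long, short = 'c')]` attribute added to `cfg_file` in `benchmarks/src/wal.rs` below, this example file can presumably be passed to the benchmark as `--cfg-file benchmarks/config/wal_bench.example.toml` (or `-c`), e.g. with something like `cargo run --release --bin wal -- -c benchmarks/config/wal_bench.example.toml`; the exact cargo invocation is an assumption based on the file paths in this diff, not something stated in the commit.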
27 changes: 14 additions & 13 deletions benchmarks/src/bin/wal.rs
@@ -37,7 +37,6 @@ use rskafka::client::partition::Compression;
use rskafka::client::ClientBuilder;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::Barrier;

struct Benchmarker;

@@ -64,16 +63,12 @@ impl Benchmarker {
if !cfg.skip_write {
info!("Benchmarking write ...");

let barrier = Arc::new(Barrier::new(cfg.num_workers as usize));
let num_scrapes = cfg.num_scrapes;
let timer = Instant::now();

futures::future::join_all((0..cfg.num_workers).map(|i| {
let barrier = barrier.clone();
let wal = wal.clone();
let regions = region_chunks[i as usize].clone();
tokio::spawn(async move {
barrier.wait().await;
for _ in 0..num_scrapes {
let mut wal_writer = wal.writer();
regions
@@ -84,29 +79,23 @@
})
}))
.await;

write_elapsed += timer.elapsed().as_millis();
}

if !cfg.skip_read {
info!("Benchmarking read ...");

let barrier = Arc::new(Barrier::new(cfg.num_workers as usize));
let timer = Instant::now();

futures::future::join_all((0..cfg.num_workers).map(|i| {
let barrier = barrier.clone();
let wal = wal.clone();
let regions = region_chunks[i as usize].clone();
tokio::spawn(async move {
barrier.wait().await;
for region in regions.iter() {
region.replay(&wal).await;
}
})
}))
.await;

read_elapsed = timer.elapsed().as_millis();
}

@@ -239,18 +228,30 @@ fn parse_compression(comp: &str) -> Compression {
}

fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> {
col_types
let parts = col_types.split('x').collect::<Vec<_>>();
assert!(parts.len() <= 2);

let pattern = parts[0];
let repeat = parts
.get(1)
.map(|r| r.parse::<usize>().unwrap())
.unwrap_or(1);

pattern
.chars()
.map(|c| match c {
'i' | 'I' => ColumnDataType::Int64,
'f' | 'F' => ColumnDataType::Float64,
's' | 'S' => ColumnDataType::String,
other => unreachable!("Cannot parse {other} as a column data type"),
})
.cycle()
.take(pattern.len() * repeat)
.collect()
}

fn main() {
// Sets the global logging level to INFO and suppresses rskafka logs below ERROR.
std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error");
common_telemetry::init_default_ut_logging();

@@ -272,7 +273,7 @@ fn main() {
.min(cfg.num_scrapes)
.min(cfg.max_batch_size.as_bytes() as u32)
.min(cfg.bootstrap_brokers.len() as u32)
> 0
== 0
{
panic!("Invalid arguments");
}
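The `parse_col_types` change above adds an "x" repeat suffix to the column-type pattern. A minimal, self-contained sketch of the same logic (not the benchmark code itself; `ColType` is a hypothetical stand-in for the real `ColumnDataType`):

#[derive(Clone, Copy, Debug, PartialEq)]
enum ColType {
    Int64,
    Float64,
    String,
}

// Parses a pattern like "ifs" or "iix2": the part before 'x' lists column
// types, and the optional part after 'x' repeats that pattern N times.
fn parse_col_types(col_types: &str) -> Vec<ColType> {
    let parts = col_types.split('x').collect::<Vec<_>>();
    assert!(parts.len() <= 2, "at most one 'x' separator is allowed");

    let pattern = parts[0];
    let repeat = parts
        .get(1)
        .map(|r| r.parse::<usize>().unwrap())
        .unwrap_or(1);

    pattern
        .chars()
        .map(|c| match c {
            'i' | 'I' => ColType::Int64,
            'f' | 'F' => ColType::Float64,
            's' | 'S' => ColType::String,
            other => unreachable!("cannot parse {other} as a column data type"),
        })
        .cycle()
        .take(pattern.len() * repeat)
        .collect()
}

fn main() {
    // A plain pattern maps one character per column.
    assert_eq!(
        parse_col_types("ifs"),
        vec![ColType::Int64, ColType::Float64, ColType::String]
    );
    // "iix2" repeats the two-column pattern "ii" twice: four i64 columns in total.
    assert_eq!(parse_col_types("iix2"), vec![ColType::Int64; 4]);
}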
16 changes: 13 additions & 3 deletions benchmarks/src/wal.rs
@@ -35,6 +35,7 @@ use crate::metrics;

/// The wal provider.
#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalProvider {
#[default]
RaftEngine,
@@ -44,7 +45,8 @@ pub enum WalProvider {
#[derive(Parser)]
pub struct Args {
/// The provided configuration file.
#[clap(long, default_value = "")]
/// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`.
#[clap(long, short = 'c')]
pub cfg_file: String,

/// The wal provider.
@@ -83,17 +85,21 @@ pub struct Args {
/// - f = ColumnDataType::Float64
/// - s = ColumnDataType::String
/// For example, "ifs" is parsed as three columns: i64, f64, and string.
///
/// Additionally, an "x" suffix can be appended to repeat the column types a given number of times.
/// For example, "iix2" is parsed as 4 columns: i64, i64, i64, and i64.
/// This feature is useful if you want to specify many columns.
#[clap(long, default_value = "ifs")]
pub col_types: String,

/// The maximum size of a batch of kafka records.
/// The default value is 1mb.
#[clap(long, default_value = "1MB")]
#[clap(long, default_value = "512KB")]
pub max_batch_size: ReadableSize,

/// The maximum time the kafka client waits to fill a batch of kafka records before issuing it.
/// However, a batch is issued immediately if a new record cannot fit into it.
#[clap(long, default_value = "20ms")]
#[clap(long, default_value = "1ms")]
pub linger: String,

/// The initial backoff delay of the kafka consumer.
@@ -150,10 +156,14 @@ pub struct Config {
pub num_rows: u32,
pub col_types: String,
pub max_batch_size: ReadableSize,
#[serde(with = "humantime_serde")]
pub linger: Duration,
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
pub backoff_base: u32,
#[serde(with = "humantime_serde")]
pub backoff_deadline: Duration,
pub compression: String,
pub rng_seed: u64,
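For context (not part of the commit itself): the `humantime_serde` dependency added above is what lets the human-readable duration strings in `wal_bench.example.toml` ("1ms", "10ms", "3s") deserialize into the `std::time::Duration` fields of `Config`. A minimal sketch, assuming the `serde` (with derive) and `toml` crates are available; the struct name `Durations` is hypothetical:

use std::time::Duration;

use serde::Deserialize;

#[derive(Deserialize)]
struct Durations {
    // `humantime_serde` parses strings like "10ms" or "3s" into Duration.
    #[serde(with = "humantime_serde")]
    backoff_init: Duration,
    #[serde(with = "humantime_serde")]
    backoff_deadline: Duration,
}

fn main() {
    let cfg: Durations =
        toml::from_str("backoff_init = \"10ms\"\nbackoff_deadline = \"3s\"").unwrap();
    assert_eq!(cfg.backoff_init, Duration::from_millis(10));
    assert_eq!(cfg.backoff_deadline, Duration::from_secs(3));
}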
