Skip to content

Commit

Permalink
feat: introduce time-series distribution args to bench tool (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Dec 26, 2023
1 parent d3b375e commit 32d5d3d
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 16 deletions.
1 change: 1 addition & 0 deletions foyer-storage-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tokio = { workspace = true }
tracing = "0.1"
tracing-opentelemetry = { version = "0.22", optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zipf = "7"

[features]
deadlock = ["parking_lot/deadlock_detection", "foyer-storage/deadlock"]
Expand Down
217 changes: 201 additions & 16 deletions foyer-storage-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod text;
mod utils;

use std::{
collections::BTreeMap,
fs::create_dir_all,
ops::Range,
path::PathBuf,
Expand All @@ -44,12 +45,13 @@ use foyer_storage::{
error::Result,
reinsertion::{rated_ticket::RatedTicketReinsertionPolicy, ReinsertionPolicy},
runtime::{RuntimeConfig, RuntimeStore, RuntimeStoreConfig, RuntimeStoreWriter},
storage::{Storage, StorageExt, StorageWriter},
storage::{AsyncStorageExt, Storage, StorageExt, StorageWriter},
store::{LfuFsStoreConfig, Store, StoreConfig, StoreWriter},
};
use futures::future::join_all;
use itertools::Itertools;
use rand::{
distributions::Distribution,
rngs::{OsRng, StdRng},
Rng, SeedableRng,
};
Expand All @@ -59,6 +61,7 @@ use tokio::sync::broadcast;
use utils::{detect_fs_type, dev_stat_path, file_stat_path, iostat, FsType};

#[derive(Parser, Debug, Clone)]
#[command(author, version, about)]
pub struct Args {
/// dir for cache data
#[arg(short, long)]
Expand Down Expand Up @@ -152,6 +155,22 @@ pub struct Args {
/// available values: "none", "zstd"
#[arg(long, default_value = "none")]
compression: String,

/// Time-series operation distribution.
///
/// Available values: "none", "uniform", "zipf".
///
/// If "uniform" or "zipf" is used, operations will be performed in async mode.
#[arg(long, default_value = "none")]
distribution: String,

/// For `--distribution zipf` only.
#[arg(long, default_value_t = 100)]
distribution_zipf_n: usize,

/// For `--distribution zipf` only.
#[arg(long, default_value_t = 0.5)]
distribution_zipf_s: f64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -367,13 +386,55 @@ where
}
}

#[derive(Debug)]
enum TimeSeriesDistribution {
None,
Uniform {
interval: Duration,
},
Zipf {
n: usize,
s: f64,
interval: Duration,
},
}

impl TimeSeriesDistribution {
fn new(args: &Args) -> Self {
match args.distribution.as_str() {
"none" => TimeSeriesDistribution::None,
"uniform" => {
// interval = 1 / freq = 1 / (rate / size) = size / rate
let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64
/ (args.w_rate * 1024.0 * 1024.0);
let interval = Duration::from_secs_f64(interval);
TimeSeriesDistribution::Uniform { interval }
}
"zipf" => {
// interval = 1 / freq = 1 / (rate / size) = size / rate
let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64
/ (args.w_rate * 1024.0 * 1024.0);
let interval = Duration::from_secs_f64(interval);
display_zipf_sample(args.distribution_zipf_n, args.distribution_zipf_s);
TimeSeriesDistribution::Zipf {
n: args.distribution_zipf_n,
s: args.distribution_zipf_s,
interval,
}
}
other => panic!("unsupported distribution: {}", other),
}
}
}

struct Context {
w_rate: Option<f64>,
r_rate: Option<f64>,
counts: Vec<AtomicU64>,
entry_size_range: Range<usize>,
lookup_range: u64,
time: u64,
distribution: TimeSeriesDistribution,
metrics: Metrics,
}

Expand Down Expand Up @@ -637,14 +698,17 @@ async fn bench(
.map(|_| AtomicU64::default())
.collect_vec();

let distribution = TimeSeriesDistribution::new(&args);

let context = Arc::new(Context {
w_rate,
r_rate,
lookup_range: args.lookup_range,
counts,
entry_size_range: args.entry_size_min..args.entry_size_max + 1,
time: args.time,
distribution,
metrics: metrics.clone(),
lookup_range: args.lookup_range,
});

let w_handles = (0..args.writers)
Expand Down Expand Up @@ -675,9 +739,47 @@ async fn write(

let mut limiter = context.w_rate.map(RateLimiter::new);
let step = context.counts.len() as u64;
let count = &context.counts[id as usize];

const K: usize = 100;
const G: usize = 10;

let zipf_intervals = match context.distribution {
TimeSeriesDistribution::Zipf { n, s, interval } => {
let histogram = gen_zipf_histogram(n, s, G, n * K);

let loop_interval = Duration::from_secs_f64(interval.as_secs_f64() * K as f64);
let group_cnt = K / G;
let group_interval = interval.as_secs_f64() * group_cnt as f64;

if id == 0 {
println!("loop interval: {loop_interval:?}, zipf intervals: ");
}

let mut sum = 0;
let intervals = histogram
.values()
.copied()
.map(|ratio| {
let cnt = ratio * K as f64;
sum += cnt as usize;
let interval = Duration::from_secs_f64(group_interval / cnt);
if id == 0 {
println!(" [{cnt:3.0} ==> {interval:010.3?}]");
}
(sum, interval)
})
.collect_vec();

Some(intervals)
}
_ => None,
};

let mut c = 0;

loop {
let l = Instant::now();

match stop.try_recv() {
Err(broadcast::error::TryRecvError::Empty) => {}
_ => return,
Expand All @@ -686,7 +788,6 @@ async fn write(
return;
}

let c = count.load(Ordering::Relaxed);
let idx = id + step * c;
// TODO(MrCroxx): Use random content?
let entry_size = OsRng.gen_range(context.entry_size_range.clone());
Expand All @@ -696,20 +797,49 @@ async fn write(
}

let time = Instant::now();
let inserted = store.insert(idx, data).await.unwrap();
let lat = time.elapsed().as_micros() as u64;
count.store(c + 1, Ordering::Relaxed);
if let Err(e) = context.metrics.insert_lats.write().record(lat) {
tracing::error!("metrics error: {:?}, value: {}", e, lat);
}
let ctx = context.clone();
let callback = move |res: Result<bool>| async move {
let inserted = res.unwrap();
let lat = time.elapsed().as_micros() as u64;
ctx.counts[id as usize].fetch_add(1, Ordering::Relaxed);
if let Err(e) = ctx.metrics.insert_lats.write().record(lat) {
tracing::error!("metrics error: {:?}, value: {}", e, lat);
}

if inserted {
context.metrics.insert_ios.fetch_add(1, Ordering::Relaxed);
context
.metrics
.insert_bytes
.fetch_add(entry_size, Ordering::Relaxed);
if inserted {
ctx.metrics.insert_ios.fetch_add(1, Ordering::Relaxed);
ctx.metrics
.insert_bytes
.fetch_add(entry_size, Ordering::Relaxed);
}
};

let elapsed = l.elapsed();

match &context.distribution {
TimeSeriesDistribution::None => {
let res = store.insert(idx, data).await;
callback(res).await;
}
TimeSeriesDistribution::Uniform { interval } => {
store.insert_async_with_callback(idx, data, callback);
tokio::time::sleep(interval.saturating_sub(elapsed)).await;
}
TimeSeriesDistribution::Zipf { .. } => {
store.insert_async_with_callback(idx, data, callback);
let intervals = zipf_intervals.as_ref().unwrap();

let group = match intervals.binary_search_by_key(&(c as usize % K), |(sum, _)| *sum)
{
Ok(i) => i,
Err(i) => i.min(G - 1),
};

tokio::time::sleep(intervals[group].1.saturating_sub(elapsed)).await;
}
}

c += 1;
}
}

Expand Down Expand Up @@ -773,3 +903,58 @@ async fn read(
tokio::task::consume_budget().await;
}
}

fn gen_zipf_histogram(n: usize, s: f64, groups: usize, samples: usize) -> BTreeMap<usize, f64> {
let step = n / groups;

let mut rng = rand::thread_rng();
let zipf = zipf::ZipfDistribution::new(n, s).unwrap();
let mut data: BTreeMap<usize, usize> = BTreeMap::default();
for _ in 0..samples {
let v = zipf.sample(&mut rng);
let g = std::cmp::min(v / step, groups);
*data.entry(g).or_default() += 1;
}
let mut histogram: BTreeMap<usize, f64> = BTreeMap::default();
for group in 0..groups {
histogram.insert(
group,
data.get(&group).copied().unwrap_or_default() as f64 / samples as f64,
);
}
histogram
}

fn display_zipf_sample(n: usize, s: f64) {
const W: usize = 100;
const H: usize = 10;

let samples = n * 1000;

let histogram = gen_zipf_histogram(n, s, H, samples);

let max = histogram.values().copied().fold(0.0, f64::max);

println!("zipf's diagram [N = {n}][s = {s}][samples = {}]", n * 1000);

for (g, ratio) in histogram {
let shares = (ratio / max * W as f64) as usize;
let bar: String = if shares != 0 {
"=".repeat(shares)
} else {
".".to_string()
};
println!(
"{:3} : {:6} : {:6.3}% : {}",
g,
(samples as f64 * ratio) as usize,
ratio * 100.0,
bar
);
}
}

#[test]
fn zipf() {
display_zipf_sample(1000, 0.5);
}
4 changes: 4 additions & 0 deletions foyer-storage/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ where

#[tracing::instrument(skip(self))]
async fn update_catalog(&self, entries: Vec<PositionedEntry<K, V>>) -> Result<()> {
if entries.is_empty() {
return Ok(());
}

// record fully flushed bytes by the way
let mut bytes = 0;

Expand Down

0 comments on commit 32d5d3d

Please sign in to comment.