feat: introduce time-series distribution args to bench tool #253

Merged · 2 commits · Dec 26, 2023
Changes from all commits
1 change: 1 addition & 0 deletions foyer-storage-bench/Cargo.toml
@@ -49,6 +49,7 @@ tokio = { workspace = true }
tracing = "0.1"
tracing-opentelemetry = { version = "0.22", optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zipf = "7"

[features]
deadlock = ["parking_lot/deadlock_detection", "foyer-storage/deadlock"]
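The new `zipf = "7"` dependency provides the rank sampler driven in main.rs below. A minimal sketch of using it on its own (the helper name is illustrative; the crate calls match the ones in the diff):

use rand::distributions::Distribution;

fn sample_zipf_ranks(n: usize, s: f64, samples: usize) -> Vec<usize> {
    // Ranks come out in 1..=n, with P(rank = k) proportional to 1 / k^s.
    let zipf = zipf::ZipfDistribution::new(n, s).unwrap();
    let mut rng = rand::thread_rng();
    (0..samples).map(|_| zipf.sample(&mut rng)).collect()
}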
217 changes: 201 additions & 16 deletions foyer-storage-bench/src/main.rs
@@ -22,6 +22,7 @@ mod text;
mod utils;

use std::{
collections::BTreeMap,
fs::create_dir_all,
ops::Range,
path::PathBuf,
@@ -44,12 +45,13 @@ use foyer_storage::{
error::Result,
reinsertion::{rated_ticket::RatedTicketReinsertionPolicy, ReinsertionPolicy},
runtime::{RuntimeConfig, RuntimeStore, RuntimeStoreConfig, RuntimeStoreWriter},
-storage::{Storage, StorageExt, StorageWriter},
storage::{AsyncStorageExt, Storage, StorageExt, StorageWriter},
store::{LfuFsStoreConfig, Store, StoreConfig, StoreWriter},
};
use futures::future::join_all;
use itertools::Itertools;
use rand::{
distributions::Distribution,
rngs::{OsRng, StdRng},
Rng, SeedableRng,
};
@@ -59,6 +61,7 @@ use tokio::sync::broadcast;
use utils::{detect_fs_type, dev_stat_path, file_stat_path, iostat, FsType};

#[derive(Parser, Debug, Clone)]
#[command(author, version, about)]
pub struct Args {
/// dir for cache data
#[arg(short, long)]
@@ -152,6 +155,22 @@ pub struct Args {
/// available values: "none", "zstd"
#[arg(long, default_value = "none")]
compression: String,

/// Time-series operation distribution.
///
/// Available values: "none", "uniform", "zipf".
///
/// If "uniform" or "zipf" is used, operations will be performed in async mode.
#[arg(long, default_value = "none")]
distribution: String,

/// Zipf distribution element count (`n`). For `--distribution zipf` only.
#[arg(long, default_value_t = 100)]
distribution_zipf_n: usize,

/// Zipf distribution exponent (`s`); larger values skew the workload harder toward hot keys. For `--distribution zipf` only.
#[arg(long, default_value_t = 0.5)]
distribution_zipf_s: f64,
}

#[derive(Debug)]
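As a reference for the two knobs above (not part of the diff): under Zipf(n, s) the probability of rank k is k^(-s) normalized by the generalized harmonic number, so a larger `s` concentrates traffic on the hottest keys. A small sketch with an illustrative helper name:

fn zipf_pmf(n: usize, s: f64) -> Vec<f64> {
    // P(rank = k) = k^(-s) / H(n, s), where H(n, s) = sum of j^(-s) for j in 1..=n.
    let h: f64 = (1..=n).map(|j| (j as f64).powf(-s)).sum();
    (1..=n).map(|k| (k as f64).powf(-s) / h).collect()
}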
@@ -367,13 +386,55 @@ where
}
}

#[derive(Debug)]
enum TimeSeriesDistribution {
None,
Uniform {
interval: Duration,
},
Zipf {
n: usize,
s: f64,
interval: Duration,
},
}

impl TimeSeriesDistribution {
fn new(args: &Args) -> Self {
match args.distribution.as_str() {
"none" => TimeSeriesDistribution::None,
"uniform" => {
// interval = 1 / freq = 1 / (rate / size) = size / rate
let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64
/ (args.w_rate * 1024.0 * 1024.0);
let interval = Duration::from_secs_f64(interval);
TimeSeriesDistribution::Uniform { interval }
}
"zipf" => {
// interval = 1 / freq = 1 / (rate / size) = size / rate
let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64
/ (args.w_rate * 1024.0 * 1024.0);
let interval = Duration::from_secs_f64(interval);
display_zipf_sample(args.distribution_zipf_n, args.distribution_zipf_s);
TimeSeriesDistribution::Zipf {
n: args.distribution_zipf_n,
s: args.distribution_zipf_s,
interval,
}
}
other => panic!("unsupported distribution: {}", other),
}
}
}

struct Context {
w_rate: Option<f64>,
r_rate: Option<f64>,
counts: Vec<AtomicU64>,
entry_size_range: Range<usize>,
lookup_range: u64,
time: u64,
distribution: TimeSeriesDistribution,
metrics: Metrics,
}
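To make the interval arithmetic above concrete (assumed numbers, not defaults from the diff): with 64 KiB entries and a 64 MiB/s write rate, both the uniform and zipf arms start from a base interval of roughly 1 ms per insert:

fn assumed_base_interval() -> std::time::Duration {
    // interval = size / rate, matching TimeSeriesDistribution::new:
    // 64 KiB / (64 MiB/s) = 1 / 1024 s ≈ 0.98 ms between inserts.
    std::time::Duration::from_secs_f64((64.0 * 1024.0) / (64.0 * 1024.0 * 1024.0))
}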

@@ -637,14 +698,17 @@ async fn bench(
.map(|_| AtomicU64::default())
.collect_vec();

let distribution = TimeSeriesDistribution::new(&args);

let context = Arc::new(Context {
w_rate,
r_rate,
lookup_range: args.lookup_range,
counts,
entry_size_range: args.entry_size_min..args.entry_size_max + 1,
time: args.time,
distribution,
metrics: metrics.clone(),
-lookup_range: args.lookup_range,
});

let w_handles = (0..args.writers)
@@ -675,9 +739,47 @@ async fn write(

let mut limiter = context.w_rate.map(RateLimiter::new);
let step = context.counts.len() as u64;
-let count = &context.counts[id as usize];

const K: usize = 100;
const G: usize = 10;

let zipf_intervals = match context.distribution {
TimeSeriesDistribution::Zipf { n, s, interval } => {
let histogram = gen_zipf_histogram(n, s, G, n * K);

let loop_interval = Duration::from_secs_f64(interval.as_secs_f64() * K as f64);
let group_cnt = K / G;
let group_interval = interval.as_secs_f64() * group_cnt as f64;

if id == 0 {
println!("loop interval: {loop_interval:?}, zipf intervals: ");
}

let mut sum = 0;
let intervals = histogram
.values()
.copied()
.map(|ratio| {
let cnt = ratio * K as f64;
sum += cnt as usize;
let interval = Duration::from_secs_f64(group_interval / cnt);
if id == 0 {
println!(" [{cnt:3.0} ==> {interval:010.3?}]");
}
(sum, interval)
})
.collect_vec();

Some(intervals)
}
_ => None,
};
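// `zipf_intervals` holds, per group, the cumulative expected op count within a K-op loop and the
// per-op sleep; every group gets the same wall-clock share of the loop, so hotter groups sleep less.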

let mut c = 0;
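// Per-writer loop counter: keys are derived as `idx = id + step * c`, while completed-insert
// counts live in `context.counts` and are bumped from the insert callback.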

loop {
let l = Instant::now();

match stop.try_recv() {
Err(broadcast::error::TryRecvError::Empty) => {}
_ => return,
@@ -686,7 +788,6 @@
return;
}

-let c = count.load(Ordering::Relaxed);
let idx = id + step * c;
// TODO(MrCroxx): Use random content?
let entry_size = OsRng.gen_range(context.entry_size_range.clone());
@@ -696,20 +797,49 @@
}

let time = Instant::now();
-let inserted = store.insert(idx, data).await.unwrap();
-let lat = time.elapsed().as_micros() as u64;
-count.store(c + 1, Ordering::Relaxed);
-if let Err(e) = context.metrics.insert_lats.write().record(lat) {
-tracing::error!("metrics error: {:?}, value: {}", e, lat);
-}
let ctx = context.clone();
let callback = move |res: Result<bool>| async move {
let inserted = res.unwrap();
let lat = time.elapsed().as_micros() as u64;
ctx.counts[id as usize].fetch_add(1, Ordering::Relaxed);
if let Err(e) = ctx.metrics.insert_lats.write().record(lat) {
tracing::error!("metrics error: {:?}, value: {}", e, lat);
}

-if inserted {
-context.metrics.insert_ios.fetch_add(1, Ordering::Relaxed);
-context
-.metrics
-.insert_bytes
-.fetch_add(entry_size, Ordering::Relaxed);
if inserted {
ctx.metrics.insert_ios.fetch_add(1, Ordering::Relaxed);
ctx.metrics
.insert_bytes
.fetch_add(entry_size, Ordering::Relaxed);
}
};
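// The callback runs when the insert finishes (awaited inline for `none`, or invoked by
// `insert_async_with_callback` otherwise): it records submit-to-completion latency, bumps this
// writer's op count, and updates the insert throughput metrics.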

let elapsed = l.elapsed();

match &context.distribution {
TimeSeriesDistribution::None => {
let res = store.insert(idx, data).await;
callback(res).await;
}
TimeSeriesDistribution::Uniform { interval } => {
store.insert_async_with_callback(idx, data, callback);
tokio::time::sleep(interval.saturating_sub(elapsed)).await;
}
TimeSeriesDistribution::Zipf { .. } => {
store.insert_async_with_callback(idx, data, callback);
let intervals = zipf_intervals.as_ref().unwrap();

let group = match intervals.binary_search_by_key(&(c as usize % K), |(sum, _)| *sum)
{
Ok(i) => i,
Err(i) => i.min(G - 1),
};
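// `c % K` is this op's position within the current K-op loop; the binary search over cumulative
// counts maps it to its zipf group (clamped to the last group), whose interval is slept below.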

tokio::time::sleep(intervals[group].1.saturating_sub(elapsed)).await;
}
}

c += 1;
}
}

@@ -773,3 +903,58 @@ async fn read(
tokio::task::consume_budget().await;
}
}

fn gen_zipf_histogram(n: usize, s: f64, groups: usize, samples: usize) -> BTreeMap<usize, f64> {
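// Draw `samples` values from Zipf(n, s) and bucket the ranks into `groups` equal-width buckets,
// returning each bucket's share of the samples.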
let step = n / groups;

let mut rng = rand::thread_rng();
let zipf = zipf::ZipfDistribution::new(n, s).unwrap();
let mut data: BTreeMap<usize, usize> = BTreeMap::default();
for _ in 0..samples {
let v = zipf.sample(&mut rng);
let g = std::cmp::min(v / step, groups);
*data.entry(g).or_default() += 1;
}
let mut histogram: BTreeMap<usize, f64> = BTreeMap::default();
for group in 0..groups {
histogram.insert(
group,
data.get(&group).copied().unwrap_or_default() as f64 / samples as f64,
);
}
histogram
}

fn display_zipf_sample(n: usize, s: f64) {
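// Render an ASCII bar chart of the sampled Zipf(n, s) histogram so the configured skew can be
// sanity-checked before the benchmark starts.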
const W: usize = 100;
const H: usize = 10;

let samples = n * 1000;

let histogram = gen_zipf_histogram(n, s, H, samples);

let max = histogram.values().copied().fold(0.0, f64::max);

println!("zipf's diagram [N = {n}][s = {s}][samples = {}]", n * 1000);

for (g, ratio) in histogram {
let shares = (ratio / max * W as f64) as usize;
let bar: String = if shares != 0 {
"=".repeat(shares)
} else {
".".to_string()
};
println!(
"{:3} : {:6} : {:6.3}% : {}",
g,
(samples as f64 * ratio) as usize,
ratio * 100.0,
bar
);
}
}

#[test]
fn zipf() {
display_zipf_sample(1000, 0.5);
}
4 changes: 4 additions & 0 deletions foyer-storage/src/flusher.rs
@@ -192,6 +192,10 @@ where

#[tracing::instrument(skip(self))]
async fn update_catalog(&self, entries: Vec<PositionedEntry<K, V>>) -> Result<()> {
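// Skip empty batches: there is nothing to insert into the catalog and no flushed bytes to record.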
if entries.is_empty() {
return Ok(());
}

// record fully flushed bytes by the way
let mut bytes = 0;
