From 5dcfcdf1445c683aa54479a4383be9dd321dac73 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 25 Dec 2023 16:57:29 +0800 Subject: [PATCH 1/2] feat: introduce time-series distribution args for bench tool Signed-off-by: MrCroxx --- foyer-storage-bench/Cargo.toml | 1 + foyer-storage-bench/src/main.rs | 202 +++++++++++++++++++++++++++++--- foyer-storage/src/flusher.rs | 4 + 3 files changed, 191 insertions(+), 16 deletions(-) diff --git a/foyer-storage-bench/Cargo.toml b/foyer-storage-bench/Cargo.toml index c00cf56d..d8ad36d1 100644 --- a/foyer-storage-bench/Cargo.toml +++ b/foyer-storage-bench/Cargo.toml @@ -49,6 +49,7 @@ tokio = { workspace = true } tracing = "0.1" tracing-opentelemetry = { version = "0.22", optional = true } tracing-subscriber = { version = "0.3", features = ["env-filter"] } +zipf = "7" [features] deadlock = ["parking_lot/deadlock_detection", "foyer-storage/deadlock"] diff --git a/foyer-storage-bench/src/main.rs b/foyer-storage-bench/src/main.rs index f41bb7fc..eac078fd 100644 --- a/foyer-storage-bench/src/main.rs +++ b/foyer-storage-bench/src/main.rs @@ -22,6 +22,7 @@ mod text; mod utils; use std::{ + collections::BTreeMap, fs::create_dir_all, ops::Range, path::PathBuf, @@ -44,12 +45,13 @@ use foyer_storage::{ error::Result, reinsertion::{rated_ticket::RatedTicketReinsertionPolicy, ReinsertionPolicy}, runtime::{RuntimeConfig, RuntimeStore, RuntimeStoreConfig, RuntimeStoreWriter}, - storage::{Storage, StorageExt, StorageWriter}, + storage::{AsyncStorageExt, Storage, StorageExt, StorageWriter}, store::{LfuFsStoreConfig, Store, StoreConfig, StoreWriter}, }; use futures::future::join_all; use itertools::Itertools; use rand::{ + distributions::Distribution, rngs::{OsRng, StdRng}, Rng, SeedableRng, }; @@ -59,6 +61,7 @@ use tokio::sync::broadcast; use utils::{detect_fs_type, dev_stat_path, file_stat_path, iostat, FsType}; #[derive(Parser, Debug, Clone)] +#[command(author, version, about)] pub struct Args { /// dir for cache data #[arg(short, long)] @@ -152,6 +155,22 @@ pub struct Args { /// available values: "none", "zstd" #[arg(long, default_value = "none")] compression: String, + + /// Time-series operation distribution. + /// + /// Available values: "none", "uniform", "zipf". + /// + /// If "uniform" or "zipf" is used, operations will be performed in async mode. + #[arg(long, default_value = "none")] + distribution: String, + + /// For `--distribution zipf` only. + #[arg(long, default_value_t = 100)] + distribution_zipf_n: usize, + + /// For `--distribution zipf` only. + #[arg(long, default_value_t = 0.5)] + distribution_zipf_s: f64, } #[derive(Debug)] @@ -367,6 +386,47 @@ where } } +#[derive(Debug)] +enum TimeSeriesDistribution { + None, + Uniform { + interval: Duration, + }, + Zipf { + n: usize, + s: f64, + interval: Duration, + }, +} + +impl TimeSeriesDistribution { + fn new(args: &Args) -> Self { + match args.distribution.as_str() { + "none" => TimeSeriesDistribution::None, + "uniform" => { + // interval = 1 / freq = 1 / (rate / size) = size / rate + let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64 + / (args.w_rate * 1024.0 * 1024.0); + let interval = Duration::from_secs_f64(interval); + TimeSeriesDistribution::Uniform { interval } + } + "zipf" => { + // interval = 1 / freq = 1 / (rate / size) = size / rate + let interval = ((args.entry_size_min + args.entry_size_max) >> 1) as f64 + / (args.w_rate * 1024.0 * 1024.0); + let interval = Duration::from_secs_f64(interval); + display_zipf_sample(args.distribution_zipf_n, args.distribution_zipf_s); + TimeSeriesDistribution::Zipf { + n: args.distribution_zipf_n, + s: args.distribution_zipf_s, + interval, + } + } + other => panic!("unsupported distribution: {}", other), + } + } +} + struct Context { w_rate: Option, r_rate: Option, @@ -374,6 +434,7 @@ struct Context { entry_size_range: Range, lookup_range: u64, time: u64, + distribution: TimeSeriesDistribution, metrics: Metrics, } @@ -637,14 +698,17 @@ async fn bench( .map(|_| AtomicU64::default()) .collect_vec(); + let distribution = TimeSeriesDistribution::new(&args); + let context = Arc::new(Context { w_rate, r_rate, + lookup_range: args.lookup_range, counts, entry_size_range: args.entry_size_min..args.entry_size_max + 1, time: args.time, + distribution, metrics: metrics.clone(), - lookup_range: args.lookup_range, }); let w_handles = (0..args.writers) @@ -675,9 +739,38 @@ async fn write( let mut limiter = context.w_rate.map(RateLimiter::new); let step = context.counts.len() as u64; - let count = &context.counts[id as usize]; + + const K: usize = 100; + const G: usize = 10; + + let zipf_intervals = match context.distribution { + TimeSeriesDistribution::Zipf { n, s, interval } => { + let histogram = gen_zipf_histogram(n, s, G, n * K); + + let loop_interval = Duration::from_secs_f64(interval.as_secs_f64() * K as f64); + let group_cnt = K / G; + let group_interval = interval.as_secs_f64() * group_cnt as f64; + + let intervals = histogram + .values() + .copied() + .map(|ratio| Duration::from_secs_f64(group_interval / (ratio * K as f64))) + .collect_vec(); + + if id == 0 { + println!("loop interval: {loop_interval:?}, zipf intervals: {intervals:?}"); + } + + Some(intervals) + } + _ => None, + }; + + let mut c = 0; loop { + let l = Instant::now(); + match stop.try_recv() { Err(broadcast::error::TryRecvError::Empty) => {} _ => return, @@ -686,7 +779,6 @@ async fn write( return; } - let c = count.load(Ordering::Relaxed); let idx = id + step * c; // TODO(MrCroxx): Use random content? let entry_size = OsRng.gen_range(context.entry_size_range.clone()); @@ -696,20 +788,43 @@ async fn write( } let time = Instant::now(); - let inserted = store.insert(idx, data).await.unwrap(); - let lat = time.elapsed().as_micros() as u64; - count.store(c + 1, Ordering::Relaxed); - if let Err(e) = context.metrics.insert_lats.write().record(lat) { - tracing::error!("metrics error: {:?}, value: {}", e, lat); - } + let ctx = context.clone(); + let callback = move |res: Result| async move { + let inserted = res.unwrap(); + let lat = time.elapsed().as_micros() as u64; + ctx.counts[id as usize].fetch_add(1, Ordering::Relaxed); + if let Err(e) = ctx.metrics.insert_lats.write().record(lat) { + tracing::error!("metrics error: {:?}, value: {}", e, lat); + } - if inserted { - context.metrics.insert_ios.fetch_add(1, Ordering::Relaxed); - context - .metrics - .insert_bytes - .fetch_add(entry_size, Ordering::Relaxed); + if inserted { + ctx.metrics.insert_ios.fetch_add(1, Ordering::Relaxed); + ctx.metrics + .insert_bytes + .fetch_add(entry_size, Ordering::Relaxed); + } + }; + + let elapsed = l.elapsed(); + + match &context.distribution { + TimeSeriesDistribution::None => { + let res = store.insert(idx, data).await; + callback(res).await; + } + TimeSeriesDistribution::Uniform { interval } => { + store.insert_async_with_callback(idx, data, callback); + tokio::time::sleep(interval.saturating_sub(elapsed)).await; + } + TimeSeriesDistribution::Zipf { .. } => { + store.insert_async_with_callback(idx, data, callback); + let intervals = zipf_intervals.as_ref().unwrap(); + let group = (c as usize % K) / (K / G); + tokio::time::sleep(intervals[group].saturating_sub(elapsed)).await; + } } + + c += 1; } } @@ -773,3 +888,58 @@ async fn read( tokio::task::consume_budget().await; } } + +fn gen_zipf_histogram(n: usize, s: f64, groups: usize, samples: usize) -> BTreeMap { + let step = n / groups; + + let mut rng = rand::thread_rng(); + let zipf = zipf::ZipfDistribution::new(n, s).unwrap(); + let mut data: BTreeMap = BTreeMap::default(); + for _ in 0..samples { + let v = zipf.sample(&mut rng); + let g = std::cmp::min(v / step, groups); + *data.entry(g).or_default() += 1; + } + let mut histogram: BTreeMap = BTreeMap::default(); + for group in 0..groups { + histogram.insert( + group, + data.get(&group).copied().unwrap_or_default() as f64 / samples as f64, + ); + } + histogram +} + +fn display_zipf_sample(n: usize, s: f64) { + const W: usize = 100; + const H: usize = 10; + + let samples = n * 1000; + + let histogram = gen_zipf_histogram(n, s, H, samples); + + let max = histogram.values().copied().fold(0.0, f64::max); + + println!("zipf's diagram [N = {n}][s = {s}][samples = {}]", n * 1000); + + for (g, ratio) in histogram { + let shares = (ratio / max * W as f64) as usize; + let bar: String = if shares != 0 { + "=".repeat(shares) + } else { + ".".to_string() + }; + println!( + "{:3} : {:6} : {:6.3}% : {}", + g, + (samples as f64 * ratio) as usize, + ratio * 100.0, + bar + ); + } +} + +#[test] +fn zipf() { + display_zipf_sample(1000, 0.5); +} diff --git a/foyer-storage/src/flusher.rs b/foyer-storage/src/flusher.rs index 333b5023..0ad99c61 100644 --- a/foyer-storage/src/flusher.rs +++ b/foyer-storage/src/flusher.rs @@ -192,6 +192,10 @@ where #[tracing::instrument(skip(self))] async fn update_catalog(&self, entries: Vec>) -> Result<()> { + if entries.is_empty() { + return Ok(()); + } + // record fully flushed bytes by the way let mut bytes = 0; From b9a6e1daef9c9c68099db05f2156f2d6e7d48cbf Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 25 Dec 2023 17:41:27 +0800 Subject: [PATCH 2/2] fix: fix calculate zipf interval Signed-off-by: MrCroxx --- foyer-storage-bench/src/main.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/foyer-storage-bench/src/main.rs b/foyer-storage-bench/src/main.rs index eac078fd..218487fa 100644 --- a/foyer-storage-bench/src/main.rs +++ b/foyer-storage-bench/src/main.rs @@ -751,16 +751,25 @@ async fn write( let group_cnt = K / G; let group_interval = interval.as_secs_f64() * group_cnt as f64; + if id == 0 { + println!("loop interval: {loop_interval:?}, zipf intervals: "); + } + + let mut sum = 0; let intervals = histogram .values() .copied() - .map(|ratio| Duration::from_secs_f64(group_interval / (ratio * K as f64))) + .map(|ratio| { + let cnt = ratio * K as f64; + sum += cnt as usize; + let interval = Duration::from_secs_f64(group_interval / cnt); + if id == 0 { + println!(" [{cnt:3.0} ==> {interval:010.3?}]"); + } + (sum, interval) + }) .collect_vec(); - if id == 0 { - println!("loop interval: {loop_interval:?}, zipf intervals: {intervals:?}"); - } - Some(intervals) } _ => None, @@ -819,8 +828,14 @@ async fn write( TimeSeriesDistribution::Zipf { .. } => { store.insert_async_with_callback(idx, data, callback); let intervals = zipf_intervals.as_ref().unwrap(); - let group = (c as usize % K) / (K / G); - tokio::time::sleep(intervals[group].saturating_sub(elapsed)).await; + + let group = match intervals.binary_search_by_key(&(c as usize % K), |(sum, _)| *sum) + { + Ok(i) => i, + Err(i) => i.min(G - 1), + }; + + tokio::time::sleep(intervals[group].1.saturating_sub(elapsed)).await; } }