// Lock-free latency histogram with a hot and a cold bucket pool.
//
// Provenance: patch d9832637b19b0d81e78b27f0973102f9a576861d, "Add lock-free
// histogram" (NikodemGapski, Fri, 29 Nov 2024). The patch adds
// `scylla/src/transport/histogram/{config.rs, lock_free_histogram.rs, mod.rs}`;
// the two submodules are inlined below in the shape of `mod.rs`.
//
// The same patch also wires the histogram into the driver:
// * `scylla/src/transport/mod.rs` gains `pub mod histogram;`.
// * `scylla/src/transport/metrics.rs` imports `crate::transport::histogram::Histogram`
//   instead of the external `histogram::Histogram`, and `Metrics` holds it in an
//   `Arc` with no mutex around it.
// * `Metrics::log_query_latency` calls `self.histogram.increment(latency)?`.
// * `Metrics::get_latency_avg_ms` calls `log_operation(Histogram::mean())`.
// * `Metrics::get_latency_percentile_ms` calls
//   `log_operation(Histogram::percentile(percentile))`; its documented input
//   range is `0.0 - 100.0`.

mod config {
    //! Bucketing configuration for the histogram.
    //!
    //! This file is a slightly adapted version of `config.rs` from the `histogram`
    //! crate released under MIT License.
    use core::ops::RangeInclusive;

    /// The configuration of a histogram which determines the bucketing strategy and
    /// therefore the relative error and memory utilization of a histogram.
    /// * `grouping_power` - controls the number of buckets that are used to span
    ///   consecutive powers of two. Lower values result in less memory usage since
    ///   fewer buckets will be created. However, this will result in larger
    ///   relative error as each bucket represents a wider range of values.
    /// * `max_value_power` - controls the largest value which can be stored in the
    ///   histogram. `2^(max_value_power) - 1` is the inclusive upper bound for the
    ///   representable range of values.
    ///
    /// # How to choose parameters for your data
    /// See the documentation of the upstream `histogram` crate for an in-depth
    /// discussion about the bucketing strategy and how these parameters result in
    /// histograms with varying error guarantees and memory utilization
    /// requirements.
    ///
    /// # The short version
    /// ## Grouping Power
    /// `grouping_power` should be set such that `2^(-1 * grouping_power)` is an
    /// acceptable relative error. Rephrased, we can plug-in the acceptable
    /// relative error into `grouping_power = ceil(log2(1/e))`. For example, if we
    /// want to limit the error to 1% (0.01) we should set `grouping_power = 7`.
    ///
    /// ## Max Value Power
    /// `max_value_power` should be the closest power of 2 that is larger than the
    /// largest value you expect in your data. If your only guarantee is that the
    /// values are all `u64`, then setting this to `64` may be reasonable if you
    /// can tolerate a bit of relative error.
    ///
    /// ## Resulting size
    ///
    /// If we want to allow any value in a range of unsigned types, the amount of
    /// memory for the histogram is approximately:
    ///
    /// | power | error |     u16 |     u32 |     u64 |
    /// |-------|-------|---------|---------|---------|
    /// |     2 |   25% | 0.6 KiB |   1 KiB |   2 KiB |
    /// |     3 | 12.5% |   1 KiB |   2 KiB |   4 KiB |
    /// |     4 | 6.25% |   2 KiB |   4 KiB |   8 KiB |
    /// |     5 | 3.13% |   3 KiB |   7 KiB |  15 KiB |
    /// |     6 | 1.56% |   6 KiB |  14 KiB |  30 KiB |
    /// |     7 | .781% |  10 KiB |  26 KiB |  58 KiB |
    /// |     8 | .391% |  18 KiB |  50 KiB | 114 KiB |
    /// |     9 | .195% |  32 KiB |  96 KiB | 224 KiB |
    /// |    10 | .098% |  56 KiB | 184 KiB | 440 KiB |
    /// |    11 | .049% |  96 KiB | 352 KiB | 864 KiB |
    /// |    12 | .025% | 160 KiB | 672 KiB | 1.7 MiB |
    ///
    /// # Constraints:
    /// * `max_value_power` must be in the range `0..=64`
    /// * `max_value_power` must be greater than `grouping_power`
    /// * the resulting number of buckets must fit in a `u32`
    #[derive(Clone, Copy, Debug, PartialEq)]
    pub struct Config {
        /// Largest representable value (inclusive).
        max: u64,
        grouping_power: u8,
        max_value_power: u8,
        /// Power of two at which the linear bins end and the logarithmic bins begin.
        cutoff_power: u8,
        /// `2^cutoff_power`; every value below it has a bucket of its own.
        cutoff_value: u64,
        /// Number of width-1 (linear) buckets.
        lower_bin_count: u32,
        /// Number of buckets each power-of-two range above the cutoff is split into.
        upper_bin_divisions: u32,
        /// Number of logarithmic buckets.
        upper_bin_count: u32,
    }

    impl Config {
        /// Create a new histogram `Config` from the parameters. See the struct
        /// documentation of [`Config`] for the meaning of the parameters and
        /// their constraints.
        ///
        /// # Errors
        /// * `"max_value_power too high"` if `max_value_power > 64`.
        /// * `"max_value_power too low"` if `grouping_power >= max_value_power`.
        /// * `"grouping_power too high"` if the bucket count would not fit in a
        ///   `u32`.
        pub const fn new(grouping_power: u8, max_value_power: u8) -> Result<Self, &'static str> {
            // we only allow values up to 2^64
            if max_value_power > 64 {
                return Err("max_value_power too high");
            }

            // check that the other parameters make sense together
            if grouping_power >= max_value_power {
                return Err("max_value_power too low");
            }

            // Keeps the shifts below in range; such configurations could never be
            // allocated anyway and are rejected by the bucket-count check too.
            if grouping_power >= 32 {
                return Err("grouping_power too high");
            }

            // The cutoff is the point at which the linear bins (width 1) and the
            // logarithmic subdivisions diverge.
            //
            // For example, with `grouping_power = 7` every power-of-two range is
            // split into 128 subdivisions. Values in 0..128 are represented
            // exactly, and so are values in 128..256, because splitting that
            // range into 128 parts also yields bins of width 1.
            //
            // Therefore `cutoff_power = grouping_power + 1`. This cannot overflow
            // since `grouping_power < max_value_power <= 64`.
            let cutoff_power = grouping_power + 1;
            let cutoff_value = 1_u64 << cutoff_power;
            let upper_bin_divisions = 1_u64 << grouping_power;

            // Linear bins have width 1, so there are `cutoff_value` of them.
            let lower_bin_count = cutoff_value;
            let upper_bin_count = (max_value_power - cutoff_power) as u64 * upper_bin_divisions;
            if lower_bin_count + upper_bin_count > u32::MAX as u64 {
                return Err("grouping_power too high");
            }

            // Inclusive upper bound `2^max_value_power - 1`. `max_value_power` is
            // at least 1 here, so the shift amount is in `0..=63`.
            let max = u64::MAX >> (64 - max_value_power as u32);

            Ok(Self {
                max,
                grouping_power,
                max_value_power,
                cutoff_power,
                cutoff_value,
                lower_bin_count: lower_bin_count as u32,
                upper_bin_divisions: upper_bin_divisions as u32,
                upper_bin_count: upper_bin_count as u32,
            })
        }

        /// Returns the grouping power that was used to create this configuration.
        pub const fn grouping_power(&self) -> u8 {
            self.grouping_power
        }

        /// Returns the max value power that was used to create this configuration.
        pub const fn max_value_power(&self) -> u8 {
            self.max_value_power
        }

        /// Returns the relative error (in percentage) of this configuration. This
        /// only applies to the logarithmic bins of the histogram (linear bins have
        /// a width of 1 and no error). For histograms with no logarithmic bins,
        /// error for the entire histogram is zero.
        pub fn error(&self) -> f64 {
            if self.grouping_power == self.max_value_power - 1 {
                0.0
            } else {
                100.0 / 2_u64.pow(self.grouping_power as u32) as f64
            }
        }

        /// Return the total number of buckets needed for this config.
        pub const fn total_buckets(&self) -> usize {
            (self.lower_bin_count + self.upper_bin_count) as usize
        }

        /// Converts a value to a bucket index. Returns an error if the value is
        /// outside of the range for the config.
        ///
        /// On success the index is always smaller than
        /// [`total_buckets`](Self::total_buckets).
        pub(crate) fn value_to_index(&self, value: u64) -> Result<usize, &'static str> {
            // Linear part: one bucket per value.
            if value < self.cutoff_value {
                return Ok(value as usize);
            }

            if value > self.max {
                return Err("value out of range for histogram");
            }

            // `value >= cutoff_value >= 2`, so it is non-zero and `power` is the
            // position of its highest set bit.
            let power = 63 - value.leading_zeros();
            let log_bin = power - self.cutoff_power as u32;
            let offset = (value - (1 << power)) >> (power - self.grouping_power as u32);

            Ok((self.lower_bin_count + log_bin * self.upper_bin_divisions + offset as u32) as usize)
        }

        /// Convert a bucket index to a lower bound.
        pub(crate) fn index_to_lower_bound(&self, index: usize) -> u64 {
            // `group` selects the power-of-two range, `slot` the bin inside it.
            let group = (index as u64) >> self.grouping_power;
            let slot = index as u64 - group * (1 << self.grouping_power);

            if group < 1 {
                slot
            } else {
                (1_u64 << (self.grouping_power as u64 + group - 1)) + (1_u64 << (group - 1)) * slot
            }
        }

        /// Convert a bucket index to an upper inclusive bound.
        // Not used by the histogram itself; kept from the upstream file.
        #[allow(dead_code)]
        pub(crate) fn index_to_upper_bound(&self, index: usize) -> u64 {
            // The formula below would overflow for the very last bucket of a
            // 64-bit configuration, so that bucket is answered directly.
            if index as u32 == self.lower_bin_count + self.upper_bin_count - 1 {
                return self.max;
            }
            let group = (index as u64) >> self.grouping_power;
            let next_slot = index as u64 - group * (1 << self.grouping_power) + 1;

            if group < 1 {
                next_slot - 1
            } else {
                (1_u64 << (self.grouping_power as u64 + group - 1))
                    + (1_u64 << (group - 1)) * next_slot
                    - 1
            }
        }

        /// Convert a bucket index to a range.
        // Not used by the histogram itself; kept from the upstream file.
        #[allow(dead_code)]
        pub(crate) fn index_to_range(&self, index: usize) -> RangeInclusive<u64> {
            self.index_to_lower_bound(index)..=self.index_to_upper_bound(index)
        }
    }
}

mod lock_free_histogram {
    //! This file was inspired by a lock-free histogram implementation
    //! from the Prometheus library in Go.
    //! https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
    //!
    //! Note: in the current implementation, the histogram *may* misbehave
    //! after (1 << 63) increments (which is quite a lot), because the started
    //! observations counter shares its `AtomicU64` with the hot pool bit.
    use std::hint;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Mutex;

    use super::Config;

    /// Ordering for the bucket counters. They are only read after the
    /// acquire/release handshake on the coordination counters, so they need no
    /// ordering of their own.
    const ORDER_TYPE: Ordering = Ordering::Relaxed;
    const HIGH_BIT: u64 = 1 << 63;
    const LOW_MASK: u64 = HIGH_BIT - 1;

    #[derive(Debug)]
    pub struct Histogram {
        /// Hot index - index of the hot pool (highest bit).
        /// Count - number of all started observations (lower 63 bits).
        /// Use of both of these variables in one AtomicU64
        /// allows use of a lock-free algorithm.
        hot_idx_and_count: AtomicU64,
        /// Finished observations for each bucket pool.
        pool_counts: [AtomicU64; 2],
        /// Two bucket pools: hot and cold.
        bucket_pools: [Box<[AtomicU64]>; 2],
        /// Mutex serializing the logging (read-out) operations
        /// (logging time is not as critical).
        logger_mutex: Mutex<()>,
        /// Standard configuration for math operations.
        config: Config,
    }

    impl Histogram {
        /// Creates a histogram covering the whole `u64` range with
        /// `grouping_power = 7`.
        pub fn new() -> Self {
            let grouping_power = 7;
            let max_value_power = 64;
            let config = Config::new(grouping_power, max_value_power)
                .expect("default histogram construction failure");

            Self::with_config(&config)
        }

        /// Creates an empty histogram using the bucketing described by `config`.
        pub fn with_config(config: &Config) -> Self {
            let bucket_count = config.total_buckets();
            let new_pool = || -> Box<[AtomicU64]> {
                (0..bucket_count).map(|_| AtomicU64::new(0)).collect()
            };

            Self {
                hot_idx_and_count: AtomicU64::new(0),
                pool_counts: [AtomicU64::new(0), AtomicU64::new(0)],
                bucket_pools: [new_pool(), new_pool()],
                logger_mutex: Mutex::new(()),
                config: *config,
            }
        }

        /// Records one observation of `value`. Never blocks.
        ///
        /// # Errors
        /// Returns an error, and records nothing, if `value` is outside of the
        /// range of the histogram's [`Config`].
        pub fn increment(&self, value: u64) -> Result<(), &'static str> {
            // Validate first: once the observation is counted as started it must
            // also be counted as finished, otherwise `log_operation` would wait
            // for it forever.
            let idx = self.config.value_to_index(value)?;

            // Increment started observations count and learn which pool is hot.
            // Acquire pairs with the release half of the pool switch in
            // `log_operation`, so the reset of this pool by an earlier read-out
            // is ordered before the additions below.
            let n = self.hot_idx_and_count.fetch_add(1, Ordering::Acquire);
            let hot_idx = (n >> 63) as usize;

            // Increment the corresponding bucket value.
            self.bucket_pools[hot_idx][idx].fetch_add(1, ORDER_TYPE);

            // Increment finished observations count. Release publishes the
            // bucket update to the reader waiting in `log_operation`.
            self.pool_counts[hot_idx].fetch_add(1, Ordering::Release);
            Ok(())
        }

        /// Returns an operation for [`log_operation`](Self::log_operation) that
        /// computes the mean of the recorded values.
        ///
        /// Each observation is accounted with the lower bound of its bucket
        /// (that choice is arbitrary). The operation fails with
        /// `"histogram is empty"` when nothing has been recorded.
        pub fn mean() -> impl FnOnce(&[AtomicU64], &Config) -> Result<u64, &'static str> {
            |buckets, config| {
                let mut total_count: u128 = 0;
                let mut weighted_sum: u128 = 0;
                for (i, bucket) in buckets.iter().enumerate() {
                    let count = bucket.load(ORDER_TYPE) as u128;
                    if count == 0 {
                        continue;
                    }
                    total_count += count;
                    weighted_sum += count * config.index_to_lower_bound(i) as u128;
                }

                if total_count == 0 {
                    return Err("histogram is empty");
                }
                Ok((weighted_sum / total_count) as u64)
            }
        }

        /// Returns an operation for [`log_operation`](Self::log_operation) that
        /// computes the given percentile (`0.0..=100.0`) of the recorded values.
        ///
        /// The result is the lower bound of the bucket that holds the
        /// observation of rank `ceil(percentile / 100 * total)` (at least 1), so
        /// `0.0` yields the smallest and `100.0` the largest recorded bucket.
        /// The operation fails with `"percentile out of bounds"` for values
        /// outside of the range (including NaN) and with `"histogram is empty"`
        /// when nothing has been recorded.
        pub fn percentile(
            percentile: f64,
        ) -> impl FnOnce(&[AtomicU64], &Config) -> Result<u64, &'static str> {
            move |buckets, config| {
                if !(0.0..=100.0).contains(&percentile) {
                    return Err("percentile out of bounds");
                }

                let total_count = Histogram::get_total_count(buckets);
                if total_count == 0 {
                    return Err("histogram is empty");
                }
                let rank = ((percentile / 100.0 * total_count as f64).ceil() as u128).max(1);

                let mut seen: u128 = 0;
                let mut last_non_empty = 0;
                for (i, bucket) in buckets.iter().enumerate() {
                    let count = bucket.load(ORDER_TYPE) as u128;
                    if count == 0 {
                        continue;
                    }
                    seen += count;
                    last_non_empty = i;
                    if seen >= rank {
                        return Ok(config.index_to_lower_bound(i));
                    }
                }
                // Only reachable if floating point rounding pushed `rank` above
                // the total count; answer with the largest recorded bucket.
                Ok(config.index_to_lower_bound(last_non_empty))
            }
        }

        /// Sums the counters of all `buckets`.
        pub fn get_total_count(buckets: &[AtomicU64]) -> u128 {
            buckets.iter().map(|v| v.load(ORDER_TYPE) as u128).sum()
        }

        /// Runs `f` on a consistent snapshot of all observations recorded so far
        /// and returns its result.
        ///
        /// Read-outs are serialized by a mutex; concurrent
        /// [`increment`](Self::increment) calls are not blocked and land in the
        /// other pool while `f` runs.
        ///
        /// # Errors
        /// Returns `"couldn't lock the logger mutex"` if the mutex is poisoned,
        /// otherwise whatever `f` returns.
        pub fn log_operation<T>(
            &self,
            f: impl FnOnce(&[AtomicU64], &Config) -> Result<T, &'static str>,
        ) -> Result<T, &'static str> {
            // Lock the "writers" mutex.
            let _guard = self
                .logger_mutex
                .lock()
                .map_err(|_| "couldn't lock the logger mutex")?;

            // Switch the hot-cold index (wrapping add on highest bit).
            // `fetch_add` returns the value from *before* the switch, so its
            // highest bit names the pool that has just become cold.
            let previous = self.hot_idx_and_count.fetch_add(HIGH_BIT, Ordering::AcqRel);
            let started_count = previous & LOW_MASK;

            let cold_idx = (previous >> 63) as usize;
            let hot_idx = 1 - cold_idx;

            let hot_counts = &self.pool_counts[hot_idx];
            let cold_counts = &self.pool_counts[cold_idx];

            // Wait until the old hot observers (now working on the currently
            // cold bucket pool) finish their job.
            // (Since observer's job is fast, we can wait in a spin loop).
            while cold_counts.load(Ordering::Acquire) != started_count {
                hint::spin_loop();
            }

            // Now there are no active observers on the cold pool, so we can safely
            // access the data without a logical race.
            let result = f(&self.bucket_pools[cold_idx], &self.config);

            // Compound the cold histogram results onto the already running hot ones.
            // Note that no logging operation can run now as we still hold
            // the mutex, so it doesn't matter that the entire operation isn't atomic.

            // Update finished observations' counts.
            hot_counts.fetch_add(cold_counts.swap(0, ORDER_TYPE), ORDER_TYPE);

            // Update bucket values (both pools have the same length).
            let hot_pool = self.bucket_pools[hot_idx].iter();
            let cold_pool = self.bucket_pools[cold_idx].iter();
            for (hot_bucket, cold_bucket) in hot_pool.zip(cold_pool) {
                hot_bucket.fetch_add(cold_bucket.swap(0, ORDER_TYPE), ORDER_TYPE);
            }

            result
        }
    }

    impl Default for Histogram {
        fn default() -> Self {
            Histogram::new()
        }
    }
}

pub use config::Config;
pub use lock_free_histogram::Histogram;