
Commit

Add lock-free histogram
Implement a lock-free histogram with hot and cold bucket pools.
Implementation inspired by Prometheus library in Go and the histogram
crate.
NikodemGapski committed Nov 29, 2024
1 parent a24f685 commit d983263
Showing 5 changed files with 391 additions and 10 deletions.
201 changes: 201 additions & 0 deletions scylla/src/transport/histogram/config.rs
@@ -0,0 +1,201 @@
//! This file is a slightly adapted version of `config.rs` from the `histogram`
//! crate released under MIT License.
use core::ops::RangeInclusive;

/// The configuration of a histogram which determines the bucketing strategy and
/// therefore the relative error and memory utilization of a histogram.
/// * `grouping_power` - controls the number of buckets that are used to span
/// consecutive powers of two. Lower values result in less memory usage since
/// fewer buckets will be created. However, this will result in larger
/// relative error as each bucket represents a wider range of values.
/// * `max_value_power` - controls the largest value which can be stored in the
/// histogram. `2^(max_value_power) - 1` is the inclusive upper bound for the
/// representable range of values.
///
/// # How to choose parameters for your data
/// Please see <https://observablehq.com/@iopsystems/h2histogram> for an
/// in-depth discussion about the bucketing strategy and an interactive
/// calculator that lets you explore how these parameters result in histograms
/// with varying error guarantees and memory utilization requirements.
///
/// # The short version
/// ## Grouping Power
/// `grouping_power` should be set such that `2^(-1 * grouping_power)` is an
/// acceptable relative error. Rephrased, we can plug the acceptable
/// relative error into `grouping_power = ceil(log2(1/e))`. For example, if we
/// want to limit the error to 0.1% (0.001) we should set `grouping_power = 10`,
/// since `ceil(log2(1/0.001)) = ceil(9.97) = 10`.
///
/// ## Max Value Power
/// `max_value_power` should be the closest power of 2 that is larger than the
/// largest value you expect in your data. If your only guarantee is that the
/// values are all `u64`, then setting this to `64` may be reasonable if you
/// can tolerate a bit of relative error.
///
/// ## Resulting size
///
/// If we want to allow any value in a range of unsigned types, the amount of
/// memory for the histogram is approximately:
///
/// | power | error | u16 | u32 | u64 |
/// |-------|-------|---------|---------|---------|
/// | 2 | 25% | 0.6 KiB | 1 KiB | 2 KiB |
/// | 3 | 12.5% | 1 KiB | 2 KiB | 4 KiB |
/// | 4 | 6.25% | 2 KiB | 4 KiB | 8 KiB |
/// | 5 | 3.13% | 3 KiB | 7 KiB | 15 KiB |
/// | 6 | 1.56% | 6 KiB | 14 KiB | 30 KiB |
/// | 7 | .781% | 10 KiB | 26 KiB | 58 KiB |
/// | 8 | .391% | 18 KiB | 50 KiB | 114 KiB |
/// | 9 | .195% | 32 KiB | 96 KiB | 224 KiB |
/// | 10 | .098% | 56 KiB | 184 KiB | 440 KiB |
/// | 11 | .049% | 96 KiB | 352 KiB | 864 KiB |
/// | 12 | .025% | 160 KiB | 672 KiB | 1.7 MiB |
///
/// # Constraints:
/// * `max_value_power` must be in the range `0..=64`
/// * `max_value_power` must be greater than `grouping_power`
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Config {
max: u64,
grouping_power: u8,
max_value_power: u8,
cutoff_power: u8,
cutoff_value: u64,
lower_bin_count: u32,
upper_bin_divisions: u32,
upper_bin_count: u32,
}

impl Config {
/// Create a new histogram `Config` from the parameters. See the struct
/// documentation [`Config`] for the meaning of the parameters and
/// their constraints.
pub const fn new(grouping_power: u8, max_value_power: u8) -> Result<Self, &'static str> {
// we only allow values up to 2^64
if max_value_power > 64 {
return Err("max_value_power too high");
}

// check that the other parameters make sense together
if grouping_power >= max_value_power {
return Err("max_value_power too low");
}

// the cutoff is the point at which the linear range divisions and the
// logarithmic range subdivisions diverge.
//
// for example, with the linear bin width power a = 0 and
// grouping_power b = 7:
// the linear range has bins of width 2^a = 1, and the logarithmic
// range has 2^b = 128 subdivisions per power of two.
// this means that for 0..128 we represent the values exactly,
// but we also represent 128..256 exactly, since the subdivisions divide
// that range into bins with the same width as the linear portion.
//
// therefore our cutoff power = a + b + 1

// note: because a + b is less than max_value_power, which is at most 64,
// a + b + 1 always fits in a u8.
let cutoff_power = grouping_power + 1;
let cutoff_value = 2_u64.pow(cutoff_power as u32);
let lower_bin_width = 2_u32.pow(0);
let upper_bin_divisions = 2_u32.pow(grouping_power as u32);

// note: `2^(max_value_power) - 1` is the inclusive upper bound for
// representable values (see the struct documentation).
let max = if max_value_power == 64 {
u64::MAX
} else {
2_u64.pow(max_value_power as u32) - 1
};

let lower_bin_count = (cutoff_value / lower_bin_width as u64) as u32;
let upper_bin_count = (max_value_power - cutoff_power) as u32 * upper_bin_divisions;

Ok(Self {
max,
grouping_power,
max_value_power,
cutoff_power,
cutoff_value,
lower_bin_count,
upper_bin_divisions,
upper_bin_count,
})
}

/// Returns the grouping power that was used to create this configuration.
pub const fn grouping_power(&self) -> u8 {
self.grouping_power
}

/// Returns the max value power that was used to create this configuration.
pub const fn max_value_power(&self) -> u8 {
self.max_value_power
}

/// Returns the relative error (in percentage) of this configuration. This
/// only applies to the logarithmic bins of the histogram (linear bins have
/// a width of 1 and no error). For histograms with no logarithmic bins,
/// error for the entire histogram is zero.
pub fn error(&self) -> f64 {
match self.grouping_power == self.max_value_power - 1 {
true => 0.0,
false => 100.0 / 2_u64.pow(self.grouping_power as u32) as f64,
}
}

/// Return the total number of buckets needed for this config.
pub const fn total_buckets(&self) -> usize {
(self.lower_bin_count + self.upper_bin_count) as usize
}

/// Converts a value to a bucket index. Returns an error if the value is
/// outside of the range for the config.
pub(crate) fn value_to_index(&self, value: u64) -> Result<usize, &'static str> {
if value < self.cutoff_value {
return Ok(value as usize);
}

if value > self.max {
return Err("value out of range for histogram");
}

let power = 63 - value.leading_zeros();
let log_bin = power - self.cutoff_power as u32;
let offset = (value - (1 << power)) >> (power - self.grouping_power as u32);

Ok((self.lower_bin_count + log_bin * self.upper_bin_divisions + offset as u32) as usize)
}

/// Convert a bucket index to a lower bound.
pub(crate) fn index_to_lower_bound(&self, index: usize) -> u64 {
let g = index as u64 >> self.grouping_power;
let h = index as u64 - g * (1 << self.grouping_power);

if g < 1 {
h
} else {
(1 << (self.grouping_power as u64 + g - 1)) + (1 << (g - 1)) * h
}
}

/// Convert a bucket index to an upper inclusive bound.
#[allow(dead_code)]
pub(crate) fn index_to_upper_bound(&self, index: usize) -> u64 {
if index as u32 == self.lower_bin_count + self.upper_bin_count - 1 {
return self.max;
}
let g = index as u64 >> self.grouping_power;
let h = index as u64 - g * (1 << self.grouping_power) + 1;

if g < 1 {
h - 1
} else {
(1 << (self.grouping_power as u64 + g - 1)) + (1 << (g - 1)) * h - 1
}
}

/// Convert a bucket index to a range.
#[allow(dead_code)]
pub(crate) fn index_to_range(&self, index: usize) -> RangeInclusive<u64> {
self.index_to_lower_bound(index)..=self.index_to_upper_bound(index)
}
}
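
To see how the pieces above fit together, here is a small, hypothetical test module that could be appended to `config.rs`; the module name, parameter choices, and assertions are illustrative only and not part of this commit.

#[cfg(test)]
mod config_example {
    // Hypothetical sketch: exercises the bucketing math of `Config` above.
    use super::Config;

    #[test]
    fn bucketing_parameters() {
        // grouping_power = 7 bounds the relative error by 2^-7 = 0.78125%.
        let config = Config::new(7, 64).unwrap();
        assert_eq!(config.error(), 0.78125);

        // 256 linear buckets below the cutoff (2^8 = 256), plus
        // (64 - 8) * 128 logarithmic buckets above it.
        assert_eq!(config.total_buckets(), 256 + 56 * 128);

        // Values below the cutoff map to their own bucket...
        assert_eq!(config.value_to_index(42).unwrap(), 42);
        // ...while larger values share a bucket with close neighbours:
        // 1000 lands in the bucket spanning 1000..=1003.
        let idx = config.value_to_index(1000).unwrap();
        assert_eq!(config.index_to_range(idx), 1000..=1003);
    }
}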
173 changes: 173 additions & 0 deletions scylla/src/transport/histogram/lock_free_histogram.rs
@@ -0,0 +1,173 @@
//! This file was inspired by a lock-free histogram implementation
//! from the Prometheus client library for Go:
//! https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
//! Note: in the current implementation, the histogram *may* incur a data race
//! after (1 << 63) increments (which is quite a lot).
use std::hint;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use super::Config;

const ORDER_TYPE: Ordering = Ordering::Relaxed;
const HIGH_BIT: u64 = 1 << 63;
const LOW_MASK: u64 = HIGH_BIT - 1;

#[derive(Debug)]
pub struct Histogram {
/// Hot index - index of the hot pool (stored in the highest bit).
/// Count - number of all started observations (stored in the lower 63 bits).
/// Packing both of these variables into one AtomicU64
/// is what allows the lock-free algorithm to work.
hot_idx_and_count: AtomicU64,
/// Finished observations for each bucket pool.
pool_counts: [AtomicU64; 2],
/// Two bucket pools: hot and cold.
bucket_pools: [Box<[AtomicU64]>; 2],
/// Mutex for "writers" (logging operations)
/// (logging time is not as critical).
logger_mutex: Arc<Mutex<i8>>,
/// Standard configuration for math operations.
config: Config,
}

impl Histogram {
pub fn new() -> Self {
let grouping_power = 7;
let max_value_power = 64;
let config = Config::new(grouping_power, max_value_power)
.expect("default histogram construction failure");

Self::with_config(&config)
}

pub fn with_config(config: &Config) -> Self {
let mut buckets1 = Vec::with_capacity(config.total_buckets());
buckets1.resize_with(config.total_buckets(), || AtomicU64::new(0));
let mut buckets2 = Vec::with_capacity(config.total_buckets());
buckets2.resize_with(config.total_buckets(), || AtomicU64::new(0));

Self {
hot_idx_and_count: AtomicU64::new(0),
pool_counts: [AtomicU64::new(0), AtomicU64::new(0)],
bucket_pools: [buckets1.into(), buckets2.into()],
logger_mutex: Arc::new(Mutex::new(0)),
config: *config,
}
}

pub fn increment(&self, value: u64) -> Result<(), &'static str> {
// Increment started observations count.
let n = self.hot_idx_and_count.fetch_add(1, ORDER_TYPE);
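// The highest bit of the pre-increment value selects the hot pool;
// the lower 63 bits hold the started-observations count.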
let hot_idx = (n >> 63) as usize;

// Increment the corresponding bucket value.
let idx = self.config.value_to_index(value)?;
self.bucket_pools[hot_idx][idx].fetch_add(1, ORDER_TYPE);

// Increment finished observations count.
self.pool_counts[hot_idx].fetch_add(1, ORDER_TYPE);
Ok(())
}

pub fn mean() -> impl FnOnce(&[AtomicU64], &Config) -> Result<u64, &'static str> {
|buckets, config| {
let total_count = Histogram::get_total_count(buckets);
// Guard against division by zero on an empty histogram.
if total_count == 0 {
return Err("cannot compute the mean of an empty histogram");
}

let mut weighted_sum = 0;
for (i, bucket) in buckets.iter().enumerate() {
// Note: we choose index_to_lower_bound here
// but that choice is arbitrary.
weighted_sum +=
bucket.load(ORDER_TYPE) as u128 * config.index_to_lower_bound(i) as u128;
}
Ok((weighted_sum / total_count) as u64)
}
}

pub fn percentile(
percentile: f64,
) -> impl FnOnce(&[AtomicU64], &Config) -> Result<u64, &'static str> {
move |buckets, config| {
if !(0.0..100.0).contains(&percentile) {
return Err("percentile out of bounds");
}

let total_count = Histogram::get_total_count(buckets);
let count = (percentile / 100.0 * total_count as f64).ceil() as u128;

let mut pref_sum = 0;
for (i, bucket) in buckets.iter().enumerate() {
if pref_sum >= count {
// Note: we choose index_to_lower_bound here and after the loop
// but that choice is arbitrary.
return Ok(config.index_to_lower_bound(i));
}
pref_sum += bucket.load(ORDER_TYPE) as u128;
}
Ok(config.index_to_lower_bound(buckets.len() - 1))
}
}

pub fn get_total_count(buckets: &[AtomicU64]) -> u128 {
buckets.iter().map(|v| v.load(ORDER_TYPE) as u128).sum()
}

pub fn log_operation<T>(
&self,
f: impl FnOnce(&[AtomicU64], &Config) -> Result<T, &'static str>,
) -> Result<T, &'static str> {
// Lock the "writers" mutex.
let _guard = self.logger_mutex.lock();
if _guard.is_err() {
return Err("couldn't lock the logger mutex");
}
let _guard = _guard.unwrap();

// Switch the hot-cold index (wrapping add on highest bit).
let n = self.hot_idx_and_count.fetch_add(HIGH_BIT, ORDER_TYPE);
let started_count = n & LOW_MASK;

// `n` is the pre-flip value, so its high bit points at the pool that
// observers have been using until now; after the flip it becomes the cold pool.
let cold_idx = (n >> 63) as usize;
let hot_idx = 1 - cold_idx;

let hot_counts = &self.pool_counts[hot_idx];
let cold_counts = &self.pool_counts[cold_idx];

// Wait until the old hot observers (now working on the currently
// cold bucket pool) finish their job.
// (Since an observer's job is fast, we can wait in a spin loop.)
while started_count != cold_counts.load(ORDER_TYPE) {
hint::spin_loop();
}

// Now there are no active observers on the cold pool, so we can safely
// access the data without a logical race.
let result = f(&self.bucket_pools[cold_idx], &self.config);

// Compound the cold histogram results onto the already running hot ones.
// Note that no logging operation can run now as we still hold
// the mutex, so it doesn't matter that the entire operation isn't atomic.

// Update finished observations' counts.
hot_counts.fetch_add(cold_counts.load(ORDER_TYPE), ORDER_TYPE);
cold_counts.store(0, ORDER_TYPE);

// Update bucket values (both pools have the same length).
for i in 0..self.bucket_pools[0].len() {
let hot_bucket = &self.bucket_pools[hot_idx][i];
let cold_bucket = &self.bucket_pools[cold_idx][i];

hot_bucket.fetch_add(cold_bucket.load(ORDER_TYPE), ORDER_TYPE);
cold_bucket.store(0, ORDER_TYPE);
}

result
}
}

impl Default for Histogram {
fn default() -> Self {
Histogram::new()
}
}
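
As a usage sketch for the hot/cold mechanics, here is a hypothetical test module that could be appended to `lock_free_histogram.rs`; the recorded values and assertions are illustrative assumptions, not part of this commit.

#[cfg(test)]
mod histogram_example {
    // Hypothetical sketch: records a few values and reads them back.
    use super::Histogram;

    #[test]
    fn record_then_read() {
        let histogram = Histogram::new();

        // "Observers" record values concurrently through `&self`.
        for value in [100, 200, 300, 400, 500] {
            histogram.increment(value).unwrap();
        }

        // "Loggers" read through `log_operation`, which flips the hot/cold
        // pools and runs the closure on the now-quiescent cold pool.
        let mean = histogram.log_operation(Histogram::mean()).unwrap();
        let p99 = histogram.log_operation(Histogram::percentile(99.0)).unwrap();

        // With the default config these small values land in narrow buckets,
        // so the mean is exact and p99 falls at (or just past) the largest
        // recorded value.
        assert_eq!(mean, 300);
        assert!(p99 >= 500);
    }
}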
5 changes: 5 additions & 0 deletions scylla/src/transport/histogram/mod.rs
@@ -0,0 +1,5 @@
mod config;
mod lock_free_histogram;

pub use config::Config;
pub use lock_free_histogram::Histogram;