Skip to content

Commit

Permalink
Merge pull request #79 from moka-rs/manage-tombstones
Browse files Browse the repository at this point in the history
Reduce memory overhead in the cht
  • Loading branch information
tatsuya6502 authored Feb 1, 2022
2 parents d4d9d9c + 80d5a87 commit 09b3f31
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 18 deletions.
69 changes: 61 additions & 8 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@ use std::{
hash::{BuildHasher, Hash, Hasher},
mem::{self, MaybeUninit},
ptr,
sync::atomic::{self, Ordering},
sync::atomic::{self, AtomicUsize, Ordering},
};

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};

pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

/// A single allocation of buckets for the lock-free hash table (cht).
///
/// When the table is rehashed, the replacement array is linked in through
/// `next`, and each successive array carries an `epoch` one greater than
/// its predecessor.
pub(crate) struct BucketArray<K, V> {
    /// The bucket slots of this array.
    pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
    /// The replacement array produced by a rehash, or null if none exists yet.
    pub(crate) next: Atomic<BucketArray<K, V>>,
    /// Generation counter; a rehash allocates the next array with `epoch + 1`.
    pub(crate) epoch: usize,
    /// Number of tombstoned (removed) entries in this array; incremented on
    /// removal and consulted by `RehashOp::new` to decide whether to shrink
    /// the table or only garbage-collect tombstones.
    pub(crate) tombstone_count: AtomicUsize,
}

impl<K, V> Default for BucketArray<K, V> {
fn default() -> Self {
Self::with_length(0, BUCKET_ARRAY_DEFAULT_LENGTH)
}
}

impl<K, V> BucketArray<K, V> {
Expand All @@ -30,6 +39,7 @@ impl<K, V> BucketArray<K, V> {
buckets,
next: Atomic::null(),
epoch,
tombstone_count: Default::default(),
}
}

Expand Down Expand Up @@ -275,12 +285,12 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
&self,
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
where
K: Hash + Eq,
{
let next_array = self.next_array(guard);
assert!(self.buckets.len() <= next_array.buckets.len());
let next_array = self.next_array(guard, rehash_op);

for this_bucket in self.buckets.iter() {
let mut maybe_state: Option<(usize, Shared<'g, Bucket<K, V>>)> = None;
Expand Down Expand Up @@ -329,6 +339,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
)
.is_ok()
{
// TODO: If else, we may need to count tombstone.
if !this_bucket_ptr.is_null()
&& this_bucket_ptr.tag() & TOMBSTONE_TAG != 0
&& maybe_state.is_none()
Expand All @@ -344,7 +355,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
next_array
}

fn next_array(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
let mut maybe_new_next = None;

loop {
Expand All @@ -354,11 +365,9 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
return next_ref;
}

let new_length = rehash_op.new_len(self.buckets.len());
let new_next = maybe_new_next.unwrap_or_else(|| {
Owned::new(BucketArray::with_length(
self.epoch + 1,
self.buckets.len() * 2,
))
Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
});

match self.next.compare_and_set_weak(
Expand Down Expand Up @@ -549,6 +558,50 @@ pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<
});
}

/// The kind of rehash (if any) to apply to a bucket array.
///
/// Produced by [`RehashOp::new`] from the current capacity, tombstone count,
/// and entry count.
// Debug/PartialEq/Eq added for diagnostics and tests; backward-compatible.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RehashOp {
    /// Double the length of the bucket array.
    Expand,
    /// Halve the length of the bucket array.
    Shrink,
    /// Keep the length; rehash only to drop tombstoned entries.
    GcOnly,
    /// No rehash is needed.
    Skip,
}

impl RehashOp {
    /// Decides which rehash operation, if any, the table needs right now,
    /// based on the reported capacity, the array's tombstone count, and the
    /// map's entry count.
    // NOTE(review): `cap` is doubled to obtain the "real" capacity here —
    // presumably the caller passes half the logical capacity; confirm against
    // the `capacity()` implementation.
    pub(crate) fn new(cap: usize, tombstone_count: &AtomicUsize, len: &AtomicUsize) -> Self {
        let real_cap = cap as f64 * 2.0;
        let quarter_cap = real_cap / 4.0;
        let tbc = tombstone_count.load(Ordering::Relaxed) as f64;
        let len = len.load(Ordering::Relaxed) as f64;

        // 10% or more of the capacity is tombstones: reclaim them. Shrink as
        // well when the live count has dropped below a quarter of capacity,
        // but never below the default array length.
        if tbc / real_cap >= 0.1 {
            let should_shrink = len - tbc < quarter_cap
                && quarter_cap as usize >= BUCKET_ARRAY_DEFAULT_LENGTH;
            return if should_shrink { Self::Shrink } else { Self::GcOnly };
        }

        // More than 70% of the capacity is occupied: grow the table.
        if len > real_cap * 0.7 {
            Self::Expand
        } else {
            Self::Skip
        }
    }

    /// Returns `true` when no rehash is required.
    pub(crate) fn is_skip(&self) -> bool {
        matches!(self, Self::Skip)
    }

    /// Computes the bucket-array length after applying this operation to an
    /// array of `current_len` buckets. Must not be called on `Skip`.
    fn new_len(&self, current_len: usize) -> usize {
        match self {
            Self::Expand => current_len * 2,
            Self::Shrink => current_len / 2,
            Self::GcOnly => current_len,
            Self::Skip => unreachable!(),
        }
    }
}

// Pointer tags carried on bucket pointers (queried via `Shared::tag()`).
pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when copied into a new table
pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed
pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table
Expand Down
44 changes: 34 additions & 10 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState};
use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState, RehashOp};

use std::{
borrow::Borrow,
Expand Down Expand Up @@ -49,7 +49,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
break;
}
Err(_) => {
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand Down Expand Up @@ -81,6 +82,18 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
let result;

loop {
loop {
let rehash_op = RehashOp::new(
bucket_array_ref.capacity(),
&bucket_array_ref.tombstone_count,
self.len,
);
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
}

match bucket_array_ref.remove_if(guard, hash, key, condition) {
Ok(previous_bucket_ptr) => {
if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
Expand All @@ -89,6 +102,9 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
maybe_value: value,
} = previous_bucket_ref;
self.len.fetch_sub(1, Ordering::Relaxed);
bucket_array_ref
.tombstone_count
.fetch_add(1, Ordering::Relaxed);
result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));

unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) };
Expand All @@ -100,7 +116,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
}
Err(c) => {
condition = c;
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand Down Expand Up @@ -131,8 +148,16 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
let result;

loop {
while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() {
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
loop {
let rehash_op = RehashOp::new(
bucket_array_ref.capacity(),
&bucket_array_ref.tombstone_count,
self.len,
);
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
}

match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
Expand Down Expand Up @@ -160,7 +185,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
Err((s, f)) => {
state = s;
on_modify = f;
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand All @@ -173,8 +199,6 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {

impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
fn get(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
const DEFAULT_LENGTH: usize = 128;

let mut maybe_new_bucket_array = None;

loop {
Expand All @@ -184,8 +208,8 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
return bucket_array_ref;
}

let new_bucket_array = maybe_new_bucket_array
.unwrap_or_else(|| Owned::new(BucketArray::with_length(0, DEFAULT_LENGTH)));
let new_bucket_array =
maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));

match self.bucket_array.compare_and_set_weak(
Shared::null(),
Expand Down

0 comments on commit 09b3f31

Please sign in to comment.