Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory overhead in the cht #79

Merged
merged 1 commit into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 61 additions & 8 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@ use std::{
hash::{BuildHasher, Hash, Hasher},
mem::{self, MaybeUninit},
ptr,
sync::atomic::{self, Ordering},
sync::atomic::{self, AtomicUsize, Ordering},
};

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};

pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

/// A fixed-length array of lock-free hash buckets, forming one "generation"
/// of the concurrent hash table. Grown/shrunk copies are linked via `next`.
pub(crate) struct BucketArray<K, V> {
    // The bucket slots; each holds a tagged pointer to a `Bucket<K, V>`.
    pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
    // The replacement array entries are migrated into during a rehash;
    // null while this array is the newest generation.
    pub(crate) next: Atomic<BucketArray<K, V>>,
    // Generation counter: each successor array gets `epoch + 1`.
    pub(crate) epoch: usize,
    // Number of tombstone (removed-entry) buckets in this array; used by
    // `RehashOp::new` to decide when to garbage-collect or shrink.
    pub(crate) tombstone_count: AtomicUsize,
}

impl<K, V> Default for BucketArray<K, V> {
fn default() -> Self {
Self::with_length(0, BUCKET_ARRAY_DEFAULT_LENGTH)
}
}

impl<K, V> BucketArray<K, V> {
Expand All @@ -30,6 +39,7 @@ impl<K, V> BucketArray<K, V> {
buckets,
next: Atomic::null(),
epoch,
tombstone_count: Default::default(),
}
}

Expand Down Expand Up @@ -275,12 +285,12 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
&self,
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
where
K: Hash + Eq,
{
let next_array = self.next_array(guard);
assert!(self.buckets.len() <= next_array.buckets.len());
let next_array = self.next_array(guard, rehash_op);

for this_bucket in self.buckets.iter() {
let mut maybe_state: Option<(usize, Shared<'g, Bucket<K, V>>)> = None;
Expand Down Expand Up @@ -329,6 +339,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
)
.is_ok()
{
// TODO: If else, we may need to count tombstone.
if !this_bucket_ptr.is_null()
&& this_bucket_ptr.tag() & TOMBSTONE_TAG != 0
&& maybe_state.is_none()
Expand All @@ -344,7 +355,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
next_array
}

fn next_array(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
let mut maybe_new_next = None;

loop {
Expand All @@ -354,11 +365,9 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
return next_ref;
}

let new_length = rehash_op.new_len(self.buckets.len());
let new_next = maybe_new_next.unwrap_or_else(|| {
Owned::new(BucketArray::with_length(
self.epoch + 1,
self.buckets.len() * 2,
))
Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
});

match self.next.compare_and_set_weak(
Expand Down Expand Up @@ -549,6 +558,50 @@ pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<
});
}

/// The action a rehash pass should take, chosen by [`RehashOp::new`] from
/// the table's capacity, tombstone count, and entry count.
#[derive(Clone, Copy)]
pub(crate) enum RehashOp {
    // Double the bucket array (table is getting full).
    Expand,
    // Halve the bucket array (mostly tombstones, few live entries).
    Shrink,
    // Rebuild at the same length just to drop tombstones.
    GcOnly,
    // No rehash needed.
    Skip,
}

impl RehashOp {
    /// Decides which rehash operation, if any, should run right now.
    ///
    /// `cap` is the nominal capacity of the current bucket array,
    /// `tombstone_count` the number of removed-entry buckets, and `len` the
    /// number of entries. Thresholds: tombstones at >= 10% of the doubled
    /// capacity trigger a garbage-collecting rehash (shrinking as well when
    /// the live count fits in a quarter of that capacity and the halved
    /// table would not fall below `BUCKET_ARRAY_DEFAULT_LENGTH`); an entry
    /// count above 70% of the doubled capacity triggers an expansion.
    pub(crate) fn new(cap: usize, tombstone_count: &AtomicUsize, len: &AtomicUsize) -> Self {
        let real_cap = 2.0 * cap as f64;
        let quarter_cap = real_cap / 4.0;
        let tombstones = tombstone_count.load(Ordering::Relaxed) as f64;
        let entries = len.load(Ordering::Relaxed) as f64;

        if tombstones / real_cap >= 0.1 {
            // Enough dead buckets to be worth reclaiming. Shrink only when
            // the surviving entries are sparse and the halved length stays
            // at or above the default length.
            let can_shrink = entries - tombstones < quarter_cap
                && quarter_cap as usize >= BUCKET_ARRAY_DEFAULT_LENGTH;
            return if can_shrink { Self::Shrink } else { Self::GcOnly };
        }

        if entries > real_cap * 0.7 {
            Self::Expand
        } else {
            Self::Skip
        }
    }

    /// Returns `true` when no rehash is required.
    pub(crate) fn is_skip(&self) -> bool {
        matches!(*self, Self::Skip)
    }

    /// Length of the replacement bucket array for this operation.
    ///
    /// Must not be called on `Skip` — callers check `is_skip` first.
    fn new_len(&self, current_len: usize) -> usize {
        match *self {
            Self::Expand => current_len * 2,
            Self::Shrink => current_len / 2,
            Self::GcOnly => current_len,
            Self::Skip => unreachable!(),
        }
    }
}

// Tag bits carried in the low bits of `Atomic<Bucket<K, V>>` pointers to
// record a bucket's migration/removal state without extra storage.
pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when copied into a new table
pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed
pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table
Expand Down
44 changes: 34 additions & 10 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState};
use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState, RehashOp};

use std::{
borrow::Borrow,
Expand Down Expand Up @@ -49,7 +49,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
break;
}
Err(_) => {
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand Down Expand Up @@ -81,6 +82,18 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
let result;

loop {
loop {
let rehash_op = RehashOp::new(
bucket_array_ref.capacity(),
&bucket_array_ref.tombstone_count,
self.len,
);
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
}

match bucket_array_ref.remove_if(guard, hash, key, condition) {
Ok(previous_bucket_ptr) => {
if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
Expand All @@ -89,6 +102,9 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
maybe_value: value,
} = previous_bucket_ref;
self.len.fetch_sub(1, Ordering::Relaxed);
bucket_array_ref
.tombstone_count
.fetch_add(1, Ordering::Relaxed);
result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));

unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) };
Expand All @@ -100,7 +116,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
}
Err(c) => {
condition = c;
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand Down Expand Up @@ -131,8 +148,16 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
let result;

loop {
while self.len.load(Ordering::Relaxed) > bucket_array_ref.capacity() {
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
loop {
let rehash_op = RehashOp::new(
bucket_array_ref.capacity(),
&bucket_array_ref.tombstone_count,
self.len,
);
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
}

match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
Expand Down Expand Up @@ -160,7 +185,8 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
Err((s, f)) => {
state = s;
on_modify = f;
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher);
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}
Expand All @@ -173,8 +199,6 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {

impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
fn get(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
const DEFAULT_LENGTH: usize = 128;

let mut maybe_new_bucket_array = None;

loop {
Expand All @@ -184,8 +208,8 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
return bucket_array_ref;
}

let new_bucket_array = maybe_new_bucket_array
.unwrap_or_else(|| Owned::new(BucketArray::with_length(0, DEFAULT_LENGTH)));
let new_bucket_array =
maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));

match self.bucket_array.compare_and_set_weak(
Shared::null(),
Expand Down