From 530889fba3889a5bf30ceada5a4b7c5b6a66013c Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 18 Jun 2022 22:59:39 +0800 Subject: [PATCH 1/4] Upgrade crossbeam-epoch from v0.8.2 to v0.9.9 --- Cargo.toml | 8 ++----- src/cht/map/bucket.rs | 39 +++++++++++++++++++-------------- src/cht/map/bucket_array_ref.rs | 14 +++++++----- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf1c098f..9316d8c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,8 @@ atomic64 = [] unstable-debug-counters = ["future"] [dependencies] -crossbeam-channel = "0.5.4" +crossbeam-channel = "0.5.5" +crossbeam-epoch = "0.9.9" crossbeam-utils = "0.8" num_cpus = "1.13" once_cell = "1.7" @@ -49,11 +50,6 @@ tagptr = "0.2" thiserror = "1.0" uuid = { version = "1.1", features = ["v4"] } -# Although v0.8.2 is not the current version (v0.9.x), we will keep using it until -# we perform enough tests to get conformable with memory safety. -# See: https://github.com/moka-rs/moka/issues/34 -crossbeam-epoch = "0.8.2" - # Opt-out serde and stable_deref_trait features # https://github.com/Manishearth/triomphe/pull/5 triomphe = { version = "0.1", default-features = false } diff --git a/src/cht/map/bucket.rs b/src/cht/map/bucket.rs index e220bcf2..0005ef32 100644 --- a/src/cht/map/bucket.rs +++ b/src/cht/map/bucket.rs @@ -9,7 +9,7 @@ use std::{ #[cfg(feature = "unstable-debug-counters")] use crate::common::concurrent::debug_counters; -use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared}; +use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared}; pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128; @@ -154,10 +154,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG); - match this_bucket.compare_and_set_weak( + match this_bucket.compare_exchange( this_bucket_ptr, new_bucket_ptr, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { // Succeeded. Return the removed value. (can be null) @@ -207,10 +208,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { let new_bucket = state.into_insert_bucket(); - if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak( + if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange( this_bucket_ptr, new_bucket, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None)); @@ -272,10 +274,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { (state.into_insert_bucket(), None) }; - if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak( + if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange( this_bucket_ptr, new_bucket, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { // Failed. Reload to retry. @@ -321,10 +324,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) { ProbeLoopAction::Return(None) } else if this_bucket - .compare_and_set_weak( + .compare_exchange( this_bucket_ptr, bucket_ptr, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) .is_ok() @@ -427,10 +431,11 @@ impl<'g, K: 'g, V: 'g> BucketArray { while is_borrowed(next_bucket_ptr) && next_bucket - .compare_and_set_weak( + .compare_exchange( next_bucket_ptr, to_put_ptr, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) .is_err() @@ -447,10 +452,11 @@ impl<'g, K: 'g, V: 'g> BucketArray { } if this_bucket - .compare_and_set_weak( + .compare_exchange( this_bucket_ptr, Shared::null().with_tag(SENTINEL_TAG), - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) .is_ok() @@ -486,14 +492,15 @@ impl<'g, K: 'g, V: 'g> BucketArray { Owned::new(BucketArray::with_length(self.epoch + 1, new_length)) }); - match self.next.compare_and_set_weak( + match self.next.compare_exchange( Shared::null(), new_next, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { Ok(p) => return unsafe { p.deref() }, - Err(CompareAndSetError { new, .. }) => { + Err(CompareExchangeError { new, .. }) => { maybe_new_next = Some(new); } } diff --git a/src/cht/map/bucket_array_ref.rs b/src/cht/map/bucket_array_ref.rs index 9937e0d5..9cd59c77 100644 --- a/src/cht/map/bucket_array_ref.rs +++ b/src/cht/map/bucket_array_ref.rs @@ -6,7 +6,7 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared}; +use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared}; pub(crate) struct BucketArrayRef<'a, K, V, S> { pub(crate) bucket_array: &'a Atomic>, @@ -310,14 +310,15 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> { let new_bucket_array = maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default())); - match self.bucket_array.compare_and_set_weak( + match self.bucket_array.compare_exchange( Shared::null(), new_bucket_array, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { Ok(b) => return unsafe { b.as_ref() }.unwrap(), - Err(CompareAndSetError { new, .. }) => maybe_new_bucket_array = Some(new), + Err(CompareExchangeError { new, .. }) => maybe_new_bucket_array = Some(new), } } } @@ -338,10 +339,11 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> { return; } - match self.bucket_array.compare_and_set_weak( + match self.bucket_array.compare_exchange( current_ptr, min_ptr, - (Ordering::Release, Ordering::Relaxed), + Ordering::Release, + Ordering::Relaxed, guard, ) { Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) }, From fc044d6529b76b5e052c4d0aa708b340d0fc7383 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 19 Jul 2022 17:55:58 +0800 Subject: [PATCH 2/4] Prevent segmentation fault in `sync` and `future` caches (#34) - Add a lock to the rehash function of the concurrent hash table (`moka::cht`) to ensure only one thread can participate rehashing at a time. - To prevent potential inconsistency issues in non x86 based systems, strengthen the memory ordering used for `compare_exchange_weak` (`Release` to `AcqRel`). --- src/cht/map/bucket.rs | 53 +++++++++++++++++++++---------- src/cht/map/bucket_array_ref.rs | 55 +++++++++++++++++++++++---------- 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/cht/map/bucket.rs b/src/cht/map/bucket.rs index af2c6120..81a7221a 100644 --- a/src/cht/map/bucket.rs +++ b/src/cht/map/bucket.rs @@ -2,7 +2,10 @@ use std::{ hash::{BuildHasher, Hash, Hasher}, mem::{self, MaybeUninit}, ptr, - sync::atomic::{self, AtomicUsize, Ordering}, + sync::{ + atomic::{self, AtomicUsize, Ordering}, + Arc, Mutex, TryLockError, + }, }; #[cfg(feature = "unstable-debug-counters")] @@ -16,6 +19,7 @@ pub(crate) struct BucketArray { pub(crate) buckets: Box<[Atomic>]>, pub(crate) next: Atomic>, pub(crate) epoch: usize, + pub(crate) rehash_lock: Arc>, pub(crate) tombstone_count: AtomicUsize, } @@ -49,6 +53,7 @@ impl BucketArray { buckets, next: Atomic::null(), epoch, + rehash_lock: Arc::new(Mutex::new(())), tombstone_count: Default::default(), } } @@ -147,10 +152,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG); - match this_bucket.compare_exchange( + match this_bucket.compare_exchange_weak( this_bucket_ptr, new_bucket_ptr, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { @@ -201,10 +206,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { let new_bucket = state.into_insert_bucket(); - if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange( + if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak( this_bucket_ptr, new_bucket, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { @@ -267,10 +272,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { (state.into_insert_bucket(), None) }; - if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange( + if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak( this_bucket_ptr, new_bucket, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { @@ -317,10 +322,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray { if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) { ProbeLoopAction::Return(None) } else if this_bucket - .compare_exchange( + .compare_exchange_weak( this_bucket_ptr, bucket_ptr, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) @@ -398,11 +403,24 @@ impl<'g, K: 'g, V: 'g> BucketArray { guard: &'g Guard, build_hasher: &H, rehash_op: RehashOp, - ) -> &'g BucketArray + ) -> Option<&'g BucketArray> where K: Hash + Eq, H: BuildHasher, { + // Ensure that the rehashing is not performed concurrently. + let lock; + match self.rehash_lock.try_lock() { + Ok(lk) => lock = lk, + Err(TryLockError::WouldBlock) => { + // Wait until the lock become available. + std::mem::drop(self.rehash_lock.lock()); + // We need to return here to see if rehashing is still needed. + return None; + } + Err(e @ TryLockError::Poisoned(_)) => panic!("{:?}", e), + }; + let next_array = self.next_array(guard, rehash_op); for this_bucket in self.buckets.iter() { @@ -424,10 +442,10 @@ impl<'g, K: 'g, V: 'g> BucketArray { while is_borrowed(next_bucket_ptr) && next_bucket - .compare_exchange( + .compare_exchange_weak( next_bucket_ptr, to_put_ptr, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) @@ -445,10 +463,10 @@ impl<'g, K: 'g, V: 'g> BucketArray { } if this_bucket - .compare_exchange( + .compare_exchange_weak( this_bucket_ptr, Shared::null().with_tag(SENTINEL_TAG), - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) @@ -466,8 +484,9 @@ impl<'g, K: 'g, V: 'g> BucketArray { } } } + std::mem::drop(lock); - next_array + Some(next_array) } fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray { @@ -485,10 +504,10 @@ impl<'g, K: 'g, V: 'g> BucketArray { Owned::new(BucketArray::with_length(self.epoch + 1, new_length)) }); - match self.next.compare_exchange( + match self.next.compare_exchange_weak( Shared::null(), new_next, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { diff --git a/src/cht/map/bucket_array_ref.rs b/src/cht/map/bucket_array_ref.rs index d28785c6..761ec45c 100644 --- a/src/cht/map/bucket_array_ref.rs +++ b/src/cht/map/bucket_array_ref.rs @@ -47,8 +47,11 @@ where break; } Err(_) => { - bucket_array_ref = - bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand); + if let Some(r) = + bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand) + { + bucket_array_ref = r; + } } } } @@ -81,7 +84,9 @@ where if rehash_op.is_skip() { break; } - bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op); + if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) { + bucket_array_ref = r; + } } match bucket_array_ref.remove_if(guard, hash, &mut eq, condition) { @@ -106,8 +111,11 @@ where } Err(c) => { condition = c; - bucket_array_ref = - bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand); + if let Some(r) = + bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand) + { + bucket_array_ref = r; + } } } } @@ -143,7 +151,9 @@ where if rehash_op.is_skip() { break; } - bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op); + if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) { + bucket_array_ref = r; + } } match bucket_array_ref.insert_if_not_present(guard, hash, state) { @@ -171,8 +181,11 @@ where } Err(s) => { state = s; - bucket_array_ref = - bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand); + if let Some(r) = + bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand) + { + bucket_array_ref = r; + } } } } @@ -207,7 +220,9 @@ where if rehash_op.is_skip() { break; } - bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op); + if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) { + bucket_array_ref = r; + } } match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) { @@ -235,8 +250,11 @@ where Err((s, f)) => { state = s; on_modify = f; - bucket_array_ref = - bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand); + if let Some(r) = + bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand) + { + bucket_array_ref = r; + } } } } @@ -260,8 +278,11 @@ where break; } Err(_) => { - bucket_array_ref = - bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand); + if let Some(r) = + bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand) + { + bucket_array_ref = r; + } } } } @@ -286,10 +307,10 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> { let new_bucket_array = maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default())); - match self.bucket_array.compare_exchange( + match self.bucket_array.compare_exchange_weak( Shared::null(), new_bucket_array, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { @@ -315,10 +336,10 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> { return; } - match self.bucket_array.compare_exchange( + match self.bucket_array.compare_exchange_weak( current_ptr, min_ptr, - Ordering::Release, + Ordering::AcqRel, Ordering::Relaxed, guard, ) { From 3ed73c1977cd1beca2d5de78654b1de6c0f05805 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 19 Jul 2022 17:56:30 +0800 Subject: [PATCH 3/4] Bump the version (v0.9.2) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a5479bea..d117bd5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.9.1" +version = "0.9.2" edition = "2018" rust-version = "1.51" From e7731152447bf6c9eea1edba55d39771e8f2eca0 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 19 Jul 2022 17:57:10 +0800 Subject: [PATCH 4/4] Update the change log --- CHANGELOG.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bcf1074..a807bfd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # Moka Cache — Change Log +## Version 0.9.2 + +### Fixed + +- Fix segmentation faults in `sync` and `future` caches under heavy loads on + many-core machine ([#34][gh-issue-0034]): + - NOTE: Although this issue was found in our testing environment 10 months ago + (v0.5.1), no user reported that they had the same issue. + - NOTE: In [v0.8.4](#version-084), we added a mitigation to reduce the chance of + the segfaults occurring. + +### Changed + +- Upgrade crossbeam-epoch from v0.8.2 to v0.9.9 ([#157][gh-pull-0157]). + + ## Version 0.9.1 ### Fixed @@ -427,6 +443,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25). [gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/ [gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/ +[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/ [gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/ [gh-pull-0143]: https://github.com/moka-rs/moka/pull/143/ [gh-pull-0138]: https://github.com/moka-rs/moka/pull/138/