From 52a4e31cb979b40405324c85d7f6499cb080c5e0 Mon Sep 17 00:00:00 2001
From: Jeehoon Kang
Date: Mon, 5 Nov 2018 22:27:00 +0900
Subject: [PATCH 1/2] Mark bags with local epoch, expire after 3 epochs

---
 crossbeam-epoch/src/internal.rs  | 94 +++++++++++++++++---------
 crossbeam-skiplist/tests/base.rs |  1 +
 2 files changed, 52 insertions(+), 43 deletions(-)

diff --git a/crossbeam-epoch/src/internal.rs b/crossbeam-epoch/src/internal.rs
index 645511b9c..3e7820ae5 100644
--- a/crossbeam-epoch/src/internal.rs
+++ b/crossbeam-epoch/src/internal.rs
@@ -227,9 +227,9 @@ unsafe impl Sync for SealedBag {}
 impl SealedBag {
     /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
     fn is_expired(&self, global_epoch: Epoch) -> bool {
-        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
-        // is within one epoch of the current one cannot be destroyed yet.
-        global_epoch.wrapping_sub(self.epoch) >= 2
+        // A pinned participant can witness at most two epoch advancements. Therefore, any bag that
+        // is within two epochs of the current one cannot be destroyed yet.
+        global_epoch.wrapping_sub(self.epoch) >= 3
     }
 }
 
@@ -260,12 +260,8 @@ impl Global {
     }
 
     /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
-    pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
+    pub fn push_bag(&self, bag: &mut Bag, epoch: Epoch, guard: &Guard) {
         let bag = mem::replace(bag, Bag::new());
-
-        atomic::fence(Ordering::SeqCst);
-
-        let epoch = self.epoch.load(Ordering::Relaxed);
         self.queue.push(bag.seal(epoch), guard);
     }
 
@@ -429,7 +425,8 @@ impl Local {
         let bag = &mut *self.bag.get();
 
         while let Err(d) = bag.try_push(deferred) {
-            self.global().push_bag(bag, guard);
+            let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
+            self.global().push_bag(bag, epoch, guard);
             deferred = d;
         }
     }
@@ -438,7 +435,8 @@ impl Local {
         let bag = unsafe { &mut *self.bag.get() };
 
         if !bag.is_empty() {
-            self.global().push_bag(bag, guard);
+            let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
+            self.global().push_bag(bag, epoch, guard);
         }
 
         self.global().collect(guard);
@@ -453,38 +451,47 @@ impl Local {
         self.guard_count.set(guard_count.checked_add(1).unwrap());
 
         if guard_count == 0 {
-            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
-            let new_epoch = global_epoch.pinned();
-
-            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
-            // The fence makes sure that any future loads from `Atomic`s will not happen before
-            // this store.
-            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
-                // HACK(stjepang): On x86 architectures there are two different ways of executing
-                // a `SeqCst` fence.
-                //
-                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
-                // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
-                //    instruction.
-                //
-                // Both instructions have the effect of a full barrier, but benchmarks have shown
-                // that the second one makes pinning faster in this particular case. It is not
-                // clear that this is permitted by the C++ memory model (SC fences work very
-                // differently from SC accesses), but experimental evidence suggests that this
-                // works fine. Using inline assembly would be a viable (and correct) alternative,
-                // but alas, that is not possible on stable Rust.
-                let current = Epoch::starting();
-                let previous = self
-                    .epoch
-                    .compare_and_swap(current, new_epoch, Ordering::SeqCst);
-                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
-                // We add a compiler fence to make it less likely for LLVM to do something wrong
-                // here. Formally, this is not enough to get rid of data races; practically,
-                // it should go a long way.
-                atomic::compiler_fence(Ordering::SeqCst);
-            } else {
-                self.epoch.store(new_epoch, Ordering::Relaxed);
-                atomic::fence(Ordering::SeqCst);
+            // Now we must store the global epoch into `self.epoch` and execute a `SeqCst` fence.
+            // The fence makes sure that any future loads from `Atomic`s will not happen before this
+            // store.
+            let mut current = Epoch::starting();
+            let mut new = self.global().epoch.load(Ordering::Relaxed).pinned();
+
+            loop {
+                if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
+                    // HACK(stjepang): On x86 architectures there are two different ways of
+                    // executing a `SeqCst` fence.
+                    //
+                    // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
+                    // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
+                    //    instruction.
+                    //
+                    // Both instructions have the effect of a full barrier, but benchmarks have
+                    // shown that the second one makes pinning faster in this particular case. It
+                    // is not clear that this is permitted by the C++ memory model (SC fences work
+                    // very differently from SC accesses), but experimental evidence suggests that
+                    // this works fine. Using inline assembly would be a viable (and correct)
+                    // alternative, but alas, that is not possible on stable Rust.
+                    let previous = self.epoch.compare_and_swap(current, new, Ordering::SeqCst);
+                    debug_assert_eq!(current, previous, "participant was expected to be unpinned");
+
+                    // We add a compiler fence to make it less likely for LLVM to do something wrong
+                    // here. Formally, this is not enough to get rid of data races; practically, it
+                    // should go a long way.
+                    atomic::compiler_fence(Ordering::SeqCst);
+                } else {
+                    self.epoch.store(new, Ordering::Relaxed);
+                    atomic::fence(Ordering::SeqCst);
+                }
+
+                // Now we validate that the value we read from the global epoch is not stale.
+                let validation = self.global().epoch.load(Ordering::Relaxed).pinned();
+                if new == validation {
+                    break;
+                }
+
+                current = new;
+                new = validation;
             }
 
             // Increment the pin counter.
@@ -572,8 +579,9 @@ impl Local {
         unsafe {
             // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
+            let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
             let guard = &self.pin();
-            self.global().push_bag(&mut *self.bag.get(), guard);
+            self.global().push_bag(&mut *self.bag.get(), epoch, guard);
         }
         // Revert the handle count back to zero.
         self.handle_count.set(0);
diff --git a/crossbeam-skiplist/tests/base.rs b/crossbeam-skiplist/tests/base.rs
index 19f233b78..47a549b9b 100644
--- a/crossbeam-skiplist/tests/base.rs
+++ b/crossbeam-skiplist/tests/base.rs
@@ -808,6 +808,7 @@ fn drops() {
         drop(s);
     }
 
+    handle.pin().flush();
     handle.pin().flush();
     handle.pin().flush();
     assert_eq!(KEYS.load(Ordering::SeqCst), 8);

From d66b1cc6998d7bd948e3f9a89d32576cb3ea0f63 Mon Sep 17 00:00:00 2001
From: Jeehoon Kang
Date: Wed, 20 May 2020 08:44:16 +0900
Subject: [PATCH 2/2] Rustfmt

---
 crossbeam-utils/src/cache_padded.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/crossbeam-utils/src/cache_padded.rs b/crossbeam-utils/src/cache_padded.rs
index 456df591f..eff6303ba 100644
--- a/crossbeam-utils/src/cache_padded.rs
+++ b/crossbeam-utils/src/cache_padded.rs
@@ -69,7 +69,10 @@ use core::ops::{Deref, DerefMut};
 // - https://www.mono-project.com/news/2016/09/12/arm64-icache/
 //
 #[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
-#[cfg_attr(not(any(target_arch = "x86_64", target_arch = "aarch64")), repr(align(64)))]
+#[cfg_attr(
+    not(any(target_arch = "x86_64", target_arch = "aarch64")),
+    repr(align(64))
+)]
 pub struct CachePadded<T> {
     value: T,
 }
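
Editor's note on the first patch: it seals each garbage bag with the pinning participant's local epoch instead of a freshly loaded global epoch, and correspondingly widens the reclamation distance from two to three epochs, as the updated comment in `SealedBag::is_expired` explains. The following is a standalone sketch of that expiry rule using plain `u64` counters in place of crossbeam's `Epoch` type; the function name mirrors `is_expired` from the diff, but everything else is illustrative rather than crossbeam's internal API.

/// Standalone illustration of the new expiry rule: a bag sealed at local epoch
/// `sealed_epoch` may be reclaimed only once the global epoch is at least three steps
/// ahead, since the sealing epoch may lag the global epoch and a pinned participant
/// can witness up to two further advancements.
fn is_expired(global_epoch: u64, sealed_epoch: u64) -> bool {
    // Wrapping arithmetic mirrors the patch's use of `wrapping_sub` on epoch counters.
    global_epoch.wrapping_sub(sealed_epoch) >= 3
}

fn main() {
    let sealed = 10;
    assert!(!is_expired(10, sealed)); // same epoch: pinned threads may still hold references
    assert!(!is_expired(11, sealed)); // one advancement: still unsafe
    assert!(!is_expired(12, sealed)); // two advancements: still unsafe under the new rule
    assert!(is_expired(13, sealed)); // three advancements: safe to destroy the bag
}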
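The other substantive change in the first patch is the pinning loop in `Local::pin`: the participant publishes its pinned epoch, issues a full barrier, then re-reads the global epoch and retries if the published value is already stale. Below is a standalone sketch of that publish-then-validate pattern using plain `AtomicUsize` counters; it is not crossbeam's `Epoch`/`AtomicEpoch` API (and omits the x86 `compare_and_swap` fast path), and the function and variable names are illustrative only.

use std::sync::atomic::{fence, AtomicUsize, Ordering};

/// Publish `local = global`, then validate that the global counter did not advance in
/// the meantime; retry until the published value is current. This mirrors the structure
/// of the loop the patch adds to pinning.
fn pin_epoch(global: &AtomicUsize, local: &AtomicUsize) -> usize {
    let mut new = global.load(Ordering::Relaxed);
    loop {
        // Publish the epoch we intend to pin, then order it before any later loads.
        local.store(new, Ordering::Relaxed);
        fence(Ordering::SeqCst);

        // Validate that the value we read from the global epoch is not stale.
        let current = global.load(Ordering::Relaxed);
        if new == current {
            return new;
        }
        new = current;
    }
}

fn main() {
    let global = AtomicUsize::new(7);
    let local = AtomicUsize::new(0);
    assert_eq!(pin_epoch(&global, &local), 7);
    assert_eq!(local.load(Ordering::Relaxed), 7);
}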