diff --git a/crossbeam-epoch/src/internal.rs b/crossbeam-epoch/src/internal.rs
index 819e6bdc1..926e90926 100644
--- a/crossbeam-epoch/src/internal.rs
+++ b/crossbeam-epoch/src/internal.rs
@@ -122,7 +122,7 @@ impl SealedBag {
     fn is_expired(&self, global_epoch: Epoch) -> bool {
         // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
         // is within one epoch of the current one cannot be destroyed yet.
-        global_epoch.wrapping_sub(self.epoch) >= 2
+        global_epoch.wrapping_sub(self.epoch) >= 3
     }
 }
 
@@ -153,12 +153,8 @@ impl Global {
     }
 
     /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
-    pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
+    pub fn push_bag(&self, bag: &mut Bag, epoch: Epoch, guard: &Guard) {
         let bag = mem::replace(bag, Bag::new());
-
-        atomic::fence(Ordering::SeqCst);
-
-        let epoch = self.epoch.load(Ordering::Relaxed);
         self.queue.push(bag.seal(epoch), guard);
     }
 
@@ -322,7 +318,8 @@ impl Local {
         let bag = &mut *self.bag.get();
 
         while let Err(d) = bag.try_push(deferred) {
-            self.global().push_bag(bag, guard);
+            let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
+            self.global().push_bag(bag, epoch, guard);
             deferred = d;
         }
     }
@@ -331,7 +328,8 @@ impl Local {
         let bag = unsafe { &mut *self.bag.get() };
 
         if !bag.is_empty() {
-            self.global().push_bag(bag, guard);
+            let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
+            self.global().push_bag(bag, epoch, guard);
         }
 
        self.global().collect(guard);
@@ -346,38 +344,47 @@ impl Local {
         self.guard_count.set(guard_count.checked_add(1).unwrap());
 
         if guard_count == 0 {
-            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
-            let new_epoch = global_epoch.pinned();
-
-            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
-            // The fence makes sure that any future loads from `Atomic`s will not happen before
-            // this store.
-            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
-                // HACK(stjepang): On x86 architectures there are two different ways of executing
-                // a `SeqCst` fence.
-                //
-                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
-                // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
-                //    instruction.
-                //
-                // Both instructions have the effect of a full barrier, but benchmarks have shown
-                // that the second one makes pinning faster in this particular case. It is not
-                // clear that this is permitted by the C++ memory model (SC fences work very
-                // differently from SC accesses), but experimental evidence suggests that this
-                // works fine. Using inline assembly would be a viable (and correct) alternative,
-                // but alas, that is not possible on stable Rust.
-                let current = Epoch::starting();
-                let previous = self
-                    .epoch
-                    .compare_and_swap(current, new_epoch, Ordering::SeqCst);
-                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
-                // We add a compiler fence to make it less likely for LLVM to do something wrong
-                // here. Formally, this is not enough to get rid of data races; practically,
-                // it should go a long way.
-                atomic::compiler_fence(Ordering::SeqCst);
-            } else {
-                self.epoch.store(new_epoch, Ordering::Relaxed);
-                atomic::fence(Ordering::SeqCst);
+            // Now we must store the global epoch into `self.epoch` and execute a `SeqCst` fence.
+            // The fence makes sure that any future loads from `Atomic`s will not happen before this
+            // store.
+            let mut current = Epoch::starting();
+            let mut new = self.global().epoch.load(Ordering::Relaxed).pinned();
+
+            loop {
+                if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
+                    // HACK(stjepang): On x86 architectures there are two different ways of
+                    // executing a `SeqCst` fence.
+                    //
+                    // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
+                    // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
+                    //    instruction.
+                    //
+                    // Both instructions have the effect of a full barrier, but benchmarks have
+                    // shown that the second one makes pinning faster in this particular case. It
+                    // is not clear that this is permitted by the C++ memory model (SC fences work
+                    // very differently from SC accesses), but experimental evidence suggests that
+                    // this works fine. Using inline assembly would be a viable (and correct)
+                    // alternative, but alas, that is not possible on stable Rust.
+                    let previous = self.epoch.compare_and_swap(current, new, Ordering::SeqCst);
+                    debug_assert_eq!(current, previous, "participant was expected to be unpinned");
+
+                    // We add a compiler fence to make it less likely for LLVM to do something wrong
+                    // here. Formally, this is not enough to get rid of data races; practically, it
+                    // should go a long way.
+                    atomic::compiler_fence(Ordering::SeqCst);
+                } else {
+                    self.epoch.store(new, Ordering::Relaxed);
+                    atomic::fence(Ordering::SeqCst);
+                }
+
+                // Now we validate that the value we read from the global epoch is not stale.
+                let validation = self.global().epoch.load(Ordering::Relaxed).pinned();
+                if new == validation {
+                    break;
+                }
+
+                current = new;
+                new = validation;
             }
 
             // Increment the pin counter.
@@ -465,8 +472,9 @@ impl Local {
             unsafe {
                 // Pin and move the local bag into the global queue. It's important that `push_bag`
                 // doesn't defer destruction on any new garbage.
+                let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
                 let guard = &self.pin();
-                self.global().push_bag(&mut *self.bag.get(), guard);
+                self.global().push_bag(&mut *self.bag.get(), epoch, guard);
             }
             // Revert the handle count back to zero.
             self.handle_count.set(0);
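The bump from `>= 2` to `>= 3` in `is_expired` appears to follow from the rest of the patch: bags are now sealed with the participant's local epoch, read with a `Relaxed` load, instead of with the global epoch read after a `SeqCst` fence, and the local epoch can trail the global one by a step. A minimal sketch of the resulting invariant, with epochs modeled as plain `u64` values (the crate's `Epoch` also carries a pinned bit, elided here):

```rust
/// Mirrors `SealedBag::is_expired` after this patch, modeling epochs as
/// plain `u64` values for illustration.
fn is_expired(bag_epoch: u64, global_epoch: u64) -> bool {
    // Wrapping subtraction keeps the distance correct across counter overflow.
    global_epoch.wrapping_sub(bag_epoch) >= 3
}

fn main() {
    let global: u64 = 10;

    // A pinned participant can witness one epoch advancement, and the bag's
    // epoch (now taken from the participant's local epoch) may itself lag
    // the global epoch by one more step, so two epochs back is still unsafe.
    assert!(!is_expired(9, global));
    assert!(!is_expired(8, global));

    // Three or more epochs back is safe to reclaim.
    assert!(is_expired(7, global));

    // The check also survives wraparound of the epoch counter.
    assert!(is_expired(u64::MAX, 2));
}
```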
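The new pinning path compensates for reading the global epoch with a `Relaxed` load before the fence: after publishing the candidate epoch and issuing the full barrier, it re-reads the global epoch and retries until the published value is confirmed current. A simplified model of that protocol, with the x86 `compare_and_swap` fast path elided and `GLOBAL_EPOCH`/`pin` as illustrative stand-ins for the crate's types:

```rust
use std::sync::atomic::{fence, AtomicU64, Ordering};

// Illustrative stand-in for `Global::epoch`; not the crate's actual type.
static GLOBAL_EPOCH: AtomicU64 = AtomicU64::new(0);

/// Simplified model of the patched pinning loop.
fn pin(local_epoch: &AtomicU64) {
    let mut new = GLOBAL_EPOCH.load(Ordering::Relaxed);

    loop {
        // Publish the candidate so other threads observe this participant
        // as pinned in epoch `new`.
        local_epoch.store(new, Ordering::Relaxed);

        // Full barrier: loads issued after this point cannot be reordered
        // before the store above.
        fence(Ordering::SeqCst);

        // The initial `Relaxed` load happened before the fence and may be
        // stale. If the global epoch has moved on, publish the fresh value
        // and validate again.
        let validation = GLOBAL_EPOCH.load(Ordering::Relaxed);
        if new == validation {
            break;
        }
        new = validation;
    }
}

fn main() {
    GLOBAL_EPOCH.store(5, Ordering::Relaxed);
    let local = AtomicU64::new(0);
    pin(&local);
    assert_eq!(local.load(Ordering::Relaxed), 5);
}
```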