Skip to content

Commit

Permalink
remove atomic-wait dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed Nov 28, 2024
1 parent 02afd52 commit 2469021
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ exclude = ["assets/*"]

[dependencies]
seize = "0.4"
atomic-wait = "1"
serde = { version = "1", optional = true }

[dev-dependencies]
Expand Down
46 changes: 27 additions & 19 deletions src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::borrow::Borrow;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::sync::atomic::{fence, AtomicPtr, AtomicU32, AtomicUsize, Ordering};
use std::sync::atomic::{fence, AtomicPtr, AtomicU8, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::{hint, panic, ptr};

Expand Down Expand Up @@ -50,7 +50,7 @@ pub struct State {
// but not necessarily copied.
pub claim: AtomicUsize,
// The status of the resize.
pub status: AtomicU32,
pub status: AtomicU8,
// A thread parker for blocking on copy operations.
pub parker: Parker,
// Entries whose retirement has been deferred by later tables.
Expand All @@ -66,7 +66,7 @@ impl Default for State {
allocating: Mutex::new(()),
copied: AtomicUsize::new(0),
claim: AtomicUsize::new(0),
status: AtomicU32::new(State::PENDING),
status: AtomicU8::new(State::PENDING),
parker: Parker::default(),
deferred: seize::Deferred::new(),
collector: ptr::null(),
Expand All @@ -76,11 +76,13 @@ impl Default for State {

impl State {
// A resize is in-progress.
pub const PENDING: u32 = 0;
pub const PENDING: u8 = 0;

// The resize has been aborted, continue to the next table.
pub const ABORTED: u32 = 1;
pub const ABORTED: u8 = 1;

// The resize was complete and the table was promoted.
pub const PROMOTED: u32 = 2;
pub const PROMOTED: u8 = 2;
}

// The result of an insert operation.
Expand Down Expand Up @@ -1814,13 +1816,14 @@ where
// This table doesn't have space for the next entry.
//
// Abort the current resize.
next.state().status.store(State::ABORTED, Ordering::Relaxed);
next.state().status.store(State::ABORTED, Ordering::SeqCst);

// Allocate the next table.
let allocated = self.as_ref(next).get_or_alloc_next(None);

// Wake anyone waiting for us to finish.
atomic_wait::wake_all(&next.state().status);
let state = self.table.state();
state.parker.unpark(&state.status);

// Retry in a new table.
next = allocated;
Expand All @@ -1841,6 +1844,7 @@ where
}
}

let state = next.state();
// We copied all that we can, wait for the table to be promoted.
for spun in 0.. {
// Avoid spinning in tests, which can hide race conditions.
Expand All @@ -1850,7 +1854,7 @@ where
7
};

let status = next.state().status.load(Ordering::Relaxed);
let status = state.status.load(Ordering::SeqCst);

// If this copy was aborted, we have to retry in the new table.
if status == State::ABORTED {
Expand All @@ -1859,7 +1863,6 @@ where

// The copy has completed.
if status == State::PROMOTED {
fence(Ordering::Acquire);
return next;
}

Expand All @@ -1874,7 +1877,9 @@ where
}

// Park until the table is promoted.
atomic_wait::wait(&next.state().status, State::PENDING);
state
.parker
.park(&state.status, |status| status == State::PENDING);
}
}
}
Expand Down Expand Up @@ -1975,6 +1980,7 @@ where
return next;
}

let state = next.state();
for spun in 0.. {
// Avoid spinning in tests, which can hide race conditions.
const SPIN_WAIT: usize = if cfg!(any(test, debug_assertions)) {
Expand All @@ -1984,7 +1990,7 @@ where
};

// The copy has completed.
let status = next.state().status.load(Ordering::Acquire);
let status = state.status.load(Ordering::SeqCst);
if status == State::PROMOTED {
return next;
}
Expand All @@ -2000,7 +2006,9 @@ where
}

// Park until the table is promoted.
atomic_wait::wait(&next.state().status, State::PENDING);
state
.parker
.park(&state.status, |status| status == State::PENDING);
}
}
}
Expand Down Expand Up @@ -2135,11 +2143,13 @@ where
// Returns `true` if the table was promoted.
#[inline]
fn try_promote(&self, next: Table<K, V>, copied: usize, guard: &impl Guard) -> bool {
let state = next.state();

// Update the copy count.
let copied = if copied > 0 {
next.state().copied.fetch_add(copied, Ordering::AcqRel) + copied
state.copied.fetch_add(copied, Ordering::AcqRel) + copied
} else {
next.state().copied.load(Ordering::Acquire)
state.copied.load(Ordering::Acquire)
};

// If we copied all the entries in the table, we can try to promote.
Expand All @@ -2164,9 +2174,7 @@ where
.is_ok()
{
// Successfully promoted the table.
next.state()
.status
.store(State::PROMOTED, Ordering::Release);
state.status.store(State::PROMOTED, Ordering::SeqCst);

unsafe {
// Retire the old table.
Expand All @@ -2182,7 +2190,7 @@ where
}

// Wake up any writers waiting for the resize to complete.
atomic_wait::wake_all(&next.state().status);
state.parker.unpark(&state.status);
return true;
}
}
Expand Down
24 changes: 21 additions & 3 deletions src/raw/utils/parker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicU8, Ordering};
use std::sync::Mutex;
use std::thread::{self, Thread};

Expand All @@ -25,7 +25,7 @@ struct State {

impl Parker {
// Block the current thread until the park condition is false.
pub fn park<T>(&self, atomic: &AtomicPtr<T>, should_park: impl Fn(*mut T) -> bool) {
pub fn park<T>(&self, atomic: &impl Atomic<T>, should_park: impl Fn(T) -> bool) {
let key = atomic as *const _ as usize;

loop {
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Parker {
// Unpark all threads waiting on the given atomic.
//
// Note that any modifications must be `SeqCst` to be visible to unparked threads.
pub fn unpark<T>(&self, atomic: &AtomicPtr<T>) {
pub fn unpark<T>(&self, atomic: &impl Atomic<T>) {
let key = atomic as *const _ as usize;

// Fast-path, no one waiting to be unparked.
Expand All @@ -113,3 +113,21 @@ impl Parker {
}
}
}

/// A generic atomic variable.
pub trait Atomic<T> {
/// Load the value using the given ordering.
fn load(&self, ordering: Ordering) -> T;
}

impl<T> Atomic<*mut T> for AtomicPtr<T> {
fn load(&self, ordering: Ordering) -> *mut T {
self.load(ordering)
}
}

impl Atomic<u8> for AtomicU8 {
fn load(&self, ordering: Ordering) -> u8 {
self.load(ordering)
}
}

0 comments on commit 2469021

Please sign in to comment.