Skip to content

Commit

Permalink
feat: Add an overflow push method
Browse files Browse the repository at this point in the history
In some cases it is desired to have a "lossy" queue for data. Such as an
event queue where more recent events should be prioritized over older
ones, where infinite storage is impractical. This commit adds a method
called "force_push" which enables this usage.

Bounded queue code is partially derived from the following commit:
crossbeam-rs/crossbeam@bd75c3c

cc smol-rs/async-channel#44
  • Loading branch information
notgull authored Mar 30, 2024
1 parent ff53a68 commit c407467
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 20 deletions.
99 changes: 82 additions & 17 deletions src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
use crate::{busy_wait, ForcePushError, PopError, PushError};

/// A slot in a queue.
struct Slot<T> {
Expand Down Expand Up @@ -83,6 +83,74 @@ impl<T> Bounded<T> {

/// Attempts to push an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
self.push_or_else(value, |value, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(PushError::Full(value))
} else {
Ok(value)
}
})
}

/// Pushes an item into the queue, displacing another item if needed.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
let result = self.push_or_else(value, |value, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);

// Try to move the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);

// Swap out the old value.
// SAFETY: We know this is initialized, since it's covered by the current queue.
let old = unsafe {
slot.value
.with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
};

// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);

// Return a PushError.
Err(PushError::Full(old))
} else {
Ok(value)
}
});

match result {
Ok(()) => Ok(None),
Err(PushError::Full(old_value)) => Ok(Some(old_value)),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
}
}

/// Attempts to push an item into the queue, running a closure on failure.
///
/// `fail` is run when there is no more room left in the tail of the queue. The parameters of
/// this function are as follows:
///
/// - The item that failed to push.
/// - The value of `self.tail` before the new value would be inserted.
/// - The value of `self.tail` after the new value would be inserted.
/// - The slot that we attempted to push into.
///
/// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise,
/// this function will return the error.
fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
where
F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
{
let mut tail = self.tail.load(Ordering::Relaxed);

loop {
Expand All @@ -95,22 +163,23 @@ impl<T> Bounded<T> {
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);

// Calculate the new location of the tail.
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Inspect the corresponding slot.
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);

// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
Expand All @@ -132,13 +201,9 @@ impl<T> Bounded<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
crate::full_fence();
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(PushError::Full(value));
}
// We've failed to push; run our failure closure.
value = fail(value, tail, new_tail, slot)?;

// Loom complains if there isn't an explicit busy wait here.
#[cfg(loom)]
Expand Down
74 changes: 74 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,54 @@ impl<T> ConcurrentQueue<T> {
}
}

/// Push an element into the queue, potentially displacing another element.
///
/// Attempts to push an element into the queue. If the queue is full, one item from the
/// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
/// If the queue is closed, an error is returned.
///
/// # Examples
///
/// ```
/// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
///
/// let q = ConcurrentQueue::bounded(3);
///
/// // We can push to the queue.
/// for i in 1..=3 {
/// assert_eq!(q.force_push(i), Ok(None));
/// }
///
/// // Push errors because the queue is now full.
/// assert_eq!(q.push(4), Err(PushError::Full(4)));
///
/// // Pushing a new value replaces the old ones.
/// assert_eq!(q.force_push(5), Ok(Some(1)));
/// assert_eq!(q.force_push(6), Ok(Some(2)));
///
/// // Close the queue to stop further pushes.
/// q.close();
///
/// // Pushing will return an error.
/// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
///
/// // Popping items will return the force-pushed ones.
/// assert_eq!(q.pop(), Ok(3));
/// assert_eq!(q.pop(), Ok(5));
/// assert_eq!(q.pop(), Ok(6));
/// ```
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
match &self.0 {
Inner::Single(q) => q.force_push(value),
Inner::Bounded(q) => q.force_push(value),
Inner::Unbounded(q) => match q.push(value) {
Ok(()) => Ok(None),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
Err(PushError::Full(_)) => unreachable!(),
},
}
}

/// Attempts to pop an item from the queue.
///
/// If the queue is empty, an error is returned.
Expand Down Expand Up @@ -532,6 +580,32 @@ impl<T> fmt::Display for PushError<T> {
}
}

/// Error that occurs when force-pushing into a full queue.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct ForcePushError<T>(pub T);

impl<T> ForcePushError<T> {
/// Return the inner value that failed to be force-pushed.
pub fn into_inner(self) -> T {
self.0
}
}

impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ForcePushError").field(&self.0).finish()
}
}

impl<T> fmt::Display for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Closed")
}
}

#[cfg(feature = "std")]
impl<T: fmt::Debug> error::Error for ForcePushError<T> {}

/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
Expand Down
52 changes: 51 additions & 1 deletion src/single.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use core::mem::MaybeUninit;
use core::ptr;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
use crate::{busy_wait, ForcePushError, PopError, PushError};

const LOCKED: usize = 1 << 0;
const PUSHED: usize = 1 << 1;
Expand Down Expand Up @@ -47,6 +48,55 @@ impl<T> Single<T> {
}
}

/// Attempts to push an item into the queue, displacing another if necessary.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
// Attempt to lock the slot.
let mut state = 0;

loop {
// Lock the slot.
let prev = self
.state
.compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|x| x);

if prev & CLOSED != 0 {
return Err(ForcePushError(value));
}

if prev == state {
// Swap out the value.
// SAFETY: We have locked the state.
let prev_value = unsafe {
self.slot
.with_mut(move |slot| ptr::replace(slot, MaybeUninit::new(value)))
};

// We can unlock the slot now.
self.state.fetch_and(!LOCKED, Ordering::Release);

// If the value was pushed, initialize it and return it.
let prev_value = if prev & PUSHED == 0 {
None
} else {
Some(unsafe { prev_value.assume_init() })
};

// Return the old value.
return Ok(prev_value);
}

// Try to go for the current (pushed) state.
if prev & LOCKED == 0 {
state = prev;
} else {
// State is locked.
busy_wait();
state = prev & !LOCKED;
}
}
}

/// Attempts to pop an item from the queue.
pub fn pop(&self) -> Result<T, PopError> {
let mut state = PUSHED;
Expand Down
17 changes: 16 additions & 1 deletion tests/single.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::bool_assert_comparison)]

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};

#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
Expand Down Expand Up @@ -65,6 +65,21 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(1);
assert_eq!(q.force_push(10), Ok(None));

assert!(!q.is_closed());
assert_eq!(q.force_push(20), Ok(Some(10)));
assert_eq!(q.force_push(30), Ok(Some(20)));

assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
assert_eq!(q.pop(), Ok(30));
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
Expand Down
28 changes: 27 additions & 1 deletion tests/unbounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::bool_assert_comparison)]

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};

#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
Expand Down Expand Up @@ -74,6 +74,32 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(5);

for i in 1..=5 {
assert_eq!(q.force_push(i), Ok(None));
}

assert!(!q.is_closed());
for i in 6..=10 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}
assert_eq!(q.pop(), Ok(6));
assert_eq!(q.force_push(11), Ok(None));
for i in 12..=15 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}

assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
for i in 11..=15 {
assert_eq!(q.pop(), Ok(i));
}
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
Expand Down

0 comments on commit c407467

Please sign in to comment.