📝 BlockNode should be used only internally
Xudong-Huang committed Sep 1, 2024
Commit 8a8768c (parent: 8a04a05)
Showing 3 changed files with 20 additions and 20 deletions.
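
The change is mechanical: every method on BlockNode<T> drops its pub qualifier. Since the struct itself was already crate-private, the pub markers added no real visibility; removing them makes the intent explicit, BlockNode is an implementation detail of the queue modules. A minimal sketch of the resulting pattern (the Queue type and its field layout here are illustrative, not the crate's actual types):

mod queue {
    // BlockNode is an implementation detail: the struct itself is private,
    // so its methods are kept private as well.
    struct BlockNode<T> {
        value: Option<T>,
    }

    impl<T> BlockNode<T> {
        // Private constructor: only reachable from inside this module.
        fn new(value: T) -> Self {
            BlockNode { value: Some(value) }
        }
    }

    // The public API stays on the queue type; callers never see BlockNode.
    pub struct Queue<T> {
        head: BlockNode<T>,
    }

    impl<T> Queue<T> {
        pub fn new(value: T) -> Self {
            Queue { head: BlockNode::new(value) }
        }

        pub fn pop(&mut self) -> Option<T> {
            self.head.value.take()
        }
    }
}

fn main() {
    let mut q = queue::Queue::new(42);
    assert_eq!(q.pop(), Some(42));
}

Only Queue<T> is reachable from outside the module; BlockNode never appears in the public API.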
may_queue/src/mpsc.rs: 18 changes (9 additions, 9 deletions)
@@ -50,13 +50,13 @@ struct BlockNode<T> {
 impl<T> BlockNode<T> {
     /// create a new BlockNode with uninitialized data
     #[inline]
-    pub fn new_box(index: usize) -> *mut BlockNode<T> {
+    fn new_box(index: usize) -> *mut BlockNode<T> {
         Box::into_raw(Box::new(BlockNode::new(index)))
     }

     /// create a new BlockNode with uninitialized data
     #[inline]
-    pub fn new(index: usize) -> BlockNode<T> {
+    fn new(index: usize) -> BlockNode<T> {
         BlockNode {
             next: AtomicPtr::new(ptr::null_mut()),
             data: [Slot::UNINIT; BLOCK_SIZE],
@@ -66,7 +66,7 @@ impl<T> BlockNode<T> {

     /// write index with data
     #[inline]
-    pub fn set(&self, id: usize, v: T) {
+    fn set(&self, id: usize, v: T) {
         unsafe {
             let data = self.data.get_unchecked(id);
             data.value.get().write(MaybeUninit::new(v));
@@ -78,7 +78,7 @@ impl<T> BlockNode<T> {
     }

     #[inline]
-    pub fn try_get(&self, id: usize) -> Option<T> {
+    fn try_get(&self, id: usize) -> Option<T> {
         let data = unsafe { self.data.get_unchecked(id) };
         if data.ready.load(Ordering::Acquire) != 0 {
             Some(unsafe { data.value.get().read().assume_init() })
@@ -88,7 +88,7 @@ impl<T> BlockNode<T> {
     }

     #[inline]
-    pub fn get(&self, id: usize) -> T {
+    fn get(&self, id: usize) -> T {
         let data = unsafe { self.data.get_unchecked(id) };
         while data.ready.load(Ordering::Acquire) == 0 {
             std::hint::spin_loop();
@@ -99,7 +99,7 @@ impl<T> BlockNode<T> {
     /// peek the indexed value
     /// not safe if pop out a value when hold the data ref
     #[inline]
-    pub unsafe fn peek(&self, id: usize) -> &T {
+    unsafe fn peek(&self, id: usize) -> &T {
         let data = unsafe { self.data.get_unchecked(id) };
         while data.ready.load(Ordering::Acquire) == 0 {
             std::hint::spin_loop();
@@ -108,7 +108,7 @@ impl<T> BlockNode<T> {
     }

     #[inline]
-    pub fn wait_next_block(&self) -> *mut BlockNode<T> {
+    fn wait_next_block(&self) -> *mut BlockNode<T> {
         let mut next: *mut BlockNode<T> = self.next.load(Ordering::Acquire);
         while next.is_null() {
             std::hint::spin_loop();
@@ -118,7 +118,7 @@ impl<T> BlockNode<T> {
     }

     #[inline]
-    pub fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
+    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
         let len = end - start;
         let start = start & BLOCK_MASK;
         SmallVec::from_iter((start..start + len).map(|i| self.get(i)))
@@ -127,7 +127,7 @@ impl<T> BlockNode<T> {

     /// return the bulk end with in the block
     #[inline]
-    pub fn bulk_end(start: usize, end: usize) -> usize {
+    fn bulk_end(start: usize, end: usize) -> usize {
         let block_end = (start + BLOCK_SIZE) & !BLOCK_MASK;
         cmp::min(end, block_end)
     }
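
For context on the methods demoted above: set writes a slot's value and then publishes it with a Release store on a per-slot ready flag, while try_get and get load the flag with Acquire before touching the value. A self-contained sketch of that single-slot handoff, using a simplified Slot type (an assumption for illustration, not the crate's Slot, which sits in an array inside each block):

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};

// One slot: a possibly-uninitialized value plus a ready flag that orders the handoff.
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicUsize,
}

// The slot is shared between a producer and consumers; T: Send keeps the transfer sound.
unsafe impl<T: Send> Sync for Slot<T> {}

impl<T> Slot<T> {
    fn new() -> Self {
        Slot {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicUsize::new(0),
        }
    }

    // Producer side: write the value, then release the ready flag so the write
    // is visible to any consumer that observes ready == 1.
    fn set(&self, v: T) {
        unsafe { self.value.get().write(MaybeUninit::new(v)) };
        self.ready.store(1, Ordering::Release);
    }

    // Consumer side: acquire-load the flag; only read the value once it is set.
    fn try_get(&self) -> Option<T> {
        if self.ready.load(Ordering::Acquire) != 0 {
            Some(unsafe { self.value.get().read().assume_init() })
        } else {
            None
        }
    }
}

fn main() {
    let slot = Slot::new();
    assert!(slot.try_get().is_none());
    slot.set(7);
    assert_eq!(slot.try_get(), Some(7));
}

The Release/Acquire pair is what guarantees a consumer never sees ready == 1 without also seeing the written value.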
may_queue/src/spmc.rs: 10 changes (5 additions, 5 deletions)
@@ -47,7 +47,7 @@ struct BlockNode<T> {
 impl<T> BlockNode<T> {
     /// create a new BlockNode with uninitialized data
     #[inline]
-    pub fn new(index: usize) -> *mut BlockNode<T> {
+    fn new(index: usize) -> *mut BlockNode<T> {
         Box::into_raw(Box::new(BlockNode {
             next: AtomicPtr::new(ptr::null_mut()),
             used: AtomicUsize::new(BLOCK_SIZE),
@@ -58,7 +58,7 @@ impl<T> BlockNode<T> {

     /// write index with data
     #[inline]
-    pub fn set(&self, index: usize, v: T) {
+    fn set(&self, index: usize, v: T) {
         unsafe {
             let data = self.data.get_unchecked(index & BLOCK_MASK);
             data.value.get().write(MaybeUninit::new(v));
@@ -68,7 +68,7 @@ impl<T> BlockNode<T> {
     /// read out indexed value
     /// this would make the underlying data dropped when it get out of scope
     #[inline]
-    pub fn get(&self, id: usize) -> T {
+    fn get(&self, id: usize) -> T {
         unsafe {
             let data = self.data.get_unchecked(id);
             data.value.get().read().assume_init()
@@ -78,13 +78,13 @@ impl<T> BlockNode<T> {
     /// make a range slots read
     /// if all slots read, then we can safely free the block
     #[inline]
-    pub fn mark_slots_read(&self, size: usize) -> bool {
+    fn mark_slots_read(&self, size: usize) -> bool {
         let old = self.used.fetch_sub(size, Ordering::Relaxed);
         old == size
     }

     #[inline]
-    pub fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
+    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
         let len = end - start;
         let start = start & BLOCK_MASK;
         (start..start + len).map(|id| self.get(id)).collect()
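
In the spmc diff, mark_slots_read is the method that decides when a drained block can be freed: used starts at BLOCK_SIZE, each consumer subtracts the number of slots it read, and the caller that brings the count to zero (old == size) is the one allowed to release the block. A toy version of that counting scheme (the Block type and the BLOCK_SIZE value are assumptions for illustration, not the crate's definitions):

use std::sync::atomic::{AtomicUsize, Ordering};

const BLOCK_SIZE: usize = 8; // illustrative; the crate defines its own block size

struct Block {
    used: AtomicUsize,
}

impl Block {
    fn new() -> Self {
        Block { used: AtomicUsize::new(BLOCK_SIZE) }
    }

    // Returns true for exactly one caller: the one whose subtraction brings
    // the outstanding-slot count to zero and may therefore free the block.
    fn mark_slots_read(&self, size: usize) -> bool {
        let old = self.used.fetch_sub(size, Ordering::Relaxed);
        old == size
    }
}

fn main() {
    let block = Block::new();
    assert!(!block.mark_slots_read(3)); // 5 slots still outstanding
    assert!(!block.mark_slots_read(4)); // 1 slot still outstanding
    assert!(block.mark_slots_read(1));  // last reader: safe to free the block
}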
may_queue/src/spsc.rs: 12 changes (6 additions, 6 deletions)
@@ -44,7 +44,7 @@ struct BlockNode<T> {
 impl<T> BlockNode<T> {
     /// create a new BlockNode with uninitialized data
     #[inline]
-    pub fn new() -> *mut BlockNode<T> {
+    fn new() -> *mut BlockNode<T> {
         Box::into_raw(Box::new(BlockNode {
             next: AtomicPtr::new(ptr::null_mut()),
             data: [Slot::UNINIT; BLOCK_SIZE],
@@ -53,7 +53,7 @@ impl<T> BlockNode<T> {

     /// write index with data
     #[inline]
-    pub fn set(&self, index: usize, v: T) {
+    fn set(&self, index: usize, v: T) {
         unsafe {
             let data = self.data.get_unchecked(index & BLOCK_MASK);
             data.value.get().write(MaybeUninit::new(v));
@@ -65,23 +65,23 @@ impl<T> BlockNode<T> {
     /// peek the indexed value
     /// not safe if pop out a value when hold the data ref
     #[inline]
-    pub unsafe fn peek(&self, index: usize) -> &T {
+    unsafe fn peek(&self, index: usize) -> &T {
         let data = self.data.get_unchecked(index & BLOCK_MASK);
         (*data.value.get()).assume_init_ref()
     }

     /// read out indexed value
     /// this would make the underlying data dropped when it get out of scope
     #[inline]
-    pub fn get(&self, id: usize) -> T {
+    fn get(&self, id: usize) -> T {
         unsafe {
             let data = self.data.get_unchecked(id);
             data.value.get().read().assume_init()
         }
     }

     #[inline]
-    pub fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
+    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
         let len = end - start;
         let start = start & BLOCK_MASK;
         (start..start + len).map(|id| self.get(id)).collect()
@@ -90,7 +90,7 @@ impl<T> BlockNode<T> {

     /// return the bulk end with in the block
     #[inline]
-    pub fn bulk_end(start: usize, end: usize) -> usize {
+    fn bulk_end(start: usize, end: usize) -> usize {
         let block_end = (start + BLOCK_SIZE) & !BLOCK_MASK;
         cmp::min(end, block_end)
     }
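
bulk_end, unchanged here apart from its visibility, clamps a [start, end) range to the block containing start: (start + BLOCK_SIZE) & !BLOCK_MASK computes the exclusive end of the block that start falls in, and the minimum with end keeps a bulk copy inside a single block. A small worked check with an assumed block size of 8 (the crate's own BLOCK_SIZE and BLOCK_MASK may differ):

use std::cmp;

// Illustrative constants; BLOCK_SIZE must be a power of two for the mask trick.
const BLOCK_SIZE: usize = 8;
const BLOCK_MASK: usize = BLOCK_SIZE - 1;

// Clamp `end` to the boundary of the block that `start` lives in.
fn bulk_end(start: usize, end: usize) -> usize {
    let block_end = (start + BLOCK_SIZE) & !BLOCK_MASK;
    cmp::min(end, block_end)
}

fn main() {
    // start = 5 sits in block [0, 8): a bulk request up to index 20 stops at 8.
    assert_eq!(bulk_end(5, 20), 8);
    // start = 8 begins block [8, 16): the same request stops at 16.
    assert_eq!(bulk_end(8, 20), 16);
    // If the requested end is already inside the block, it is returned unchanged.
    assert_eq!(bulk_end(8, 10), 10);
}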
