Skip to content

Commit

Permalink
[eclipse-iceoryx#129] Remove add with handle from container; fix orde…
Browse files Browse the repository at this point in the history
…ring issue in container update
  • Loading branch information
elfenpiff committed Feb 23, 2024
1 parent 8ccb385 commit 648b246
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 268 deletions.
127 changes: 35 additions & 92 deletions iceoryx2-bb/lock-free/src/mpmc/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use iceoryx2_bb_log::{fail, fatal_panic};
use crate::mpmc::unique_index_set::*;
use std::alloc::Layout;
use std::fmt::Debug;
use std::sync::atomic::AtomicU64;
use std::{
cell::UnsafeCell,
mem::MaybeUninit,
Expand All @@ -77,7 +78,7 @@ pub struct ContainerHandle {
#[derive(Debug)]
pub struct ContainerState<T: Copy + Debug> {
container_id: u64,
current_index_set_head: u64,
current_change_counter: u64,
data: Vec<MaybeUninit<(u32, T)>>,
active_index: Vec<bool>,
}
Expand All @@ -86,7 +87,7 @@ impl<T: Copy + Debug> ContainerState<T> {
fn new(container_id: u64, capacity: usize) -> Self {
Self {
container_id,
current_index_set_head: 0,
current_change_counter: 0,
data: vec![MaybeUninit::uninit(); capacity],
active_index: vec![false; capacity],
}
Expand Down Expand Up @@ -125,6 +126,7 @@ pub struct Container<T: Copy + Debug> {
active_index_ptr: RelocatablePointer<AtomicBool>,
data_ptr: RelocatablePointer<UnsafeCell<MaybeUninit<T>>>,
capacity: usize,
change_counter: AtomicU64,
is_memory_initialized: AtomicBool,
container_id: UniqueId,
// must be the last member, since it is a relocatable container as well and then the offset
Expand All @@ -146,6 +148,7 @@ impl<T: Copy + Debug> RelocatableContainer for Container<T> {
distance_to_active_index as usize + capacity * std::mem::size_of::<AtomicBool>(),
) as isize),
capacity,
change_counter: AtomicU64::new(0),
index_set: UniqueIndexSet::new_uninit(capacity),
is_memory_initialized: AtomicBool::new(false),
}
Expand Down Expand Up @@ -206,6 +209,7 @@ impl<T: Copy + Debug> RelocatableContainer for Container<T> {
active_index_ptr: RelocatablePointer::new(distance_to_active_index),
data_ptr: RelocatablePointer::new(distance_to_container_data),
capacity,
change_counter: AtomicU64::new(0),
index_set: UniqueIndexSet::new(capacity, unique_index_set_distance),
is_memory_initialized: AtomicBool::new(true),
}
Expand Down Expand Up @@ -248,59 +252,6 @@ impl<T: Copy + Debug> Container<T> {
self.len() == 0
}

/// Adds a new element to the [`Container`]. If there is no more space available it returns
/// [`None`], otherwise [`Some`] containing the [`UniqueIndex`] to the underlying element.
/// If the [`UniqueIndex`] goes out of scope the added element is removed.
///
/// ```
/// use iceoryx2_bb_lock_free::mpmc::container::*;
///
/// const CAPACITY: usize = 139;
/// let container = FixedSizeContainer::<u128, CAPACITY>::new();
///
/// match container.add(1234567) {
/// Some(index) => println!("added at index {}", index.value()),
/// None => println!("container is full"),
/// };
/// ```
///
/// # Safety
///
/// * Ensure that the either [`Container::new()`] was used or [`Container::init()`] was used
/// before calling this method
///
pub unsafe fn add(&self, value: T) -> Option<UniqueIndex<'_>> {
self.verify_memory_initialization("add");
let cleanup_call = |index: u32| {
// set deactivate the active index to indicate that the value can be used again
// requires that T does not implement drop
unsafe { &*self.active_index_ptr.as_ptr().offset(index as isize) }
.store(false, Ordering::Relaxed);
};

match self.index_set.acquire_with_additional_cleanup(cleanup_call) {
Some(index) => {
unsafe {
*(*self.data_ptr.as_ptr().offset(index.value() as isize)).get() =
MaybeUninit::new(value)
};

//////////////////////////////////////
// SYNC POINT with reading data values
//////////////////////////////////////
unsafe {
&*self
.active_index_ptr
.as_ptr()
.offset(index.value() as isize)
}
.store(true, Ordering::Release);
Some(index)
}
None => None,
}
}

/// Adds a new element to the [`Container`]. If there is no more space available it returns
/// [`None`], otherwise [`Some`] containing the the index value to the underlying element.
///
Expand All @@ -311,8 +262,8 @@ impl<T: Copy + Debug> Container<T> {
/// * Use [`Container::remove_with_handle()`] to release the acquired index again. Otherwise, the
/// element will leak.
///
pub unsafe fn add_with_handle(&self, value: T) -> Option<ContainerHandle> {
self.verify_memory_initialization("add_with_handle");
pub unsafe fn add(&self, value: T) -> Option<ContainerHandle> {
self.verify_memory_initialization("add");

match self.index_set.acquire_raw_index() {
Some(index) => {
Expand All @@ -327,6 +278,8 @@ impl<T: Copy + Debug> Container<T> {
unsafe { &*self.active_index_ptr.as_ptr().offset(index as isize) }
.store(true, Ordering::Release);

// MUST HAPPEN AFTER all other operations
self.change_counter.fetch_add(1, Ordering::Release);
Some(ContainerHandle {
index,
container_id: self.container_id.value(),
Expand All @@ -350,7 +303,7 @@ impl<T: Copy + Debug> Container<T> {
/// **Important:** If the UniqueIndex still exists it causes double frees or freeing an index
/// which was allocated afterwards
///
pub unsafe fn remove_with_handle(&self, handle: ContainerHandle) {
pub unsafe fn remove(&self, handle: ContainerHandle) {
self.verify_memory_initialization("remove_with_handle");
if handle.container_id != self.container_id.value() {
fatal_panic!(from self,
Expand All @@ -360,6 +313,9 @@ impl<T: Copy + Debug> Container<T> {
unsafe { &*self.active_index_ptr.as_ptr().offset(handle.index as isize) }
.store(false, Ordering::Relaxed);
self.index_set.release_raw_index(handle.index);

// MUST HAPPEN AFTER all other operations
self.change_counter.fetch_add(1, Ordering::Release);
}

/// Returns [`ContainerState`] which contains all elements of this container. Be aware that
Expand Down Expand Up @@ -394,15 +350,16 @@ impl<T: Copy + Debug> Container<T> {
"The ContainerState used as previous_state was not created by this Container instance.");
}

let mut current_index_set_head = self.index_set.head.load(Ordering::Relaxed);
// MUST HAPPEN BEFORE all other operations
let mut current_change_counter = self.change_counter.load(Ordering::Acquire);

if previous_state.current_index_set_head == current_index_set_head {
if previous_state.current_change_counter == current_change_counter {
return false;
}

// must be set once here, if the current_index_set_head changes in the loop below
// must be set once here, if the current_change_counter changes in the loop below
// the previous_state is updated again with the next clone_state iteration
previous_state.current_index_set_head = current_index_set_head;
previous_state.current_change_counter = current_change_counter;

for i in 0..self.capacity {
// go through here element by element and do not start the operation from the
Expand All @@ -428,12 +385,17 @@ impl<T: Copy + Debug> Container<T> {
};
}

let new_current_index_set_head = self.index_set.head.load(Ordering::Relaxed);
if new_current_index_set_head == current_index_set_head {
// MUST HAPPEN AFTER all other operations
if let Err(counter) = self.change_counter.compare_exchange(
current_change_counter,
current_change_counter,
Ordering::AcqRel,
Ordering::Acquire,
) {
current_change_counter = counter;
} else {
break;
}

current_index_set_head = new_current_index_set_head;
}
}

Expand Down Expand Up @@ -491,25 +453,6 @@ impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeContainer<T, CAPACITY> {
self.container.capacity()
}

/// Adds a new element to the [`FixedSizeContainer`]. If there is no more space available it returns
/// [`None`], otherwise [`Some`] containing the [`UniqueIndex`] to the underlying element.
/// If the [`UniqueIndex`] goes out of scope the added element is removed.
///
/// ```
/// use iceoryx2_bb_lock_free::mpmc::container::*;
///
/// const CAPACITY: usize = 139;
/// let container = FixedSizeContainer::<u128, CAPACITY>::new();
///
/// match container.add(1234567) {
/// Some(index) => println!("added at index {}", index.value()),
/// None => println!("container is full"),
/// };
/// ```
pub fn add(&self, value: T) -> Option<UniqueIndex<'_>> {
unsafe { self.container.add(value) }
}

/// Adds a new element to the [`FixedSizeContainer`]. If there is no more space available it returns
/// [`None`], otherwise [`Some`] containing the the index value to the underlying element.
///
Expand All @@ -519,10 +462,10 @@ impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeContainer<T, CAPACITY> {
/// const CAPACITY: usize = 139;
/// let container = FixedSizeContainer::<u128, CAPACITY>::new();
///
/// match unsafe { container.add_with_handle(1234567) } {
/// match unsafe { container.add(1234567) } {
/// Some(index) => {
/// println!("added at index {:?}", index);
/// unsafe { container.remove_with_handle(index) };
/// unsafe { container.remove(index) };
/// },
/// None => println!("container is full"),
/// };
Expand All @@ -531,11 +474,11 @@ impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeContainer<T, CAPACITY> {
///
/// # Safety
///
/// * Use [`FixedSizeContainer::remove_with_handle()`] to release the acquired index again. Otherwise,
/// * Use [`FixedSizeContainer::remove()`] to release the acquired index again. Otherwise,
/// the element will leak.
///
pub unsafe fn add_with_handle(&self, value: T) -> Option<ContainerHandle> {
self.container.add_with_handle(value)
pub unsafe fn add(&self, value: T) -> Option<ContainerHandle> {
self.container.add(value)
}

/// Useful in IPC context when an application holding the UniqueIndex has died.
Expand All @@ -544,8 +487,8 @@ impl<T: Copy + Debug, const CAPACITY: usize> FixedSizeContainer<T, CAPACITY> {
///
/// * If the UniqueIndex still exists it causes double frees or freeing an index
/// which was allocated afterwards
pub unsafe fn remove_with_handle(&self, handle: ContainerHandle) {
self.container.remove_with_handle(handle)
pub unsafe fn remove(&self, handle: ContainerHandle) {
self.container.remove(handle)
}

/// Returns [`ContainerState`] which contains all elements of this container. Be aware that
Expand Down
Loading

0 comments on commit 648b246

Please sign in to comment.