Skip to content

Commit

Permalink
[eclipse-iceoryx#129] Fix container race between add and update
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Mar 5, 2024
1 parent d6fd33a commit 9876b0f
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions iceoryx2-bb/lock-free/src/mpmc/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub struct ContainerState<T: Copy + Debug> {
container_id: u64,
current_change_counter: u64,
data: Vec<MaybeUninit<T>>,
active_index: Vec<bool>,
active_index: Vec<u64>,
}

impl<T: Copy + Debug> ContainerState<T> {
Expand All @@ -96,7 +96,7 @@ impl<T: Copy + Debug> ContainerState<T> {
container_id,
current_change_counter: 0,
data: vec![MaybeUninit::uninit(); capacity],
active_index: vec![false; capacity],
active_index: vec![0; capacity],
}
}

Expand All @@ -114,7 +114,7 @@ impl<T: Copy + Debug> ContainerState<T> {
/// ```
pub fn for_each<F: FnMut(u32, &T)>(&self, mut callback: F) {
for i in 0..self.data.len() {
if self.active_index[i] {
if self.active_index[i] % 2 == 1 {
callback(i as _, unsafe { self.data[i].assume_init_ref() });
}
}
Expand Down Expand Up @@ -283,7 +283,7 @@ impl<T: Copy + Debug> Container<T> {
// SYNC POINT with reading data values
//////////////////////////////////////
unsafe { &*self.active_index_ptr.as_ptr().add(index as _) }
.store(1, Ordering::Release);
.fetch_add(1, Ordering::Release);

// MUST HAPPEN AFTER all other operations
self.change_counter.fetch_add(1, Ordering::Release);
Expand Down Expand Up @@ -318,7 +318,7 @@ impl<T: Copy + Debug> Container<T> {
}

unsafe { &*self.active_index_ptr.as_ptr().add(handle.index as _) }
.store(0, Ordering::Relaxed);
.fetch_add(1, Ordering::Relaxed);
self.index_set.release_raw_index(handle.index);

// MUST HAPPEN AFTER all other operations
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<T: Copy + Debug> Container<T> {
}

// MUST HAPPEN BEFORE all other operations
let mut current_change_counter = self.change_counter.load(Ordering::Acquire);
let current_change_counter = self.change_counter.load(Ordering::Acquire);

if previous_state.current_change_counter == current_change_counter {
return false;
Expand All @@ -373,17 +373,21 @@ impl<T: Copy + Debug> Container<T> {
// beginning when the content has changed.
// only copy single entries otherwise we encounter starvation since this is a
// heavy-weight operation

//////////////////////////////////////
// SYNC POINT with reading data values
//////////////////////////////////////
let mut current_index_count =
unsafe { (*self.active_index_ptr.as_ptr().add(i)).load(Ordering::Acquire) };

loop {
//////////////////////////////////////
// SYNC POINT with reading data values
//////////////////////////////////////
unsafe {
// TODO: can be implemented more efficiently when only elements are copied that
// have been changed
previous_state.active_index[i] =
(*self.active_index_ptr.as_ptr().add(i)).load(Ordering::Acquire) == 1
};
if previous_state.active_index[i] {
if current_index_count == previous_state.active_index[i] {
break;
}

previous_state.active_index[i] = current_index_count;

if previous_state.active_index[i] % 2 == 1 {
core::ptr::copy_nonoverlapping(
(*self.data_ptr.as_ptr().add(i)).get(),
previous_state.data.as_mut_ptr().add(i),
Expand All @@ -392,13 +396,13 @@ impl<T: Copy + Debug> Container<T> {
}

// MUST HAPPEN AFTER all other operations
if let Err(counter) = self.change_counter.compare_exchange(
current_change_counter,
current_change_counter,
if let Err(count) = (*self.active_index_ptr.as_ptr().add(i)).compare_exchange(
current_index_count,
current_index_count,
Ordering::AcqRel,
Ordering::Acquire,
) {
current_change_counter = counter;
current_index_count = count
} else {
break;
}
Expand Down

0 comments on commit 9876b0f

Please sign in to comment.