Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#133] write drop tests #144

Merged
merged 8 commits into from
Mar 11, 2024
4 changes: 4 additions & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* Introduce semantic string `iceoryx2-bb-system-types::base64url`
* Introduce `iceoryx2-cal::hash::HashValue` that contains the result of a hash
* Port `UsedChunkList` from iceoryx1 [#129](https://github.com/eclipse-iceoryx/iceoryx2/issues/129)
* From [#133](https://github.com/eclipse-iceoryx/iceoryx2/issues/133)
* Add `Notifier|Listener|Publisher|Subscriber::id()` method to acquire unique port id
* Add `Sample::origin()` to determine the `UniquePublisherId` of the sender
* Performance improvements, especially for AMD CPUs [#136](https://github.com/eclipse-iceoryx/iceoryx2/issues/136)

### Bugfixes
Expand All @@ -33,6 +36,7 @@
* Fix broken `Publisher|Subscriber::populate_{subscriber|publisher}_channels()` [#129](https://github.com/eclipse-iceoryx/iceoryx2/issues/129)
* Fix failing reacquire of delivered samples in the zero copy receive channel [#130](https://github.com/eclipse-iceoryx/iceoryx2/issues/130)
* Fix receiving of invalid samples when subscriber is connected [#131](https://github.com/eclipse-iceoryx/iceoryx2/issues/131)
* Fix problem where sample is released to the wrong publisher [#133](https://github.com/eclipse-iceoryx/iceoryx2/issues/133)
* Fixes for FreeBSD 14.0 [#140](https://github.com/eclipse-iceoryx/iceoryx2/issues/140)
* Fix segfault in `iceoryx2-pal-posix;:shm_list()` caused by `sysctl`
* Adjust test to handle unordered event notifications
Expand Down
2 changes: 2 additions & 0 deletions iceoryx2-cal/src/shm_allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub enum ShmAllocatorInitError {
pub trait ShmAllocator: Send + Sync + 'static {
type Configuration: ShmAllocatorConfig;

/// Returns the required memory size of the additional dynamic part of the allocator that is
/// allocated in [`ShmAllocator::init()`].
fn management_size(memory_size: usize, config: &Self::Configuration) -> usize;

/// Creates a new uninitialized shared memory allocator.
Expand Down
140 changes: 140 additions & 0 deletions iceoryx2-cal/tests/shm_allocator_pool_allocator_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

mod shm_allocator_pool_allocator {
use std::{alloc::Layout, collections::HashSet, ptr::NonNull};

use iceoryx2_bb_elementary::allocator::AllocationError;
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
use iceoryx2_bb_testing::assert_that;
use iceoryx2_cal::{
shm_allocator::{pool_allocator::*, ShmAllocationError, ShmAllocator},
zero_copy_connection::PointerOffset,
};

const MAX_SUPPORTED_ALIGNMENT: usize = 32;
const BUCKET_CONFIG: Layout = unsafe { Layout::from_size_align_unchecked(32, 4) };
const MEM_SIZE: usize = 8192;
const PAYLOAD_SIZE: usize = 1024;

struct TestFixture {
_payload_memory: Box<[u8; MEM_SIZE]>,
_base_address: NonNull<[u8]>,
sut: Box<PoolAllocator>,
}

impl TestFixture {
fn new(bucket_layout: Layout) -> Self {
let mut payload_memory = Box::new([0u8; MEM_SIZE]);
let base_address =
unsafe { NonNull::<[u8]>::new_unchecked(&mut payload_memory[0..PAYLOAD_SIZE]) };
let allocator = BumpAllocator::new(
unsafe { NonNull::new_unchecked(payload_memory[PAYLOAD_SIZE..].as_mut_ptr()) },
MEM_SIZE,
);
let config = &Config { bucket_layout };
let sut = Box::new(unsafe {
PoolAllocator::new_uninit(MAX_SUPPORTED_ALIGNMENT, base_address, config)
});

unsafe { sut.init(&allocator).unwrap() };

Self {
_payload_memory: payload_memory,
_base_address: base_address,
sut,
}
}
}

#[test]
fn is_setup_correctly() {
let test = TestFixture::new(Layout::from_size_align(2, 1).unwrap());

assert_that!(test.sut.number_of_buckets() as usize, eq PAYLOAD_SIZE / 2);
assert_that!(test.sut.relative_start_address() as usize, eq 0);

let test = TestFixture::new(BUCKET_CONFIG);

assert_that!(test.sut.bucket_size(), eq BUCKET_CONFIG.size());
assert_that!(test.sut.max_alignment(), eq BUCKET_CONFIG.align());
}

#[test]
fn allocate_and_release_all_buckets_works() {
const REPETITIONS: usize = 10;
let test = TestFixture::new(BUCKET_CONFIG);

for _ in 0..REPETITIONS {
let mut mem_set = HashSet::new();
for _ in 0..test.sut.number_of_buckets() {
let memory = unsafe { test.sut.allocate(BUCKET_CONFIG).unwrap() };
// the returned offset must be a multiple of the bucket size
assert_that!((memory.value() - test.sut.relative_start_address()) % BUCKET_CONFIG.size(), eq 0);
assert_that!(mem_set.insert(memory.value()), eq true);
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
}

assert_that!(unsafe { test.sut.allocate(BUCKET_CONFIG) }, eq Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)));

for memory in mem_set {
unsafe {
test.sut
.deallocate(PointerOffset::new(memory), BUCKET_CONFIG)
}
}
}
}

#[test]
fn allocate_twice_release_once_until_memory_is_exhausted_works() {
const REPETITIONS: usize = 10;
let test = TestFixture::new(BUCKET_CONFIG);

for _ in 0..REPETITIONS {
let mut mem_set = HashSet::new();
for _ in 0..(test.sut.number_of_buckets() - 1) {
let memory_1 = unsafe { test.sut.allocate(BUCKET_CONFIG).unwrap() };
// the returned offset must be a multiple of the bucket size
assert_that!((memory_1.value() - test.sut.relative_start_address()) % BUCKET_CONFIG.size(), eq 0);

let memory_2 = unsafe { test.sut.allocate(BUCKET_CONFIG).unwrap() };
// the returned offset must be a multiple of the bucket size
assert_that!((memory_2.value() - test.sut.relative_start_address()) % BUCKET_CONFIG.size(), eq 0);
assert_that!(mem_set.insert(memory_2.value()), eq true);

unsafe {
test.sut.deallocate(memory_1, BUCKET_CONFIG);
}
}

let memory = unsafe { test.sut.allocate(BUCKET_CONFIG).unwrap() };
// the returned offset must be a multiple of the bucket size
assert_that!((memory.value() - test.sut.relative_start_address()) % BUCKET_CONFIG.size(), eq 0);
assert_that!(mem_set.insert(memory.value()), eq true);

assert_that!(unsafe { test.sut.allocate(BUCKET_CONFIG) }, eq Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)));

for memory in mem_set {
unsafe {
test.sut
.deallocate(PointerOffset::new(memory), BUCKET_CONFIG)
}
}
}
}

#[test]
fn allocate_with_unsupported_alignment_fails() {
let test = TestFixture::new(Layout::from_size_align(BUCKET_CONFIG.size(), 1).unwrap());
assert_that!(unsafe { test.sut.allocate(BUCKET_CONFIG) }, eq Err(ShmAllocationError::ExceedsMaxSupportedAlignment));
}
}
7 changes: 7 additions & 0 deletions iceoryx2/src/port/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct Listener<Service: service::Service> {
listener: <Service::Event as iceoryx2_cal::event::Event<EventId>>::Listener,
cache: Vec<EventId>,
dynamic_storage: Rc<Service::DynamicStorage>,
port_id: UniqueListenerId,
}

impl<Service: service::Service> Drop for Listener<Service> {
Expand Down Expand Up @@ -101,6 +102,7 @@ impl<Service: service::Service> Listener<Service> {
dynamic_listener_handle: None,
listener,
cache: vec![],
port_id,
};

std::sync::atomic::compiler_fence(Ordering::SeqCst);
Expand Down Expand Up @@ -194,4 +196,9 @@ impl<Service: service::Service> Listener<Service> {

Ok(self.cache())
}

/// Returns the [`UniqueListenerId`] of the [`Listener`]
pub fn id(&self) -> UniqueListenerId {
self.port_id
}
}
7 changes: 7 additions & 0 deletions iceoryx2/src/port/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub struct Notifier<Service: service::Service> {
default_event_id: EventId,
dynamic_storage: Rc<Service::DynamicStorage>,
dynamic_notifier_handle: Option<ContainerHandle>,
port_id: UniqueNotifierId,
}

impl<Service: service::Service> Drop for Notifier<Service> {
Expand Down Expand Up @@ -173,6 +174,7 @@ impl<Service: service::Service> Notifier<Service> {
listener_list_state: unsafe { UnsafeCell::new(listener_list.get_state()) },
dynamic_storage,
dynamic_notifier_handle: None,
port_id,
};

if let Err(e) = new_self.populate_listener_channels() {
Expand Down Expand Up @@ -243,6 +245,11 @@ impl<Service: service::Service> Notifier<Service> {
Ok(())
}

/// Returns the [`UniqueNotifierId`] of the [`Notifier`]
pub fn id(&self) -> UniqueNotifierId {
self.port_id
}

/// Notifies all [`crate::port::listener::Listener`] connected to the service with the default
/// event id provided on creation.
/// On success the number of
Expand Down
5 changes: 5 additions & 0 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,11 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
"Unable to create the data segment."))
}

/// Returns the [`UniquePublisherId`] of the [`Publisher`]
pub fn id(&self) -> UniquePublisherId {
self.data_segment.port_id
}

/// Copies the input `value` into a [`crate::sample_mut::SampleMut`] and delivers it.
/// On success it returns the number of [`crate::port::subscriber::Subscriber`]s that received
/// the data, otherwise a [`PublisherSendError`] describing the failure.
Expand Down
11 changes: 9 additions & 2 deletions iceoryx2/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
match connection.receiver.receive() {
Ok(data) => match data {
None => Ok(None),
Some(relative_addr) => {
Some(offset) => {
let absolute_address =
relative_addr.value() + connection.data_segment.payload_start_address();
offset.value() + connection.data_segment.payload_start_address();
Ok(Some(Sample {
publisher_connections: Rc::clone(&self.publisher_connections),
channel_id,
Expand All @@ -259,6 +259,8 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
absolute_address as *mut Message<Header, MessageType>,
)
},
offset,
origin: connection.publisher_id,
}))
}
},
Expand All @@ -270,6 +272,11 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
}
}

/// Returns the [`UniqueSubscriberId`] of the [`Subscriber`]
pub fn id(&self) -> UniqueSubscriberId {
self.publisher_connections.subscriber_id()
}

/// Receives a [`crate::sample::Sample`] from [`crate::port::publisher::Publisher`]. If no sample could be
/// received [`None`] is returned. If a failure occurs [`SubscriberReceiveError`] is returned.
pub fn receive(&self) -> Result<Option<Sample<MessageType, Service>>, SubscriberReceiveError> {
Expand Down
21 changes: 14 additions & 7 deletions iceoryx2/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ use std::rc::Rc;
use std::{fmt::Debug, ops::Deref};

use iceoryx2_bb_log::{fatal_panic, warn};
use iceoryx2_cal::shared_memory::SharedMemory;
use iceoryx2_cal::zero_copy_connection::{PointerOffset, ZeroCopyReceiver, ZeroCopyReleaseError};

use crate::port::details::publisher_connections::PublisherConnections;
use crate::port::port_identifiers::UniquePublisherId;
use crate::raw_sample::RawSample;
use crate::service::header::publish_subscribe::Header;

Expand All @@ -51,6 +51,8 @@ pub struct Sample<MessageType: Debug, Service: crate::service::Service> {
pub(crate) publisher_connections: Rc<PublisherConnections<Service>>,
pub(crate) ptr: RawSample<Header, MessageType>,
pub(crate) channel_id: usize,
pub(crate) offset: PointerOffset,
pub(crate) origin: UniquePublisherId,
}

impl<MessageType: Debug, Service: crate::service::Service> Deref for Sample<MessageType, Service> {
Expand All @@ -64,12 +66,12 @@ impl<MessageType: Debug, Service: crate::service::Service> Drop for Sample<Messa
fn drop(&mut self) {
match self.publisher_connections.get(self.channel_id) {
Some(c) => {
let distance = self.ptr.as_ptr() as usize - c.data_segment.payload_start_address();
match c.receiver.release(PointerOffset::new(distance)) {
Ok(()) => (),
Err(ZeroCopyReleaseError::RetrieveBufferFull) => {
fatal_panic!(from self, when c.receiver.release(PointerOffset::new(distance)),
"This should never happen! The publishers retrieve channel is full and the sample cannot be returned.");
if c.publisher_id == self.origin {
match c.receiver.release(self.offset) {
Ok(()) => (),
Err(ZeroCopyReleaseError::RetrieveBufferFull) => {
fatal_panic!(from self, "This should never happen! The publishers retrieve channel is full and the sample cannot be returned.");
}
}
}
}
Expand All @@ -90,4 +92,9 @@ impl<MessageType: Debug, Service: crate::service::Service> Sample<MessageType, S
pub fn header(&self) -> &Header {
self.ptr.as_header_ref()
}

/// Returns the [`UniquePublisherId`] of the sender
pub fn origin(&self) -> UniquePublisherId {
self.origin
}
}
23 changes: 17 additions & 6 deletions iceoryx2/src/service/builder/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::service::{self, dynamic_config::event::DynamicConfigSettings};
use iceoryx2_bb_elementary::enum_gen;
use iceoryx2_bb_log::{fail, fatal_panic};
use iceoryx2_bb_posix::adaptive_wait::AdaptiveWaitBuilder;
use iceoryx2_cal::dynamic_storage::DynamicStorageCreateError;

use super::ServiceState;

Expand Down Expand Up @@ -55,6 +56,7 @@ pub enum EventCreateError {
AlreadyExists,
PermissionDenied,
UnableToCreateStaticServiceInformation,
OldConnectionsStillActive,
}

impl std::fmt::Display for EventCreateError {
Expand Down Expand Up @@ -259,25 +261,34 @@ impl<ServiceType: service::Service> Builder<ServiceType> {
number_of_notifiers: event_config.max_notifiers,
};

let dynamic_config = self.base.create_dynamic_config_storage(
let dynamic_config = match self.base.create_dynamic_config_storage(
dynamic_config::MessagingPattern::Event(
dynamic_config::event::DynamicConfig::new(&dynamic_config_setting),
),
dynamic_config::event::DynamicConfig::memory_size(&dynamic_config_setting),
);
let dynamic_config = Rc::new(fail!(from self, when dynamic_config,
with EventCreateError::InternalFailure,
"{} since the dynamic service segment could not be created.", msg));
) {
Ok(c) => Rc::new(c),
Err(DynamicStorageCreateError::AlreadyExists) => {
fail!(from self, with EventCreateError::OldConnectionsStillActive,
"{} since there are still active Listeners or Notifiers.", msg);
}
Err(e) => {
fail!(from self, with EventCreateError::InternalFailure,
"{} since the dynamic service segment could not be created ({:?}).", msg, e);
}
};

let service_config = fail!(from self, when ServiceType::ConfigSerializer::serialize(&self.base.service_config),
with EventCreateError::Corrupted,
"{} since the configuration could not be serialized.", msg);

// only unlock the static details when the service is successfully created
let unlocked_static_details = fail!(from self, when static_config.unlock(service_config.as_slice()),
let mut unlocked_static_details = fail!(from self, when static_config.unlock(service_config.as_slice()),
with EventCreateError::Corrupted,
"{} since the configuration could not be written to the static storage.", msg);

unlocked_static_details.release_ownership();

Ok(event::PortFactory::new(ServiceType::from_state(
service::ServiceState::new(
self.base.service_config.clone(),
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2/src/service/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl<ServiceType: service::Service> BuilderWithServiceType<ServiceType> {
.config(&static_config_storage_config::<ServiceType>(
self.global_config.as_ref(),
))
.has_ownership(false)
.has_ownership(true)
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
.create_locked(),
"Failed to create static service information since the underlying static storage could not be created."),
)
Expand Down
Loading
Loading