Skip to content

Commit

Permalink
[eclipse-iceoryx#129] Fix problem where port handle was added too early
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Feb 21, 2024
1 parent 72287f9 commit 26139b4
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 109 deletions.
29 changes: 18 additions & 11 deletions iceoryx2/src/port/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use iceoryx2_cal::named_concept::NamedConceptBuilder;
use crate::service::naming_scheme::event_concept_name;
use crate::{port::port_identifiers::UniqueListenerId, service};
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use super::event_id::EventId;
Expand All @@ -64,18 +65,20 @@ impl std::error::Error for ListenerCreateError {}
/// Represents the receiving endpoint of an event based communication.
#[derive(Debug)]
pub struct Listener<Service: service::Service> {
dynamic_listener_handle: ContainerHandle,
dynamic_listener_handle: Option<ContainerHandle>,
listener: <Service::Event as iceoryx2_cal::event::Event<EventId>>::Listener,
cache: Vec<EventId>,
dynamic_storage: Rc<Service::DynamicStorage>,
}

impl<Service: service::Service> Drop for Listener<Service> {
fn drop(&mut self) {
self.dynamic_storage
.get()
.event()
.release_listener_handle(self.dynamic_listener_handle)
if let Some(handle) = self.dynamic_listener_handle {
self.dynamic_storage
.get()
.event()
.release_listener_handle(handle)
}
}
}

Expand All @@ -93,6 +96,15 @@ impl<Service: service::Service> Listener<Service> {
with ListenerCreateError::ResourceCreationFailed,
"{} since the underlying event concept \"{}\" could not be created.", msg, event_name);

let mut new_self = Self {
dynamic_storage,
dynamic_listener_handle: None,
listener,
cache: vec![],
};

std::sync::atomic::compiler_fence(Ordering::SeqCst);

// !MUST! be the last task otherwise a listener is added to the dynamic config without
// the creation of all required channels
let dynamic_listener_handle = match service
Expand All @@ -110,12 +122,7 @@ impl<Service: service::Service> Listener<Service> {
}
};

let new_self = Self {
dynamic_storage,
dynamic_listener_handle,
listener,
cache: vec![],
};
new_self.dynamic_listener_handle = Some(dynamic_listener_handle);

Ok(new_self)
}
Expand Down
46 changes: 29 additions & 17 deletions iceoryx2/src/port/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_log::{fail, warn};
use iceoryx2_cal::named_concept::NamedConceptBuilder;
use iceoryx2_cal::{dynamic_storage::DynamicStorage, event::NotifierBuilder};
use std::{cell::UnsafeCell, rc::Rc};
use std::{cell::UnsafeCell, rc::Rc, sync::atomic::Ordering};

/// Failures that can occur when a new [`Notifier`] is created with the
/// [`crate::service::port_factory::notifier::PortFactoryNotifier`].
Expand Down Expand Up @@ -141,15 +141,17 @@ pub struct Notifier<Service: service::Service> {
listener_list_state: UnsafeCell<ContainerState<UniqueListenerId>>,
default_event_id: EventId,
dynamic_storage: Rc<Service::DynamicStorage>,
dynamic_notifier_handle: ContainerHandle,
dynamic_notifier_handle: Option<ContainerHandle>,
}

impl<Service: service::Service> Drop for Notifier<Service> {
fn drop(&mut self) {
self.dynamic_storage
.get()
.event()
.release_notifier_handle(self.dynamic_notifier_handle)
if let Some(handle) = self.dynamic_notifier_handle {
self.dynamic_storage
.get()
.event()
.release_notifier_handle(handle)
}
}
}

Expand All @@ -165,27 +167,37 @@ impl<Service: service::Service> Notifier<Service> {
let listener_list = &service.state().dynamic_storage.get().event().listeners;
let dynamic_storage = Rc::clone(&service.state().dynamic_storage);

let dynamic_notifier_handle = match dynamic_storage.get().event().add_notifier_id(port_id) {
Some(handle) => handle,
None => {
fail!(from origin, with NotifierCreateError::ExceedsMaxSupportedNotifiers,
"{} since it would exceed the maximum supported amount of notifiers of {}.",
msg, service.state().static_config.event().max_notifiers);
}
};

let new_self = Self {
let mut new_self = Self {
listener_connections: ListenerConnections::new(listener_list.capacity()),
default_event_id,
listener_list_state: unsafe { UnsafeCell::new(listener_list.get_state()) },
dynamic_storage,
dynamic_notifier_handle,
dynamic_notifier_handle: None,
};

if let Err(e) = new_self.populate_listener_channels() {
warn!(from new_self, "The new Notifier port is unable to connect to every Listener port, caused by {:?}.", e);
}

std::sync::atomic::compiler_fence(Ordering::SeqCst);

// !MUST! be the last task otherwise a notifier is added to the dynamic config without
// the creation of all required channels
let dynamic_notifier_handle = match new_self
.dynamic_storage
.get()
.event()
.add_notifier_id(port_id)
{
Some(handle) => handle,
None => {
fail!(from origin, with NotifierCreateError::ExceedsMaxSupportedNotifiers,
"{} since it would exceed the maximum supported amount of notifiers of {}.",
msg, service.state().static_config.event().max_notifiers);
}
};
new_self.dynamic_notifier_handle = Some(dynamic_notifier_handle);

Ok(new_self)
}

Expand Down
94 changes: 51 additions & 43 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,17 +382,19 @@ impl<Service: service::Service> DataSegment<Service> {
#[derive(Debug)]
pub struct Publisher<Service: service::Service, MessageType: Debug> {
pub(crate) data_segment: Rc<DataSegment<Service>>,
dynamic_publisher_handle: ContainerHandle,
dynamic_publisher_handle: Option<ContainerHandle>,
_phantom_message_type: PhantomData<MessageType>,
}

impl<Service: service::Service, MessageType: Debug> Drop for Publisher<Service, MessageType> {
fn drop(&mut self) {
self.data_segment
.dynamic_storage
.get()
.publish_subscribe()
.release_publisher_handle(self.dynamic_publisher_handle)
if let Some(handle) = self.dynamic_publisher_handle {
self.data_segment
.dynamic_storage
.get()
.publish_subscribe()
.release_publisher_handle(handle)
}
}
}

Expand Down Expand Up @@ -423,6 +425,48 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
with PublisherCreateError::UnableToCreateDataSegment,
"{} since the data segment could not be acquired.", msg);

let data_segment = Rc::new(DataSegment {
is_active: AtomicBool::new(true),
memory: data_segment,
message_size: std::mem::size_of::<Message<Header, MessageType>>(),
message_type_layout: Layout::new::<MessageType>(),
sample_reference_counter: {
let mut v = Vec::with_capacity(number_of_samples);
for _ in 0..number_of_samples {
v.push(AtomicU64::new(0));
}
v
},
dynamic_storage,
port_id,
subscriber_connections: SubscriberConnections::new(
subscriber_list.capacity(),
&service.state().global_config,
port_id,
static_config,
),
config,
subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) },
history: match static_config.history_size == 0 {
true => None,
false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
},
static_config: service.state().static_config.clone(),
loan_counter: AtomicUsize::new(0),
});

let mut new_self = Self {
data_segment,
dynamic_publisher_handle: None,
_phantom_message_type: PhantomData,
};

if let Err(e) = new_self.data_segment.populate_subscriber_channels() {
warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
}

std::sync::atomic::compiler_fence(Ordering::SeqCst);

// !MUST! be the last task otherwise a publisher is added to the dynamic config without the
// creation of all required resources
let dynamic_publisher_handle = match service
Expand All @@ -440,43 +484,7 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
}
};

let new_self = Self {
data_segment: Rc::new(DataSegment {
is_active: AtomicBool::new(true),
memory: data_segment,
message_size: std::mem::size_of::<Message<Header, MessageType>>(),
message_type_layout: Layout::new::<MessageType>(),
sample_reference_counter: {
let mut v = Vec::with_capacity(number_of_samples);
for _ in 0..number_of_samples {
v.push(AtomicU64::new(0));
}
v
},
dynamic_storage,
port_id,
subscriber_connections: SubscriberConnections::new(
subscriber_list.capacity(),
&service.state().global_config,
port_id,
static_config,
),
config,
subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) },
history: match static_config.history_size == 0 {
true => None,
false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
},
static_config: service.state().static_config.clone(),
loan_counter: AtomicUsize::new(0),
}),
dynamic_publisher_handle,
_phantom_message_type: PhantomData,
};

if let Err(e) = new_self.data_segment.populate_subscriber_channels() {
warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
}
new_self.dynamic_publisher_handle = Some(dynamic_publisher_handle);

Ok(new_self)
}
Expand Down
56 changes: 33 additions & 23 deletions iceoryx2/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::Ordering;

use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_log::{fail, warn};
Expand Down Expand Up @@ -94,7 +95,7 @@ pub(crate) mod internal {
/// The receiving endpoint of a publish-subscribe communication.
#[derive(Debug)]
pub struct Subscriber<Service: service::Service, MessageType: Debug> {
dynamic_subscriber_handle: ContainerHandle,
dynamic_subscriber_handle: Option<ContainerHandle>,
publisher_connections: Rc<PublisherConnections<Service>>,
dynamic_storage: Rc<Service::DynamicStorage>,
static_config: crate::service::static_config::StaticConfig,
Expand All @@ -106,10 +107,12 @@ pub struct Subscriber<Service: service::Service, MessageType: Debug> {

impl<Service: service::Service, MessageType: Debug> Drop for Subscriber<Service, MessageType> {
fn drop(&mut self) {
self.dynamic_storage
.get()
.publish_subscribe()
.release_subscriber_handle(self.dynamic_subscriber_handle)
if let Some(handle) = self.dynamic_subscriber_handle {
self.dynamic_storage
.get()
.publish_subscribe()
.release_subscriber_handle(handle)
}
}
}

Expand All @@ -131,6 +134,30 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
.publishers;

let dynamic_storage = Rc::clone(&service.state().dynamic_storage);

let publisher_connections = Rc::new(PublisherConnections::new(
publisher_list.capacity(),
port_id,
&service.state().global_config,
static_config,
));

let mut new_self = Self {
config,
publisher_connections,
dynamic_storage,
publisher_list_state: UnsafeCell::new(unsafe { publisher_list.get_state() }),
dynamic_subscriber_handle: None,
static_config: service.state().static_config.clone(),
_phantom_message_type: PhantomData,
};

if let Err(e) = new_self.populate_publisher_channels() {
warn!(from new_self, "The new subscriber is unable to connect to every publisher, caused by {:?}.", e);
}

std::sync::atomic::compiler_fence(Ordering::SeqCst);

// !MUST! be the last task otherwise a subscriber is added to the dynamic config without
// the creation of all required channels
let dynamic_subscriber_handle = match service
Expand All @@ -148,24 +175,7 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
}
};

let new_self = Self {
config,
publisher_connections: Rc::new(PublisherConnections::new(
publisher_list.capacity(),
port_id,
&service.state().global_config,
static_config,
)),
dynamic_storage,
publisher_list_state: UnsafeCell::new(unsafe { publisher_list.get_state() }),
dynamic_subscriber_handle,
static_config: service.state().static_config.clone(),
_phantom_message_type: PhantomData,
};

if let Err(e) = new_self.populate_publisher_channels() {
warn!(from new_self, "The new subscriber is unable to connect to every publisher, caused by {:?}.", e);
}
new_self.dynamic_subscriber_handle = Some(dynamic_subscriber_handle);

Ok(new_self)
}
Expand Down
Loading

0 comments on commit 26139b4

Please sign in to comment.