Skip to content

Commit

Permalink
[eclipse-iceoryx#129] Integrate used chunk list into publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Feb 22, 2024
1 parent 5440a93 commit 449f2bb
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 86 deletions.
7 changes: 5 additions & 2 deletions iceoryx2/src/port/details/publisher_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl<Service: service::Service> Connection<Service> {
fn new(
this: &PublisherConnections<Service>,
publisher_id: UniquePublisherId,
number_of_samples: usize,
) -> Result<Self, ConnectionFailure> {
let msg = format!(
"Unable to establish connection to publisher {:?} from subscriber {:?}.",
Expand All @@ -56,7 +57,8 @@ impl<Service: service::Service> Connection<Service> {
.buffer_size(this.static_config.subscriber_max_buffer_size)
.receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples)
.enable_safe_overflow(this.static_config.enable_safe_overflow)
.create_receiver(),
.number_of_samples(number_of_samples)
.create_receiver(this.static_config.type_size),
"{} since the zero copy connection could not be established.", msg);

let data_segment = fail!(from this,
Expand Down Expand Up @@ -116,9 +118,10 @@ impl<Service: service::Service> PublisherConnections<Service> {
&self,
index: usize,
publisher_id: UniquePublisherId,
number_of_samples: usize,
) -> Result<(), ConnectionFailure> {
if self.get(index).is_none() {
*self.get_mut(index) = Some(Connection::new(self, publisher_id)?);
*self.get_mut(index) = Some(Connection::new(self, publisher_id, number_of_samples)?);
}

Ok(())
Expand Down
19 changes: 16 additions & 3 deletions iceoryx2/src/port/details/subscriber_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,30 @@ use crate::{
#[derive(Debug)]
pub(crate) struct Connection<Service: service::Service> {
pub(crate) sender: <Service::Connection as ZeroCopyConnection>::Sender,
pub(crate) subscriber_id: UniqueSubscriberId,
}

impl<Service: service::Service> Connection<Service> {
fn new(
this: &SubscriberConnections<Service>,
subscriber_id: UniqueSubscriberId,
number_of_samples: usize,
) -> Result<Self, ZeroCopyCreationError> {
let sender = fail!(from this, when <Service::Connection as ZeroCopyConnection>::
Builder::new( &connection_name(this.port_id, subscriber_id))
.config(&connection_config::<Service>(this.config.as_ref()))
.buffer_size(this.static_config.subscriber_max_buffer_size)
.receiver_max_borrowed_samples(this.static_config.subscriber_max_borrowed_samples)
.enable_safe_overflow(this.static_config.enable_safe_overflow)
.create_sender(),
.number_of_samples(number_of_samples)
.create_sender(this.static_config.type_size),
"Unable to establish connection to subscriber {:?} from publisher {:?}.",
subscriber_id, this.port_id);

Ok(Self { sender })
Ok(Self {
sender,
subscriber_id,
})
}
}

Expand All @@ -57,6 +63,7 @@ pub(crate) struct SubscriberConnections<Service: service::Service> {
port_id: UniquePublisherId,
config: Rc<config::Config>,
static_config: StaticConfig,
number_of_samples: usize,
}

impl<Service: service::Service> SubscriberConnections<Service> {
Expand All @@ -65,12 +72,14 @@ impl<Service: service::Service> SubscriberConnections<Service> {
config: &Rc<config::Config>,
port_id: UniquePublisherId,
static_config: &StaticConfig,
number_of_samples: usize,
) -> Self {
Self {
connections: (0..capacity).map(|_| UnsafeCell::new(None)).collect(),
config: Rc::clone(config),
port_id,
static_config: static_config.clone(),
number_of_samples,
}
}

Expand All @@ -97,7 +106,11 @@ impl<Service: service::Service> SubscriberConnections<Service> {
subscriber_id: UniqueSubscriberId,
) -> Result<bool, ZeroCopyCreationError> {
if self.get(index).is_none() {
*self.get_mut(index) = Some(Connection::new(self, subscriber_id)?);
*self.get_mut(index) = Some(Connection::new(
self,
subscriber_id,
self.number_of_samples,
)?);
Ok(true)
} else {
Ok(false)
Expand Down
62 changes: 50 additions & 12 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use crate::port::DegrationAction;
use crate::raw_sample::RawSampleMut;
use crate::service;
use crate::service::config_scheme::data_segment_config;
use crate::service::dynamic_config::publish_subscribe::PublisherDetails;
use crate::service::header::publish_subscribe::Header;
use crate::service::naming_scheme::data_segment_name;
use crate::service::port_factory::publisher::{LocalPublisherConfig, UnableToDeliverStrategy};
Expand All @@ -78,7 +79,7 @@ use iceoryx2_bb_container::queue::Queue;
use iceoryx2_bb_elementary::allocator::AllocationError;
use iceoryx2_bb_elementary::enum_gen;
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_log::{fail, fatal_panic, warn};
use iceoryx2_bb_log::{error, fail, fatal_panic, warn};
use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::named_concept::NamedConceptBuilder;
use iceoryx2_cal::shared_memory::{
Expand Down Expand Up @@ -127,7 +128,8 @@ enum_gen! {
/// Failure that can be emitted when a [`crate::sample::Sample`] is sent via [`Publisher::send()`].
PublisherSendError
entry:
ConnectionBrokenSincePublisherNoLongerExists
ConnectionBrokenSincePublisherNoLongerExists,
ConnectionCorrupted
mapping:
PublisherLoanError to LoanError,
ConnectionFailure to ConnectionError
Expand Down Expand Up @@ -238,7 +240,7 @@ impl<Service: service::Service> DataSegment<Service> {
}
}

fn deliver_sample(&self, address_to_chunk: usize) -> usize {
fn deliver_sample(&self, address_to_chunk: usize) -> Result<usize, PublisherSendError> {
self.retrieve_returned_samples();

let deliver_call = match self.config.unable_to_deliver_strategy {
Expand All @@ -262,6 +264,32 @@ impl<Service: service::Service> DataSegment<Service> {
* try_send => we tried and expect that the buffer is full
* */
}
Err(ZeroCopySendError::ConnectionCorrupted) => {
match &self.config.degration_callback {
Some(c) => match c.call(
self.static_config.clone(),
self.port_id,
connection.subscriber_id,
) {
DegrationAction::Ignore => (),
DegrationAction::Warn => {
error!(from self,
"While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.",
address_to_chunk, connection.subscriber_id);
}
DegrationAction::Fail => {
fail!(from self, with PublisherSendError::ConnectionCorrupted,
"While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.",
address_to_chunk, connection.subscriber_id);
}
},
None => {
error!(from self,
"While delivering the sample: {:?} a corrupted connection was detected with subscriber {:?}.",
address_to_chunk, connection.subscriber_id);
}
}
}
Ok(overflow) => {
self.borrow_sample(address_to_chunk);
number_of_recipients += 1;
Expand All @@ -275,7 +303,7 @@ impl<Service: service::Service> DataSegment<Service> {
None => (),
}
}
number_of_recipients
Ok(number_of_recipients)
}

fn populate_subscriber_channels(&self) -> Result<(), ZeroCopyCreationError> {
Expand Down Expand Up @@ -375,7 +403,7 @@ impl<Service: service::Service> DataSegment<Service> {
"{} since the connections could not be updated.", msg);

self.add_sample_to_history(address_to_chunk);
Ok(self.deliver_sample(address_to_chunk))
self.deliver_sample(address_to_chunk)
}
}

Expand Down Expand Up @@ -422,7 +450,7 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
.messaging_pattern
.required_amount_of_samples_per_data_segment(config.max_loaned_samples);

let data_segment = fail!(from origin, when Self::create_data_segment(port_id, service.state().global_config.as_ref(), number_of_samples),
let data_segment = fail!(from origin, when Self::create_data_segment(port_id, service.state().global_config.as_ref(), number_of_samples, static_config),
with PublisherCreateError::UnableToCreateDataSegment,
"{} since the data segment could not be acquired.", msg);

Expand All @@ -445,6 +473,7 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
&service.state().global_config,
port_id,
static_config,
number_of_samples,
),
config,
subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) },
Expand Down Expand Up @@ -475,8 +504,10 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
.dynamic_storage
.get()
.publish_subscribe()
.add_publisher_id(port_id)
{
.add_publisher_id(PublisherDetails {
publisher_id: port_id,
number_of_samples,
}) {
Some(unique_index) => unique_index,
None => {
fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers,
Expand All @@ -494,19 +525,26 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
port_id: UniquePublisherId,
global_config: &config::Config,
number_of_samples: usize,
static_config: &publish_subscribe::StaticConfig,
) -> Result<Service::SharedMemory, SharedMemoryCreateError> {
let allocator_config = shm_allocator::pool_allocator::Config {
bucket_layout: Layout::new::<Message<Header, MessageType>>(),
bucket_layout:
// # SAFETY: type_size and type_alignment are acquired via
// core::mem::{size_of|align_of}
unsafe {
Layout::from_size_align_unchecked(
static_config.type_size,
static_config.type_alignment,
)
},
};
let chunk_size = allocator_config.bucket_layout.size();
let chunk_align = allocator_config.bucket_layout.align();

Ok(fail!(from "Publisher::create_data_segment()",
when <<Service::SharedMemory as SharedMemory<PoolAllocator>>::Builder as NamedConceptBuilder<
Service::SharedMemory,
>>::new(&data_segment_name(port_id))
.config(&data_segment_config::<Service>(global_config))
.size(chunk_size * number_of_samples + chunk_align - 1)
.size(static_config.type_size * number_of_samples + static_config.type_alignment - 1)
.create(&allocator_config),
"Unable to create the data segment."))
}
Expand Down
25 changes: 16 additions & 9 deletions iceoryx2/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*};

use crate::port::DegrationAction;
use crate::service::dynamic_config::publish_subscribe::PublisherDetails;
use crate::service::port_factory::subscriber::SubscriberConfig;
use crate::service::static_config::publish_subscribe::StaticConfig;
use crate::{
Expand All @@ -51,7 +52,7 @@ use crate::{
};

use super::details::publisher_connections::{Connection, PublisherConnections};
use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId};
use super::port_identifiers::UniqueSubscriberId;
use super::update_connections::ConnectionFailure;

/// Defines the failure that can occur when receiving data with [`Subscriber::receive()`].
Expand Down Expand Up @@ -101,7 +102,7 @@ pub struct Subscriber<Service: service::Service, MessageType: Debug> {
static_config: crate::service::static_config::StaticConfig,
config: SubscriberConfig,

publisher_list_state: UnsafeCell<ContainerState<UniquePublisherId>>,
publisher_list_state: UnsafeCell<ContainerState<PublisherDetails>>,
_phantom_message_type: PhantomData<MessageType>,
}

Expand Down Expand Up @@ -185,32 +186,38 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
visited_indices.resize(self.publisher_connections.capacity(), None);

unsafe {
(*self.publisher_list_state.get()).for_each(|index, publisher_id| {
visited_indices[index as usize] = Some(*publisher_id);
(*self.publisher_list_state.get()).for_each(|index, details| {
visited_indices[index as usize] = Some(*details);
})
};

// update all connections
for (i, index) in visited_indices.iter().enumerate() {
match index {
Some(publisher_id) => match self.publisher_connections.create(i, *publisher_id) {
Some(details) => match self.publisher_connections.create(
i,
details.publisher_id,
details.number_of_samples,
) {
Ok(()) => (),
Err(e) => match &self.config.degration_callback {
None => {
warn!(from self, "Unable to establish connection to new publisher {:?}.", publisher_id)
warn!(from self, "Unable to establish connection to new publisher {:?}.", details.publisher_id)
}
Some(c) => {
match c.call(
self.static_config.clone(),
*publisher_id,
details.publisher_id,
self.publisher_connections.subscriber_id(),
) {
DegrationAction::Ignore => (),
DegrationAction::Warn => {
warn!(from self, "Unable to establish connection to new publisher {:?}.", publisher_id)
warn!(from self, "Unable to establish connection to new publisher {:?}.",
details.publisher_id)
}
DegrationAction::Fail => {
fail!(from self, with e, "Unable to establish connection to new publisher {:?}.", publisher_id);
fail!(from self, with e, "Unable to establish connection to new publisher {:?}.",
details.publisher_id);
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions iceoryx2/src/service/builder/publish_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
//!
//! See [`crate::service`]
//!
use crate::message::Message;
use crate::service;
use crate::service::dynamic_config::publish_subscribe::DynamicConfigSettings;
use crate::service::header::publish_subscribe::Header;
use crate::service::messaging_pattern::MessagingPattern;
use crate::service::port_factory::publish_subscribe;
use crate::service::*;
Expand Down Expand Up @@ -225,6 +227,13 @@ impl<ServiceType: service::Service> Builder<ServiceType> {
}
}

fn finalize_config<MessageType: Debug>(&mut self) {
self.config_details_mut().type_name = std::any::type_name::<MessageType>().to_string();
self.config_details_mut().type_size = core::mem::size_of::<Message<Header, MessageType>>();
self.config_details_mut().type_alignment =
core::mem::align_of::<Message<Header, MessageType>>();
}

/// If the [`Service`] exists, it will be opened otherwise a new [`Service`] will be
/// created.
pub fn open_or_create<MessageType: Debug>(
Expand All @@ -234,7 +243,7 @@ impl<ServiceType: service::Service> Builder<ServiceType> {
PublishSubscribeOpenOrCreateError,
> {
let msg = "Unable to open or create publish subscribe service";
self.config_details_mut().type_name = std::any::type_name::<MessageType>().to_string();
self.finalize_config::<MessageType>();

match self.is_service_available(msg) {
Ok(Some(_)) => Ok(self.open::<MessageType>()?),
Expand Down Expand Up @@ -276,7 +285,7 @@ impl<ServiceType: service::Service> Builder<ServiceType> {
) -> Result<publish_subscribe::PortFactory<ServiceType, MessageType>, PublishSubscribeOpenError>
{
let msg = "Unable to open publish subscribe service";
self.config_details_mut().type_name = std::any::type_name::<MessageType>().to_string();
self.finalize_config::<MessageType>();

let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new().create(),
with PublishSubscribeOpenError::InternalFailure,
Expand Down Expand Up @@ -351,7 +360,7 @@ impl<ServiceType: service::Service> Builder<ServiceType> {
self.adjust_properties_to_meaningful_values();

let msg = "Unable to create publish subscribe service";
self.config_details_mut().type_name = std::any::type_name::<MessageType>().to_string();
self.finalize_config::<MessageType>();

if !self.config_details().enable_safe_overflow
&& (self.config_details().subscriber_max_buffer_size
Expand Down
Loading

0 comments on commit 449f2bb

Please sign in to comment.