From fff75829afc89986dd6e05e78bd37dcd9b82a961 Mon Sep 17 00:00:00 2001 From: jsantiago-eProsima <90755661+jsantiago-eProsima@users.noreply.github.com> Date: Wed, 12 Jul 2023 07:47:57 +0200 Subject: [PATCH] Improved shm listener status update mechanism (#3639) * Refs #18966: Improved shm listener status update mechanism Signed-off-by: Javier Santiago * Refs #18966: Uncrustify Signed-off-by: Javier Santiago * Refs #18966: Fix Mac warnings Signed-off-by: Javier Santiago * Refs #18966: Typo Signed-off-by: Javier Santiago --------- Signed-off-by: Javier Santiago (cherry picked from commit 3b7e8af800cbbbaaf3ddd54e13631c22ece9b585) --- .../transport/shared_mem/SharedMemGlobal.hpp | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index fa12d69f33c..be6888576cf 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -206,6 +206,7 @@ class SharedMemGlobal std::shared_ptr port_segment; PortNode* node; MultiProducerConsumerRingBuffer* buffer; + BufferDescriptor last_checked_buffer[PortNode::LISTENERS_STATUS_SIZE]; }; static const std::shared_ptr& get() @@ -267,30 +268,48 @@ class SharedMemGlobal } bool update_status_all_listeners( - PortNode* port_node) + PortContext& port_context) { uint32_t listeners_found = 0; + auto port_node = port_context.node; + for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++) { auto& status = port_node->listeners_status[i]; + auto& last_checked_buffer = port_context.last_checked_buffer[i]; if (status.is_in_use) { listeners_found++; - // Check only currently waiting listeners - if (status.is_waiting) + // Check if a listener is processing first + if (status.is_processing) { - if (status.counter != status.last_verified_counter) + if ((last_checked_buffer.validity_id == status.descriptor.validity_id) && + (last_checked_buffer.source_segment_id == status.descriptor.source_segment_id) && + (last_checked_buffer.buffer_node_offset == status.descriptor.buffer_node_offset)) + { - status.last_verified_counter = status.counter; + return false; } - else // Counter is freeze => this listener is blocked!!! + last_checked_buffer = status.descriptor; + } + else + { + last_checked_buffer = BufferDescriptor{}; + // Check if it is waiting next + if (status.is_waiting) { - return false; + if (status.counter != status.last_verified_counter) + { + status.last_verified_counter = status.counter; + } + else // Counter is frozen => this listener is blocked!!! + { + return false; + } } } - if (listeners_found == port_node->num_listeners) { break; @@ -333,7 +352,7 @@ class SharedMemGlobal // Check again, there can be races before locking the mutex. if (timeout_elapsed(now, *(*port_it))) { - if (!update_status_all_listeners((*port_it)->node)) + if (!update_status_all_listeners(*(*port_it))) { (*port_it)->node->is_port_ok = false; } @@ -442,7 +461,7 @@ class SharedMemGlobal node_->ref_counter.fetch_add(1); auto port_context = std::make_shared(); - *port_context = {port_segment_, node_, buffer_.get()}; + *port_context = {port_segment_, node_, buffer_.get(), { BufferDescriptor{} } }; Port::WatchTask::get()->add_port(std::move(port_context)); }