diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index fa12d69f33c..be6888576cf 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -206,6 +206,7 @@ class SharedMemGlobal std::shared_ptr port_segment; PortNode* node; MultiProducerConsumerRingBuffer* buffer; + BufferDescriptor last_checked_buffer[PortNode::LISTENERS_STATUS_SIZE]; }; static const std::shared_ptr& get() @@ -267,30 +268,48 @@ class SharedMemGlobal } bool update_status_all_listeners( - PortNode* port_node) + PortContext& port_context) { uint32_t listeners_found = 0; + auto port_node = port_context.node; + for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++) { auto& status = port_node->listeners_status[i]; + auto& last_checked_buffer = port_context.last_checked_buffer[i]; if (status.is_in_use) { listeners_found++; - // Check only currently waiting listeners - if (status.is_waiting) + // Check if a listener is processing first + if (status.is_processing) { - if (status.counter != status.last_verified_counter) + if ((last_checked_buffer.validity_id == status.descriptor.validity_id) && + (last_checked_buffer.source_segment_id == status.descriptor.source_segment_id) && + (last_checked_buffer.buffer_node_offset == status.descriptor.buffer_node_offset)) + { - status.last_verified_counter = status.counter; + return false; } - else // Counter is freeze => this listener is blocked!!! + last_checked_buffer = status.descriptor; + } + else + { + last_checked_buffer = BufferDescriptor{}; + // Check if it is waiting next + if (status.is_waiting) { - return false; + if (status.counter != status.last_verified_counter) + { + status.last_verified_counter = status.counter; + } + else // Counter is frozen => this listener is blocked!!! + { + return false; + } } } - if (listeners_found == port_node->num_listeners) { break; @@ -333,7 +352,7 @@ class SharedMemGlobal // Check again, there can be races before locking the mutex. if (timeout_elapsed(now, *(*port_it))) { - if (!update_status_all_listeners((*port_it)->node)) + if (!update_status_all_listeners(*(*port_it))) { (*port_it)->node->is_port_ok = false; } @@ -442,7 +461,7 @@ class SharedMemGlobal node_->ref_counter.fetch_add(1); auto port_context = std::make_shared(); - *port_context = {port_segment_, node_, buffer_.get()}; + *port_context = {port_segment_, node_, buffer_.get(), { BufferDescriptor{} } }; Port::WatchTask::get()->add_port(std::move(port_context)); }