From b38ade0915430473b58e785f85cebeb655e5906f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 11 Jan 2023 07:23:09 +0100 Subject: [PATCH] Fix notification lost (#3199) * Fix notification lost (#3194) * fix notification lost Signed-off-by: Chen Lihui * add a regression test Signed-off-by: Chen Lihui * rename a variable name and update comments Signed-off-by: Chen Lihui * fix uncrustify issue Signed-off-by: Chen Lihui * make the regression test better Signed-off-by: Chen Lihui * fix uncrustify Signed-off-by: Chen Lihui * Refs #16192. Fix deadlock on WaitSetImpl. Signed-off-by: Miguel Company * Refs #16192. Fix deadlock on ReadConditionImpl. Signed-off-by: Miguel Company * Refs #16192. Use mutex instead of atomic. Signed-off-by: Miguel Company * Refs #16192. Fix deadlock when constructing ReadConditionImpl. Signed-off-by: Miguel Company Signed-off-by: Chen Lihui Signed-off-by: Miguel Company Co-authored-by: Chen Lihui (cherry picked from commit df2857a5fc2c2b4a661500843629c08dde08ab1c) # Conflicts: # src/cpp/fastdds/subscriber/ReadConditionImpl.hpp * Refs #16192. Fixed conflicts. Signed-off-by: Miguel Company Signed-off-by: Miguel Company Co-authored-by: Miguel Company --- .../fastdds/core/condition/WaitSetImpl.cpp | 15 +++- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 25 ++++--- .../fastdds/subscriber/ReadConditionImpl.hpp | 22 +++--- .../dds/core/condition/WaitSetImplTests.cpp | 71 ++++++++++++++++++- 4 files changed, 111 insertions(+), 22 deletions(-) diff --git a/src/cpp/fastdds/core/condition/WaitSetImpl.cpp b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp index 73390e35de6..d74115a1d58 100644 --- a/src/cpp/fastdds/core/condition/WaitSetImpl.cpp +++ b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp @@ -36,8 +36,16 @@ namespace detail { WaitSetImpl::~WaitSetImpl() { - std::lock_guard guard(mutex_); - for (const Condition* c : entries_) + eprosima::utilities::collections::unordered_vector old_entries; + + { + // We only need to protect access to the collection. + std::lock_guard guard(mutex_); + old_entries = entries_; + entries_.clear(); + } + + for (const Condition* c : old_entries) { c->get_notifier()->detach_from(this); } @@ -68,7 +76,7 @@ ReturnCode_t WaitSetImpl::attach_condition( // Should wake_up when adding a new triggered condition if (is_waiting_ && condition.get_trigger_value()) { - wake_up(); + cond_.notify_one(); } } } @@ -156,6 +164,7 @@ ReturnCode_t WaitSetImpl::get_conditions( void WaitSetImpl::wake_up() { + std::lock_guard guard(mutex_); cond_.notify_one(); } diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index cb8df01dc64..e51175c840a 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -1861,6 +1861,13 @@ ReadCondition* DataReaderImpl::create_readcondition( return nullptr; } + + eprosima::fastdds::dds::detail::StateFilter current_mask{}; + if (nullptr != reader_) + { + current_mask = get_last_mask_state(); + } + std::lock_guard _(get_conditions_mutex()); // Check if there is an associated ReadConditionImpl object already @@ -1891,6 +1898,7 @@ ReadCondition* DataReaderImpl::create_readcondition( { // create a new one impl = std::make_shared(*this, key); + impl->set_trigger_value(current_mask); // Add the implementation object to the collection read_conditions_.insert(impl.get()); } @@ -1978,28 +1986,29 @@ void DataReaderImpl::try_notify_read_conditions() noexcept } // Update and check the mask change requires notification + eprosima::fastdds::dds::detail::StateFilter current_mask{}; + bool notify = false; { std::lock_guard _(reader_->getMutex()); auto old_mask = last_mask_state_; last_mask_state_ = history_.get_mask_status(); + current_mask = last_mask_state_; - bool notify = last_mask_state_.sample_states & ~old_mask.sample_states || + notify = last_mask_state_.sample_states & ~old_mask.sample_states || last_mask_state_.view_states & ~old_mask.view_states || last_mask_state_.instance_states & ~old_mask.instance_states; - - if (!notify) - { - return; - } } // traverse the conditions notifying std::lock_guard _(get_conditions_mutex()); - for (detail::ReadConditionImpl* impl : read_conditions_) { - impl->notify(); + impl->set_trigger_value(current_mask); + if (notify) + { + impl->notify(); + } } } diff --git a/src/cpp/fastdds/subscriber/ReadConditionImpl.hpp b/src/cpp/fastdds/subscriber/ReadConditionImpl.hpp index 98fc0dc280c..4a7d99ba7e4 100644 --- a/src/cpp/fastdds/subscriber/ReadConditionImpl.hpp +++ b/src/cpp/fastdds/subscriber/ReadConditionImpl.hpp @@ -44,6 +44,8 @@ class ReadConditionImpl : public std::enable_shared_from_this { DataReaderImpl& data_reader_; const StateFilter state_; + StateFilter value_; + mutable std::mutex value_mtx_; std::recursive_mutex& mutex_; std::forward_list conditions_; @@ -56,6 +58,7 @@ class ReadConditionImpl : public std::enable_shared_from_this const StateFilter& state) : data_reader_(data_reader) , state_(state) + , value_() , mutex_(data_reader.get_conditions_mutex()) { } @@ -98,16 +101,8 @@ class ReadConditionImpl : public std::enable_shared_from_this bool get_trigger_value() const noexcept { - try - { - return get_trigger_value(data_reader_.get_last_mask_state()); - } - catch (std::runtime_error& e) - { - // DataReader not enabled yet - logWarning(READCONDITION, e.what()); - return false; - } + std::lock_guard _(value_mtx_); + return get_trigger_value(value_); } DataReader* get_datareader() const noexcept @@ -207,6 +202,13 @@ class ReadConditionImpl : public std::enable_shared_from_this return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } + void set_trigger_value( + const StateFilter& value) noexcept + { + std::lock_guard _(value_mtx_); + value_ = value; + } + /** * Notify all the associated ReadConditions */ diff --git a/test/unittest/dds/core/condition/WaitSetImplTests.cpp b/test/unittest/dds/core/condition/WaitSetImplTests.cpp index 2d13a7a3f8d..798a6355103 100644 --- a/test/unittest/dds/core/condition/WaitSetImplTests.cpp +++ b/test/unittest/dds/core/condition/WaitSetImplTests.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include +#include +#include #include @@ -33,7 +35,7 @@ class TestCondition : public Condition { public: - bool trigger_value = false; + volatile bool trigger_value = false; bool get_trigger_value() const override { @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait) } } +TEST(WaitSetImplTests, fix_wait_notification_lost) +{ + ConditionSeq conditions; + WaitSetImpl wait_set; + + // Waiting should return the added connection after the trigger value is updated and the wait_set waken. + { + TestCondition triggered_condition; + + // Expecting calls on the notifier of triggered_condition. + auto notifier = triggered_condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + class AnotherTestCondition : public Condition + { + public: + + bool get_trigger_value() const override + { + // Time to simulate thread context switch or something else + std::this_thread::sleep_for(std::chrono::seconds(2)); + return false; + } + + } + second_simulator_condition; + + // Expecting calls on the notifier of second_simulator_condition. + notifier = second_simulator_condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + wait_set.attach_condition(triggered_condition); + wait_set.attach_condition(second_simulator_condition); + + std::promise promise; + std::future future = promise.get_future(); + ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR; + std::thread wait_conditions([&]() + { + // Not to use `WaitSetImpl::wait` with a timeout value, because the + // `condition_variable::wait_for` could call _Predicate function again. + ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite); + promise.set_value(); + }); + + // One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread + std::this_thread::sleep_for(std::chrono::seconds(1)); + triggered_condition.trigger_value = true; + wait_set.wake_up(); + + // Expecting get notification after wake_up, otherwise output error within 5 seconds. + future.wait_for(std::chrono::seconds(5)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition)); + + // Wake up the `wait_set` to make sure the thread exit + wait_set.wake_up(); + wait_conditions.join(); + + wait_set.will_be_deleted(triggered_condition); + wait_set.will_be_deleted(second_simulator_condition); + } +} + int main( int argc, char** argv)