Skip to content

Commit

Permalink
Fix notification lost (#3198)
Browse files Browse the repository at this point in the history
* Fix notification lost (#3194)

* fix notification lost

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* add a regression test

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* rename a variable name and update comments

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix uncrustify issue

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* make the regression test better

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix uncrustify

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* Refs #16192. Fix deadlock on WaitSetImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16192. Fix deadlock on ReadConditionImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16192. Use mutex instead of atomic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #16192. Fix deadlock when constructing ReadConditionImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Signed-off-by: Chen Lihui <lihui.chen@sony.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Chen Lihui <lihui.chen@sony.com>
(cherry picked from commit df2857a)

# Conflicts:
#	src/cpp/fastdds/subscriber/ReadConditionImpl.hpp

* Refs #16192. Fixed conflicts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
mergify[bot] and MiguelCompany authored Jan 11, 2023
1 parent 0443371 commit 4dd9652
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 22 deletions.
15 changes: 12 additions & 3 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ namespace detail {

WaitSetImpl::~WaitSetImpl()
{
std::lock_guard<std::mutex> guard(mutex_);
for (const Condition* c : entries_)
eprosima::utilities::collections::unordered_vector<const Condition*> old_entries;

{
// We only need to protect access to the collection.
std::lock_guard<std::mutex> guard(mutex_);
old_entries = entries_;
entries_.clear();
}

for (const Condition* c : old_entries)
{
c->get_notifier()->detach_from(this);
}
Expand Down Expand Up @@ -68,7 +76,7 @@ ReturnCode_t WaitSetImpl::attach_condition(
// Should wake_up when adding a new triggered condition
if (is_waiting_ && condition.get_trigger_value())
{
wake_up();
cond_.notify_one();
}
}
}
Expand Down Expand Up @@ -156,6 +164,7 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
std::lock_guard<std::mutex> guard(mutex_);
cond_.notify_one();
}

Expand Down
25 changes: 17 additions & 8 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1915,6 +1915,13 @@ ReadCondition* DataReaderImpl::create_readcondition(
return nullptr;
}


eprosima::fastdds::dds::detail::StateFilter current_mask{};
if (nullptr != reader_)
{
current_mask = get_last_mask_state();
}

std::lock_guard<std::recursive_mutex> _(get_conditions_mutex());

// Check if there is an associated ReadConditionImpl object already
Expand Down Expand Up @@ -1945,6 +1952,7 @@ ReadCondition* DataReaderImpl::create_readcondition(
{
// create a new one
impl = std::make_shared<detail::ReadConditionImpl>(*this, key);
impl->set_trigger_value(current_mask);
// Add the implementation object to the collection
read_conditions_.insert(impl.get());
}
Expand Down Expand Up @@ -2032,28 +2040,29 @@ void DataReaderImpl::try_notify_read_conditions() noexcept
}

// Update and check the mask change requires notification
eprosima::fastdds::dds::detail::StateFilter current_mask{};
bool notify = false;
{
std::lock_guard<RecursiveTimedMutex> _(reader_->getMutex());

auto old_mask = last_mask_state_;
last_mask_state_ = history_.get_mask_status();
current_mask = last_mask_state_;

bool notify = last_mask_state_.sample_states & ~old_mask.sample_states ||
notify = last_mask_state_.sample_states & ~old_mask.sample_states ||
last_mask_state_.view_states & ~old_mask.view_states ||
last_mask_state_.instance_states & ~old_mask.instance_states;

if (!notify)
{
return;
}
}

// traverse the conditions notifying
std::lock_guard<std::recursive_mutex> _(get_conditions_mutex());

for (detail::ReadConditionImpl* impl : read_conditions_)
{
impl->notify();
impl->set_trigger_value(current_mask);
if (notify)
{
impl->notify();
}
}
}

Expand Down
22 changes: 12 additions & 10 deletions src/cpp/fastdds/subscriber/ReadConditionImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
{
DataReaderImpl& data_reader_;
const StateFilter state_;
StateFilter value_;
mutable std::mutex value_mtx_;
std::recursive_mutex& mutex_;
std::forward_list<const ReadCondition*> conditions_;

Expand All @@ -56,6 +58,7 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
const StateFilter& state)
: data_reader_(data_reader)
, state_(state)
, value_()
, mutex_(data_reader.get_conditions_mutex())
{
}
Expand Down Expand Up @@ -98,16 +101,8 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>

bool get_trigger_value() const noexcept
{
try
{
return get_trigger_value(data_reader_.get_last_mask_state());
}
catch (std::runtime_error& e)
{
// DataReader not enabled yet
logWarning(READCONDITION, e.what());
return false;
}
std::lock_guard<std::mutex> _(value_mtx_);
return get_trigger_value(value_);
}

DataReader* get_datareader() const noexcept
Expand Down Expand Up @@ -207,6 +202,13 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

void set_trigger_value(
const StateFilter& value) noexcept
{
std::lock_guard<std::mutex> _(value_mtx_);
value_ = value;
}

/**
* Notify all the associated ReadConditions
*/
Expand Down
71 changes: 70 additions & 1 deletion test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -33,7 +35,7 @@ class TestCondition : public Condition
{
public:

bool trigger_value = false;
volatile bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down

0 comments on commit 4dd9652

Please sign in to comment.