Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16192] Fix notification lost #3194

Merged
merged 10 commits into from
Jan 10, 2023
Merged
15 changes: 12 additions & 3 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ namespace detail {

WaitSetImpl::~WaitSetImpl()
{
std::lock_guard<std::mutex> guard(mutex_);
for (const Condition* c : entries_)
eprosima::utilities::collections::unordered_vector<const Condition*> old_entries;

{
// We only need to protect access to the collection.
std::lock_guard<std::mutex> guard(mutex_);
old_entries = entries_;
entries_.clear();
}

for (const Condition* c : old_entries)
{
c->get_notifier()->detach_from(this);
}
Expand Down Expand Up @@ -68,7 +76,7 @@ ReturnCode_t WaitSetImpl::attach_condition(
// Should wake_up when adding a new triggered condition
if (is_waiting_ && condition.get_trigger_value())
{
wake_up();
cond_.notify_one();
}
}
}
Expand Down Expand Up @@ -156,6 +164,7 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
std::lock_guard<std::mutex> guard(mutex_);
cond_.notify_one();
}

Expand Down
25 changes: 17 additions & 8 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,13 @@ ReadCondition* DataReaderImpl::create_readcondition(
return nullptr;
}


eprosima::fastdds::dds::detail::StateFilter current_mask{};
if (nullptr != reader_)
{
current_mask = get_last_mask_state();
}

std::lock_guard<std::recursive_mutex> _(get_conditions_mutex());

// Check if there is an associated ReadConditionImpl object already
Expand Down Expand Up @@ -1950,6 +1957,7 @@ ReadCondition* DataReaderImpl::create_readcondition(
{
// create a new one
impl = std::make_shared<detail::ReadConditionImpl>(*this, key);
impl->set_trigger_value(current_mask);
// Add the implementation object to the collection
read_conditions_.insert(impl.get());
}
Expand Down Expand Up @@ -2037,28 +2045,29 @@ void DataReaderImpl::try_notify_read_conditions() noexcept
}

// Update and check the mask change requires notification
eprosima::fastdds::dds::detail::StateFilter current_mask{};
bool notify = false;
{
std::lock_guard<RecursiveTimedMutex> _(reader_->getMutex());

auto old_mask = last_mask_state_;
last_mask_state_ = history_.get_mask_status();
current_mask = last_mask_state_;

bool notify = last_mask_state_.sample_states & ~old_mask.sample_states ||
notify = last_mask_state_.sample_states & ~old_mask.sample_states ||
last_mask_state_.view_states & ~old_mask.view_states ||
last_mask_state_.instance_states & ~old_mask.instance_states;

if (!notify)
{
return;
}
}

// traverse the conditions notifying
std::lock_guard<std::recursive_mutex> _(get_conditions_mutex());

for (detail::ReadConditionImpl* impl : read_conditions_)
{
impl->notify();
impl->set_trigger_value(current_mask);
if (notify)
{
impl->notify();
}
}
}

Expand Down
22 changes: 12 additions & 10 deletions src/cpp/fastdds/subscriber/ReadConditionImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
{
DataReaderImpl& data_reader_;
const StateFilter state_;
StateFilter value_;
mutable std::mutex value_mtx_;
std::recursive_mutex& mutex_;
std::forward_list<const ReadCondition*> conditions_;

Expand All @@ -56,6 +58,7 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
const StateFilter& state)
: data_reader_(data_reader)
, state_(state)
, value_()
, mutex_(data_reader.get_conditions_mutex())
{
}
Expand Down Expand Up @@ -98,16 +101,8 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>

bool get_trigger_value() const noexcept
{
try
{
return get_trigger_value(data_reader_.get_last_mask_state());
}
catch (std::runtime_error& e)
{
// DataReader not enabled yet
EPROSIMA_LOG_WARNING(READCONDITION, e.what());
return false;
}
std::lock_guard<std::mutex> _(value_mtx_);
return get_trigger_value(value_);
}

DataReader* get_datareader() const noexcept
Expand Down Expand Up @@ -207,6 +202,13 @@ class ReadConditionImpl : public std::enable_shared_from_this<ReadConditionImpl>
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

void set_trigger_value(
const StateFilter& value) noexcept
{
std::lock_guard<std::mutex> _(value_mtx_);
value_ = value;
}

/**
* Notify all the associated ReadConditions
*/
Expand Down
71 changes: 70 additions & 1 deletion test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -33,7 +35,7 @@ class TestCondition : public Condition
{
public:

bool trigger_value = false;
volatile bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down