Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16192] fix notification lost (backport #3087) #3191

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ ReturnCode_t WaitSetImpl::wait(
const fastrtps::Duration_t& timeout)
{
std::unique_lock<std::mutex> lock(mutex_);
// last notification processed
unsigned int old_counter = notifications_ - 1;

if (is_waiting_)
{
Expand All @@ -111,31 +113,54 @@ ReturnCode_t WaitSetImpl::wait(

auto fill_active_conditions = [&]()
{
bool ret_val = false;
active_conditions.clear();
for (const Condition* c : entries_)
bool ret_val;

if ( old_counter == notifications_ )
{
// spurious wakeup
return false;
}

// Loop if predicate may be outdated
do
{
if (c->get_trigger_value())
ret_val = false;
old_counter = notifications_;
active_conditions.clear();

for (const Condition* c : entries_)
{
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
if (c->get_trigger_value())
{
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
}
}
}
while (old_counter != notifications_
&& active_conditions.size() != entries_.size());

return ret_val;
};

bool condition_value = false;
is_waiting_ = true;
if (fastrtps::c_TimeInfinite == timeout)
{
cond_.wait(lock, fill_active_conditions);
condition_value = true;
}
else
auto missing_notification_outage = std::chrono::milliseconds(500);
auto now = std::chrono::steady_clock::now();
auto deadline = fastrtps::c_TimeInfinite == timeout ?
std::chrono::steady_clock::time_point::max() :
now + std::chrono::nanoseconds(timeout.to_ns());

do
{
auto ns = timeout.to_ns();
condition_value = cond_.wait_for(lock, std::chrono::nanoseconds(ns), fill_active_conditions);
now = std::chrono::steady_clock::now();
auto next_outage_timeout = now + missing_notification_outage;
auto ctimeout = std::min(next_outage_timeout, deadline);

condition_value = cond_.wait_until(lock, ctimeout, fill_active_conditions);
}
while (!condition_value && ( old_counter != notifications_ || deadline > now));

is_waiting_ = false;

return condition_value ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT;
Expand All @@ -156,6 +181,7 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
++notifications_;
cond_.notify_one();
}

Expand Down
2 changes: 2 additions & 0 deletions src/cpp/fastdds/core/condition/WaitSetImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_
#define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_

#include <atomic>
#include <condition_variable>
#include <mutex>

Expand Down Expand Up @@ -113,6 +114,7 @@ struct WaitSetImpl
std::condition_variable cond_;
eprosima::utilities::collections::unordered_vector<const Condition*> entries_;
bool is_waiting_ = false;
std::atomic_uint notifications_ = {1};
};

} // namespace detail
Expand Down
71 changes: 70 additions & 1 deletion test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -33,7 +35,7 @@ class TestCondition : public Condition
{
public:

bool trigger_value = false;
volatile bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down