Skip to content

Commit

Permalink
Revert "Fix waitset notification (#3087)" (#3193)
Browse files Browse the repository at this point in the history
This reverts commit a9e49cd.
  • Loading branch information
MiguelCompany authored Jan 9, 2023
1 parent a9e49cd commit e4c5354
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 112 deletions.
54 changes: 14 additions & 40 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ ReturnCode_t WaitSetImpl::wait(
const fastrtps::Duration_t& timeout)
{
std::unique_lock<std::mutex> lock(mutex_);
// last notification processed
unsigned int old_counter = notifications_ - 1;

if (is_waiting_)
{
Expand All @@ -113,54 +111,31 @@ ReturnCode_t WaitSetImpl::wait(

auto fill_active_conditions = [&]()
{
bool ret_val;

if ( old_counter == notifications_ )
{
// spurious wakeup
return false;
}

// Loop if predicate may be outdated
do
bool ret_val = false;
active_conditions.clear();
for (const Condition* c : entries_)
{
ret_val = false;
old_counter = notifications_;
active_conditions.clear();

for (const Condition* c : entries_)
if (c->get_trigger_value())
{
if (c->get_trigger_value())
{
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
}
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
}
}
while (old_counter != notifications_
&& active_conditions.size() != entries_.size());

return ret_val;
};

bool condition_value = false;
is_waiting_ = true;
auto missing_notification_outage = std::chrono::milliseconds(500);
auto now = std::chrono::steady_clock::now();
auto deadline = fastrtps::c_TimeInfinite == timeout ?
std::chrono::steady_clock::time_point::max() :
now + std::chrono::nanoseconds(timeout.to_ns());

do
if (fastrtps::c_TimeInfinite == timeout)
{
now = std::chrono::steady_clock::now();
auto next_outage_timeout = now + missing_notification_outage;
auto ctimeout = std::min(next_outage_timeout, deadline);

condition_value = cond_.wait_until(lock, ctimeout, fill_active_conditions);
cond_.wait(lock, fill_active_conditions);
condition_value = true;
}
else
{
auto ns = timeout.to_ns();
condition_value = cond_.wait_for(lock, std::chrono::nanoseconds(ns), fill_active_conditions);
}
while (!condition_value && ( old_counter != notifications_ || deadline > now));

is_waiting_ = false;

return condition_value ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT;
Expand All @@ -181,7 +156,6 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
++notifications_;
cond_.notify_one();
}

Expand Down
2 changes: 0 additions & 2 deletions src/cpp/fastdds/core/condition/WaitSetImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_
#define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_

#include <atomic>
#include <condition_variable>
#include <mutex>

Expand Down Expand Up @@ -114,7 +113,6 @@ struct WaitSetImpl
std::condition_variable cond_;
eprosima::utilities::collections::unordered_vector<const Condition*> entries_;
bool is_waiting_ = false;
std::atomic_uint notifications_ = {1};
};

} // namespace detail
Expand Down
71 changes: 1 addition & 70 deletions test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -35,7 +33,7 @@ class TestCondition : public Condition
{
public:

volatile bool trigger_value = false;
bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -199,73 +197,6 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down

0 comments on commit e4c5354

Please sign in to comment.