Skip to content

Commit

Permalink
testing: fix multiple race conditions in simulated time tests
Browse files Browse the repository at this point in the history
Fixes #12480
Fixes #10568

Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
mattklein123 committed Aug 7, 2020
1 parent 7c92028 commit f01cc68
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 69 deletions.
2 changes: 1 addition & 1 deletion test/integration/cx_limit_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ConnectionLimitIntegrationTest : public testing::TestWithParam<Network::Ad
if (Network::AcceptedSocketImpl::acceptedSocketCount() == expected_connections) {
return AssertionSuccess();
}
timeSystem().advanceTimeWait(std::chrono::milliseconds(500));
timeSystem().advanceTimeWait(std::chrono::milliseconds(500)); // fixfix?
}
if (Network::AcceptedSocketImpl::acceptedSocketCount() == expected_connections) {
return AssertionSuccess();
Expand Down
32 changes: 18 additions & 14 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ AssertionResult FakeStream::waitForHeadersComplete(milliseconds timeout) {
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for headers.";
}
time_system_.waitFor(lock_, decoder_event_, 5ms);
time_system_.waitFor(lock_, decoder_event_, 5ms, true);
}
return AssertionSuccess();
}
Expand All @@ -167,7 +167,7 @@ AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, ui
if (time_system_.monotonicTime() >= start_time + timeout) {
return AssertionFailure() << "Timed out waiting for data.";
}
time_system_.waitFor(lock_, decoder_event_, 5ms);
time_system_.waitFor(lock_, decoder_event_, 5ms, true);
if (bodyLength() < body_length) {
// Run the client dispatcher since we may need to process window updates, etc.
client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
Expand Down Expand Up @@ -196,7 +196,7 @@ AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatche
if (time_system_.monotonicTime() >= start_time + timeout) {
return AssertionFailure() << "Timed out waiting for end of stream.";
}
time_system_.waitFor(lock_, decoder_event_, 5ms);
time_system_.waitFor(lock_, decoder_event_, 5ms, true);
if (!end_stream_) {
// Run the client dispatcher since we may need to process window updates, etc.
client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
Expand All @@ -213,7 +213,7 @@ AssertionResult FakeStream::waitForReset(milliseconds timeout) {
return AssertionFailure() << "Timed out waiting for reset.";
}
// Safe since CondVar::waitFor won't throw.
time_system_.waitFor(lock_, decoder_event_, 5ms);
time_system_.waitFor(lock_, decoder_event_, 5ms, true);
}
return AssertionSuccess();
}
Expand Down Expand Up @@ -318,6 +318,7 @@ FakeHttpConnection::FakeHttpConnection(
}

AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
ENVOY_LOG(trace, "FakeConnectionBase close");
if (!shared_connection_.connected()) {
return AssertionSuccess();
}
Expand Down Expand Up @@ -355,7 +356,7 @@ AssertionResult FakeConnectionBase::waitForDisconnect(bool ignore_spurious_event
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for disconnect.";
}
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms);
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms, true);
// The default behavior of waitForDisconnect is to assume the test cleanly
// calls waitForData, waitForNewStream, etc. to handle all events on the
// connection. If the caller explicitly notes that other events should be
Expand All @@ -381,7 +382,7 @@ AssertionResult FakeConnectionBase::waitForHalfClose(bool ignore_spurious_events
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for half close.";
}
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms);
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms, true);
// The default behavior of waitForHalfClose is to assume the test cleanly
// calls waitForData, waitForNewStream, etc. to handle all events on the
// connection. If the caller explicitly notes that other events should be
Expand All @@ -407,7 +408,7 @@ AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_d
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for new stream.";
}
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms);
Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms, true);
// As with waitForDisconnect, by default, waitForNewStream returns after the next event.
// If the caller explicitly notes other events should be ignored, it will instead actually
// wait for the next new stream, ignoring other events such as onData()
Expand Down Expand Up @@ -556,7 +557,7 @@ AssertionResult FakeUpstream::waitForHttpConnection(
if (time_system.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for new connection.";
}
time_system_.waitFor(lock_, upstream_event_, 5ms);
time_system_.waitFor(lock_, upstream_event_, 5ms, true);
if (new_connections_.empty()) {
// Run the client dispatcher since we may need to process window updates, etc.
client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
Expand Down Expand Up @@ -591,7 +592,7 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
FakeUpstream& upstream = *it;
Thread::ReleasableLockGuard lock(upstream.lock_);
if (upstream.new_connections_.empty()) {
time_system.waitFor(upstream.lock_, upstream.upstream_event_, 5ms);
time_system.waitFor(upstream.lock_, upstream.upstream_event_, 5ms, true);
}

if (upstream.new_connections_.empty()) {
Expand All @@ -618,8 +619,8 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect
Thread::LockGuard lock(lock_);
if (new_connections_.empty()) {
ENVOY_LOG(debug, "waiting for raw connection");
time_system_.waitFor(lock_, upstream_event_,
timeout); // Safe since CondVar::waitFor won't throw.
time_system_.waitFor(lock_, upstream_event_, timeout,
true); // Safe since CondVar::waitFor won't throw.
}

if (new_connections_.empty()) {
Expand Down Expand Up @@ -649,7 +650,8 @@ testing::AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData&
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for UDP datagram.";
}
time_system_.waitFor(lock_, upstream_event_, 5ms); // Safe since CondVar::waitFor won't throw.
time_system_.waitFor(lock_, upstream_event_, 5ms,
true); // Safe since CondVar::waitFor won't throw.
}
data_to_fill = std::move(received_datagrams_.front());
received_datagrams_.pop_front();
Expand Down Expand Up @@ -680,7 +682,8 @@ AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string*
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for data.";
}
time_system_.waitFor(lock_, connection_event_, 5ms); // Safe since CondVar::waitFor won't throw.
time_system_.waitFor(lock_, connection_event_, 5ms,
true); // Safe since CondVar::waitFor won't throw.
}
if (data != nullptr) {
*data = data_;
Expand All @@ -698,7 +701,8 @@ FakeRawConnection::waitForData(const std::function<bool(const std::string&)>& da
if (time_system_.monotonicTime() >= end_time) {
return AssertionFailure() << "Timed out waiting for data.";
}
time_system_.waitFor(lock_, connection_event_, 5ms); // Safe since CondVar::waitFor won't throw.
time_system_.waitFor(lock_, connection_event_, 5ms,
true); // Safe since CondVar::waitFor won't throw.
}
if (data != nullptr) {
*data = data_;
Expand Down
6 changes: 5 additions & 1 deletion test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks {
});
Event::TestTimeSystem& time_system =
dynamic_cast<Event::TestTimeSystem&>(connection_.dispatcher().timeSource());
Thread::CondVar::WaitStatus status = time_system.waitFor(lock_, callback_ready_event, timeout);
// fixfix @jmarantz: This is inherently racy? The notify can happen before the loop and
// then there is no conditional check inside waitFor(). IMO waitFor() needs to be changed
// to take a mutex and a condition and then use Await()?
Thread::CondVar::WaitStatus status =
time_system.waitFor(lock_, callback_ready_event, timeout, true);
if (status == Thread::CondVar::WaitStatus::Timeout) {
return testing::AssertionFailure() << "Timed out while executing on dispatcher.";
}
Expand Down
6 changes: 3 additions & 3 deletions test/integration/integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ BaseIntegrationTest::BaseIntegrationTest(const InstanceConstSharedPtrFn& upstrea
// notification and clear the pool connection if necessary. A real fix would require adding fairly
// complex test hooks to the server and/or spin waiting on stats, neither of which I think are
// necessary right now.
timeSystem().advanceTimeWait(std::chrono::milliseconds(10));
timeSystem().advanceTimeWait(std::chrono::milliseconds(10), true);
ON_CALL(*mock_buffer_factory_, create_(_, _, _))
.WillByDefault(Invoke([](std::function<void()> below_low, std::function<void()> above_high,
std::function<void()> above_overflow) -> Buffer::Instance* {
Expand Down Expand Up @@ -504,7 +504,7 @@ void BaseIntegrationTest::createGeneratedApiTestServer(
absl::StrCat("Lds update failed. Details\n",
getListenerDetails(test_server_->server())));
}
time_system_.advanceTimeWait(std::chrono::milliseconds(10));
time_system_.advanceTimeWait(std::chrono::milliseconds(10), true);
}

registerTestServerPorts(port_names);
Expand Down Expand Up @@ -706,7 +706,7 @@ AssertionResult BaseIntegrationTest::waitForPortAvailable(uint32_t port,
nullptr, true);
return AssertionSuccess();
} catch (const EnvoyException&) {
timeSystem().advanceTimeWait(std::chrono::milliseconds(100));
timeSystem().advanceTimeWait(std::chrono::milliseconds(100), true);
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ RawConnectionDriver::~RawConnectionDriver() = default;

void RawConnectionDriver::waitForConnection() {
while (!callbacks_->connected() && !callbacks_->closed()) {
Event::GlobalTimeSystem().timeSystem().advanceTimeWait(std::chrono::milliseconds(10));
Event::GlobalTimeSystem().timeSystem().advanceTimeWait(std::chrono::milliseconds(10), true);
dispatcher_.run(Event::Dispatcher::RunType::NonBlock);
}
}
Expand Down
15 changes: 9 additions & 6 deletions test/mocks/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ class MockTimeSystem : public Event::TestTimeSystem {
Event::CallbackScheduler& cb_scheduler) override {
return real_time_.createScheduler(base_scheduler, cb_scheduler);
}
void advanceTimeWait(const Duration& duration) override { real_time_.advanceTimeWait(duration); }
void advanceTimeAsync(const Duration& duration) override {
real_time_.advanceTimeAsync(duration);
void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override {
real_time_.advanceTimeWaitImpl(duration, always_sleep);
}
Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar,
const Duration& duration) noexcept
void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override {
real_time_.advanceTimeAsyncImpl(duration, always_sleep);
}
Thread::CondVar::WaitStatus waitForImpl(Thread::MutexBasicLockable& mutex,
Thread::CondVar& condvar, const Duration& duration,
bool always_sleep) noexcept
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override {
return real_time_.waitFor(mutex, condvar, duration); // NO_CHECK_FORMAT(real_time)
return real_time_.waitFor(mutex, condvar, duration, always_sleep); // NO_CHECK_FORMAT(real_time)
}
MOCK_METHOD(SystemTime, systemTime, ());
MOCK_METHOD(MonotonicTime, monotonicTime, ());
Expand Down
29 changes: 18 additions & 11 deletions test/test_common/simulated_time_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,21 +216,27 @@ MonotonicTime SimulatedTimeSystemHelper::monotonicTime() {
return monotonic_time_;
}

void SimulatedTimeSystemHelper::advanceTimeAsync(const Duration& duration) {
void SimulatedTimeSystemHelper::advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) {
only_one_thread_.checkOneThread();
absl::MutexLock lock(&mutex_);
MonotonicTime monotonic_time =
monotonic_time_ + std::chrono::duration_cast<MonotonicTime::duration>(duration);
setMonotonicTimeLockHeld(monotonic_time);
if (always_sleep) {
std::this_thread::sleep_for(duration);
}
}

void SimulatedTimeSystemHelper::advanceTimeWait(const Duration& duration) {
void SimulatedTimeSystemHelper::advanceTimeWaitImpl(const Duration& duration, bool always_sleep) {
only_one_thread_.checkOneThread();
absl::MutexLock lock(&mutex_);
MonotonicTime monotonic_time =
monotonic_time_ + std::chrono::duration_cast<MonotonicTime::duration>(duration);
setMonotonicTimeLockHeld(monotonic_time);
waitForNoPendingLockHeld();
if (always_sleep) {
std::this_thread::sleep_for(duration);
}
}

void SimulatedTimeSystemHelper::waitForNoPendingLockHeld() const
Expand All @@ -240,23 +246,19 @@ void SimulatedTimeSystemHelper::waitForNoPendingLockHeld() const
&pending_alarms_));
}

Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasicLockable& mutex,
Thread::CondVar& condvar,
const Duration& duration) noexcept
Thread::CondVar::WaitStatus
SimulatedTimeSystemHelper::waitForImpl(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar,
const Duration& duration, bool always_sleep) noexcept
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) {
only_one_thread_.checkOneThread();

// TODO(#10568): This real-time polling delay should not be necessary. Without
// it, test/extensions/filters/http/cache:cache_filter_integration_test fails
// about 40% of the time.
const Duration real_time_poll_delay(
std::min(std::chrono::duration_cast<Duration>(std::chrono::milliseconds(50)), duration));
const MonotonicTime end_time = monotonicTime() + duration;

bool timeout_not_reached = true;
while (timeout_not_reached) {
// First check to see if the condition is already satisfied without advancing sim time.
if (condvar.waitFor(mutex, real_time_poll_delay) == Thread::CondVar::WaitStatus::NoTimeout) {
if (condvar.waitFor(mutex, std::chrono::milliseconds(0)) ==
Thread::CondVar::WaitStatus::NoTimeout) {
return Thread::CondVar::WaitStatus::NoTimeout;
}

Expand All @@ -275,8 +277,13 @@ Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasi
const AlarmRegistration& alarm_registration = *alarms_.begin();
next_wakeup = std::min(alarm_registration.time_, next_wakeup);
}
const auto sleep_duration = next_wakeup - monotonic_time_;
setMonotonicTimeLockHeld(next_wakeup);
waitForNoPendingLockHeld();
if (always_sleep) {
std::cerr << "sleeping for: " << std::chrono::duration_cast<std::chrono::milliseconds>(sleep_duration).count() << "\n\n";
std::this_thread::sleep_for(sleep_duration);
}
} else {
// If we reached our end_time, break the loop and return timeout. We
// don't break immediately as we have to drop mutex_ and re-take mutex,
Expand Down
9 changes: 5 additions & 4 deletions test/test_common/simulated_time_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ class SimulatedTimeSystemHelper : public TestTimeSystem {
SchedulerPtr createScheduler(Scheduler& base_scheduler, CallbackScheduler& cb_scheduler) override;

// TestTimeSystem
void advanceTimeWait(const Duration& duration) override;
void advanceTimeAsync(const Duration& duration) override;
Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar,
const Duration& duration) noexcept
void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override;
void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override;
Thread::CondVar::WaitStatus waitForImpl(Thread::MutexBasicLockable& mutex,
Thread::CondVar& condvar, const Duration& duration,
bool always_sleep) noexcept
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override;

// TimeSource
Expand Down
13 changes: 8 additions & 5 deletions test/test_common/test_time.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ TestTimeSystem& GlobalTimeSystem::timeSystem() {
return singleton_->timeSystem(make_real_time_system);
}

void TestRealTimeSystem::advanceTimeWait(const Duration& duration) {
void TestRealTimeSystem::advanceTimeWaitImpl(const Duration& duration, bool) {
only_one_thread_.checkOneThread();
std::this_thread::sleep_for(duration);
}

void TestRealTimeSystem::advanceTimeAsync(const Duration& duration) { advanceTimeWait(duration); }
void TestRealTimeSystem::advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) {
advanceTimeWait(duration, always_sleep);
}

Thread::CondVar::WaitStatus TestRealTimeSystem::waitFor(Thread::MutexBasicLockable& lock,
Thread::CondVar& condvar,
const Duration& duration) noexcept {
Thread::CondVar::WaitStatus TestRealTimeSystem::waitForImpl(Thread::MutexBasicLockable& lock,
Thread::CondVar& condvar,
const Duration& duration,
bool) noexcept {
only_one_thread_.checkOneThread();
return condvar.waitFor(lock, duration);
}
Expand Down
9 changes: 5 additions & 4 deletions test/test_common/test_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ namespace Event {
class TestRealTimeSystem : public TestTimeSystem {
public:
// TestTimeSystem
void advanceTimeAsync(const Duration& duration) override;
void advanceTimeWait(const Duration& duration) override;
Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar,
const Duration& duration) noexcept
void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override;
void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override;
Thread::CondVar::WaitStatus waitForImpl(Thread::MutexBasicLockable& mutex,
Thread::CondVar& condvar, const Duration& duration,
bool always_sleep) noexcept
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override;

// Event::TimeSystem
Expand Down
Loading

0 comments on commit f01cc68

Please sign in to comment.