diff --git a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc index e8c86a00e06d..f0b19bd43806 100644 --- a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc +++ b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc @@ -333,7 +333,7 @@ TEST_P(RatelimitIntegrationTest, ConnectImmediateDisconnect) { initiateClientConnection(); ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_ratelimit_connection_)); ASSERT_TRUE(fake_ratelimit_connection_->close()); - ASSERT_TRUE(fake_ratelimit_connection_->waitForDisconnect(true)); + ASSERT_TRUE(fake_ratelimit_connection_->waitForDisconnect()); fake_ratelimit_connection_ = nullptr; // Rate limiter fails open waitForSuccessfulUpstreamResponse(); diff --git a/test/extensions/filters/network/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/network/local_ratelimit/local_ratelimit_integration_test.cc index 63b684f49d86..1eaa8c66ff74 100644 --- a/test/extensions/filters/network/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/network/local_ratelimit/local_ratelimit_integration_test.cc @@ -40,7 +40,7 @@ name: ratelimit ASSERT_TRUE(fake_upstream_connection->write("world")); tcp_client->waitForData("world"); tcp_client->close(); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); EXPECT_EQ(0, test_server_->counter("local_rate_limit.local_rate_limit_stats.rate_limited")->value()); diff --git a/test/integration/autonomous_upstream.cc b/test/integration/autonomous_upstream.cc index 45a467dcf7e7..f673a8e231a6 100644 --- a/test/integration/autonomous_upstream.cc +++ b/test/integration/autonomous_upstream.cc @@ -48,7 +48,7 @@ void AutonomousStream::sendResponse() { int32_t request_body_length = -1; HeaderToInt(EXPECT_REQUEST_SIZE_BYTES, request_body_length, headers); if (request_body_length >= 0) { - EXPECT_EQ(request_body_length, bodyLength()); + EXPECT_EQ(request_body_length, body_.length()); } if (!headers.get_(RESET_AFTER_REQUEST).empty()) { diff --git a/test/integration/autonomous_upstream.h b/test/integration/autonomous_upstream.h index e9d247a4ba95..96c885be345f 100644 --- a/test/integration/autonomous_upstream.h +++ b/test/integration/autonomous_upstream.h @@ -24,11 +24,11 @@ class AutonomousStream : public FakeStream { AutonomousUpstream& upstream, bool allow_incomplete_streams); ~AutonomousStream() override; - void setEndStream(bool set) override; + void setEndStream(bool set) EXCLUSIVE_LOCKS_REQUIRED(lock_) override; private: AutonomousUpstream& upstream_; - void sendResponse(); + void sendResponse() EXCLUSIVE_LOCKS_REQUIRED(lock_); const bool allow_incomplete_streams_{false}; }; diff --git a/test/integration/cluster_filter_integration_test.cc b/test/integration/cluster_filter_integration_test.cc index 4162bc9273cf..7c19415ba1f1 100644 --- a/test/integration/cluster_filter_integration_test.cc +++ b/test/integration/cluster_filter_integration_test.cc @@ -124,7 +124,7 @@ TEST_P(ClusterFilterIntegrationTest, TestClusterFilter) { ASSERT_TRUE(tcp_client->write("", true)); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->write("", true)); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); tcp_client->waitForDisconnect(); } diff --git a/test/integration/cx_limit_integration_test.cc b/test/integration/cx_limit_integration_test.cc index 126c5c236664..bced2562de71 100644 --- a/test/integration/cx_limit_integration_test.cc +++ b/test/integration/cx_limit_integration_test.cc @@ -47,7 +47,7 @@ class ConnectionLimitIntegrationTest : public testing::TestWithParam= end_time) { - return AssertionFailure() << "Timed out waiting for headers."; - } - time_system_.waitFor(lock_, decoder_event_, 5ms); + absl::MutexLock lock(&lock_); + const auto reached = [this]() { + lock_.AssertReaderHeld(); + return headers_ != nullptr; + }; + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { + return AssertionFailure() << "Timed out waiting for headers."; } return AssertionSuccess(); } +namespace { +// Perform a wait on a condition while still allowing for periodic client dispatcher runs that +// occur on the current thread. +bool waitForWithDispatcherRun(Event::TestTimeSystem& time_system, absl::Mutex& lock, + const std::function& condition, + Event::Dispatcher& client_dispatcher, milliseconds timeout) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock) { + const auto end_time = time_system.monotonicTime() + timeout; + while (time_system.monotonicTime() < end_time) { + // Wake up every 5ms to run the client dispatcher. + if (time_system.waitFor(lock, absl::Condition(&condition), 5ms, true)) { + return true; + } + + // Run the client dispatcher since we may need to process window updates, etc. + client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); + } + return false; +} +} // namespace + AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length, milliseconds timeout) { - Thread::LockGuard lock(lock_); - auto start_time = time_system_.monotonicTime(); - while (bodyLength() < body_length) { - if (time_system_.monotonicTime() >= start_time + timeout) { - return AssertionFailure() << "Timed out waiting for data."; - } - time_system_.waitFor(lock_, decoder_event_, 5ms); - if (bodyLength() < body_length) { - // Run the client dispatcher since we may need to process window updates, etc. - client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); - } + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, + [this, body_length]() { + lock_.AssertReaderHeld(); + return (body_.length() >= body_length); + }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for data."; } return AssertionSuccess(); } @@ -190,30 +205,23 @@ AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher, milliseconds timeout) { - Thread::LockGuard lock(lock_); - auto start_time = time_system_.monotonicTime(); - while (!end_stream_) { - if (time_system_.monotonicTime() >= start_time + timeout) { - return AssertionFailure() << "Timed out waiting for end of stream."; - } - time_system_.waitFor(lock_, decoder_event_, 5ms); - if (!end_stream_) { - // Run the client dispatcher since we may need to process window updates, etc. - client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); - } + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, + [this]() { + lock_.AssertReaderHeld(); + return end_stream_; + }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for end of stream."; } return AssertionSuccess(); } AssertionResult FakeStream::waitForReset(milliseconds timeout) { - Thread::LockGuard lock(lock_); - auto start_time = time_system_.monotonicTime(); - while (!saw_reset_) { - if (time_system_.monotonicTime() >= start_time + timeout) { - return AssertionFailure() << "Timed out waiting for reset."; - } - // Safe since CondVar::waitFor won't throw. - time_system_.waitFor(lock_, decoder_event_, 5ms); + absl::MutexLock lock(&lock_); + if (!time_system_.waitFor(lock_, absl::Condition(&saw_reset_), timeout, true)) { + return AssertionFailure() << "Timed out waiting for reset."; } return AssertionSuccess(); } @@ -318,6 +326,7 @@ FakeHttpConnection::FakeHttpConnection( } AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { + ENVOY_LOG(trace, "FakeConnectionBase close"); if (!shared_connection_.connected()) { return AssertionSuccess(); } @@ -340,88 +349,46 @@ AssertionResult FakeConnectionBase::enableHalfClose(bool enable, } Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_)); - connection_event_.notifyOne(); return *new_streams_.back(); } -AssertionResult FakeConnectionBase::waitForDisconnect(bool ignore_spurious_events, - milliseconds timeout) { +AssertionResult FakeConnectionBase::waitForDisconnect(milliseconds timeout) { ENVOY_LOG(trace, "FakeConnectionBase waiting for disconnect"); - auto end_time = time_system_.monotonicTime() + timeout; - Thread::LockGuard lock(lock_); - while (shared_connection_.connected()) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for disconnect."; - } - Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms); - // The default behavior of waitForDisconnect is to assume the test cleanly - // calls waitForData, waitForNewStream, etc. to handle all events on the - // connection. If the caller explicitly notes that other events should be - // ignored, continue looping until a disconnect is detected. Otherwise fall - // through and hit the assert below. - if ((status == Thread::CondVar::WaitStatus::NoTimeout) && !ignore_spurious_events) { - break; - } - } - - if (shared_connection_.connected()) { - return AssertionFailure() << "Expected disconnect, but got a different event."; + absl::MutexLock lock(&lock_); + const auto reached = [this]() { + lock_.AssertReaderHeld(); + return !shared_connection_.connectedLockHeld(); + }; + + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { + return AssertionFailure() << "Timed out waiting for disconnect."; } ENVOY_LOG(trace, "FakeConnectionBase done waiting for disconnect"); return AssertionSuccess(); } -AssertionResult FakeConnectionBase::waitForHalfClose(bool ignore_spurious_events, - milliseconds timeout) { - auto end_time = time_system_.monotonicTime() + timeout; - Thread::LockGuard lock(lock_); - while (!half_closed_) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for half close."; - } - Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms); - // The default behavior of waitForHalfClose is to assume the test cleanly - // calls waitForData, waitForNewStream, etc. to handle all events on the - // connection. If the caller explicitly notes that other events should be - // ignored, continue looping until a disconnect is detected. Otherwise fall - // through and hit the assert below. - if (status == Thread::CondVar::WaitStatus::NoTimeout && !ignore_spurious_events) { - break; - } +AssertionResult FakeConnectionBase::waitForHalfClose(milliseconds timeout) { + absl::MutexLock lock(&lock_); + if (!time_system_.waitFor(lock_, absl::Condition(&half_closed_), timeout, true)) { + return AssertionFailure() << "Timed out waiting for half close."; } - - return half_closed_ - ? AssertionSuccess() - : (AssertionFailure() << "Expected half close event, but got a different event."); + return AssertionSuccess(); } AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream, - bool ignore_spurious_events, - milliseconds timeout) { - auto end_time = time_system_.monotonicTime() + timeout; - Thread::LockGuard lock(lock_); - while (new_streams_.empty()) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for new stream."; - } - Thread::CondVar::WaitStatus status = time_system_.waitFor(lock_, connection_event_, 5ms); - // As with waitForDisconnect, by default, waitForNewStream returns after the next event. - // If the caller explicitly notes other events should be ignored, it will instead actually - // wait for the next new stream, ignoring other events such as onData() - if (status == Thread::CondVar::WaitStatus::NoTimeout && !ignore_spurious_events) { - break; - } - if (new_streams_.empty()) { - // Run the client dispatcher since we may need to process window updates, etc. - client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); - } - } - - if (new_streams_.empty()) { - return AssertionFailure() << "Expected new stream event, but got a different event."; + std::chrono::milliseconds timeout) { + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, + [this]() { + lock_.AssertReaderHeld(); + return !new_streams_.empty(); + }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for new stream."; } stream = std::move(new_streams_.front()); new_streams_.pop_front(); @@ -513,14 +480,13 @@ void FakeUpstream::cleanUp() { bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, const std::vector&) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); if (read_disable_on_new_connection_) { connection.readDisable(true); } auto connection_wrapper = - std::make_unique(connection, allow_unexpected_disconnects_); + std::make_unique(connection, allow_unexpected_disconnects_); LinkedList::moveIntoListBack(std::move(connection_wrapper), new_connections_); - upstream_event_.notifyOne(); return true; } @@ -537,7 +503,7 @@ void FakeUpstream::threadRoutine() { dispatcher_->run(Event::Dispatcher::RunType::Block); handler_.reset(); { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); new_connections_.clear(); consumed_connections_.clear(); } @@ -548,26 +514,20 @@ AssertionResult FakeUpstream::waitForHttpConnection( uint32_t max_request_headers_kb, uint32_t max_request_headers_count, envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction headers_with_underscores_action) { - Event::TestTimeSystem& time_system = timeSystem(); - auto end_time = time_system.monotonicTime() + timeout; { - Thread::LockGuard lock(lock_); - while (new_connections_.empty()) { - if (time_system.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for new connection."; - } - time_system_.waitFor(lock_, upstream_event_, 5ms); - if (new_connections_.empty()) { - // Run the client dispatcher since we may need to process window updates, etc. - client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); - } + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, + [this]() { + lock_.AssertReaderHeld(); + return !new_connections_.empty(); + }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for new connection."; } - if (new_connections_.empty()) { - return AssertionFailure() << "Got a new connection event, but didn't create a connection."; - } connection = std::make_unique( - *this, consumeConnection(), http_type_, time_system, max_request_headers_kb, + *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, max_request_headers_count, headers_with_underscores_action); } VERIFY_ASSERTION(connection->initialize()); @@ -589,24 +549,25 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, while (time_system.monotonicTime() < end_time) { for (auto& it : upstreams) { FakeUpstream& upstream = *it; - Thread::ReleasableLockGuard lock(upstream.lock_); - if (upstream.new_connections_.empty()) { - time_system.waitFor(upstream.lock_, upstream.upstream_event_, 5ms); - } - - if (upstream.new_connections_.empty()) { - // Run the client dispatcher since we may need to process window updates, etc. - client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); - } else { + { + absl::MutexLock lock(&upstream.lock_); + if (!waitForWithDispatcherRun( + time_system, upstream.lock_, + [&upstream]() { + upstream.lock_.AssertReaderHeld(); + return !upstream.new_connections_.empty(); + }, + client_dispatcher, 5ms)) { + continue; + } connection = std::make_unique( upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, envoy::config::core::v3::HttpProtocolOptions::ALLOW); - lock.release(); - VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); - return AssertionSuccess(); } + VERIFY_ASSERTION(connection->initialize()); + VERIFY_ASSERTION(connection->readDisable(false)); + return AssertionSuccess(); } } return AssertionFailure() << "Timed out waiting for HTTP connection."; @@ -615,14 +576,14 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection, milliseconds timeout) { { - Thread::LockGuard lock(lock_); - if (new_connections_.empty()) { - ENVOY_LOG(debug, "waiting for raw connection"); - time_system_.waitFor(lock_, upstream_event_, - timeout); // Safe since CondVar::waitFor won't throw. - } - - if (new_connections_.empty()) { + absl::MutexLock lock(&lock_); + const auto reached = [this]() { + lock_.AssertReaderHeld(); + return !new_connections_.empty(); + }; + + ENVOY_LOG(debug, "waiting for raw connection"); + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { return AssertionFailure() << "Timed out waiting for raw connection"; } connection = std::make_unique(consumeConnection(), timeSystem()); @@ -636,30 +597,31 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect SharedConnectionWrapper& FakeUpstream::consumeConnection() { ASSERT(!new_connections_.empty()); auto* const connection_wrapper = new_connections_.front().get(); - connection_wrapper->set_parented(); + connection_wrapper->setParented(); connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); - return connection_wrapper->shared_connection(); + return *connection_wrapper; } testing::AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill, std::chrono::milliseconds timeout) { - Thread::LockGuard lock(lock_); - auto end_time = time_system_.monotonicTime() + timeout; - while (received_datagrams_.empty()) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for UDP datagram."; - } - time_system_.waitFor(lock_, upstream_event_, 5ms); // Safe since CondVar::waitFor won't throw. + absl::MutexLock lock(&lock_); + const auto reached = [this]() { + lock_.AssertReaderHeld(); + return !received_datagrams_.empty(); + }; + + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { + return AssertionFailure() << "Timed out waiting for UDP datagram."; } + data_to_fill = std::move(received_datagrams_.front()); received_datagrams_.pop_front(); return AssertionSuccess(); } void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); received_datagrams_.emplace_back(std::move(data)); - upstream_event_.notifyOne(); } void FakeUpstream::sendUdpDatagram(const std::string& buffer, @@ -673,14 +635,14 @@ void FakeUpstream::sendUdpDatagram(const std::string& buffer, AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data, milliseconds timeout) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); + const auto reached = [this, num_bytes]() { + lock_.AssertReaderHeld(); + return data_.size() == num_bytes; + }; ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes); - auto end_time = time_system_.monotonicTime() + timeout; - while (data_.size() != num_bytes) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for data."; - } - time_system_.waitFor(lock_, connection_event_, 5ms); // Safe since CondVar::waitFor won't throw. + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { + return AssertionFailure() << "Timed out waiting for data."; } if (data != nullptr) { *data = data_; @@ -691,14 +653,14 @@ AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* AssertionResult FakeRawConnection::waitForData(const std::function& data_validator, std::string* data, milliseconds timeout) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); + const auto reached = [this, &data_validator]() { + lock_.AssertReaderHeld(); + return data_validator(data_); + }; ENVOY_LOG(debug, "waiting for data"); - auto end_time = time_system_.monotonicTime() + timeout; - while (!data_validator(data_)) { - if (time_system_.monotonicTime() >= end_time) { - return AssertionFailure() << "Timed out waiting for data."; - } - time_system_.waitFor(lock_, connection_event_, 5ms); // Safe since CondVar::waitFor won't throw. + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout, true)) { + return AssertionFailure() << "Timed out waiting for data."; } if (data != nullptr) { *data = data_; @@ -718,12 +680,11 @@ AssertionResult FakeRawConnection::write(const std::string& data, bool end_strea Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data, bool end_stream) { - Thread::LockGuard lock(parent_.lock_); + absl::MutexLock lock(&parent_.lock_); ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream); parent_.data_.append(data.toString()); parent_.half_closed_ = end_stream; data.drain(data.length()); - parent_.connection_event_.notifyOne(); return Network::FilterStatus::StopIteration; } } // namespace Envoy diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 0c23c42e4b41..479300f78db1 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -56,10 +56,16 @@ class FakeStream : public Http::RequestDecoder, FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder, Event::TestTimeSystem& time_system); - uint64_t bodyLength() { return body_.length(); } - Buffer::Instance& body() { return body_; } + uint64_t bodyLength() { + absl::MutexLock lock(&lock_); + return body_.length(); + } + Buffer::Instance& body() { + absl::MutexLock lock(&lock_); + return body_; + } bool complete() { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); return end_stream_; } @@ -76,7 +82,10 @@ class FakeStream : public Http::RequestDecoder, void encodeResetStream(); void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector); void readDisable(bool disable); - const Http::RequestHeaderMap& headers() { return *headers_; } + const Http::RequestHeaderMap& headers() { + absl::MutexLock lock(&lock_); + return *headers_; + } void setAddServedByHeader(bool add_header) { add_served_by_header_ = add_header; } const Http::RequestTrailerMapPtr& trailers() { return trailers_; } bool receivedData() { return received_data_; } @@ -88,8 +97,12 @@ class FakeStream : public Http::RequestDecoder, const std::function& /*modify_headers*/, const absl::optional grpc_status, absl::string_view /*details*/) override { - const bool is_head_request = - headers_ != nullptr && headers_->getMethodValue() == Http::Headers::get().MethodValues.Head; + bool is_head_request; + { + absl::MutexLock lock(&lock_); + is_head_request = headers_ != nullptr && + headers_->getMethodValue() == Http::Headers::get().MethodValues.Head; + } Http::Utility::sendLocalReply( false, Http::Utility::EncodeFunctions( @@ -160,10 +173,10 @@ class FakeStream : public Http::RequestDecoder, return testing::AssertionFailure() << "Timed out waiting for start of gRPC message."; } { - Thread::LockGuard lock(lock_); - if (!grpc_decoder_.decode(body(), decoded_grpc_frames_)) { + absl::MutexLock lock(&lock_); + if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) { return testing::AssertionFailure() - << "Couldn't decode gRPC data frame: " << body().toString(); + << "Couldn't decode gRPC data frame: " << body_.toString(); } } if (decoded_grpc_frames_.empty()) { @@ -173,10 +186,10 @@ class FakeStream : public Http::RequestDecoder, return testing::AssertionFailure() << "Timed out waiting for end of gRPC message."; } { - Thread::LockGuard lock(lock_); - if (!grpc_decoder_.decode(body(), decoded_grpc_frames_)) { + absl::MutexLock lock(&lock_); + if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) { return testing::AssertionFailure() - << "Couldn't decode gRPC data frame: " << body().toString(); + << "Couldn't decode gRPC data frame: " << body_.toString(); } } } @@ -199,7 +212,7 @@ class FakeStream : public Http::RequestDecoder, void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} - virtual void setEndStream(bool end) { end_stream_ = end; } + virtual void setEndStream(bool end) EXCLUSIVE_LOCKS_REQUIRED(lock_) { end_stream_ = end; } Event::TestTimeSystem& timeSystem() { return time_system_; } @@ -209,17 +222,16 @@ class FakeStream : public Http::RequestDecoder, } protected: - Http::RequestHeaderMapPtr headers_; + absl::Mutex lock_; + Http::RequestHeaderMapPtr headers_ ABSL_GUARDED_BY(lock_); + Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_); private: FakeHttpConnection& parent_; Http::ResponseEncoder& encoder_; - Thread::MutexBasicLockable lock_; - Thread::CondVar decoder_event_; - Http::RequestTrailerMapPtr trailers_; - bool end_stream_{}; - Buffer::OwnedImpl body_; - bool saw_reset_{}; + Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_); + bool end_stream_ ABSL_GUARDED_BY(lock_){}; + bool saw_reset_ ABSL_GUARDED_BY(lock_){}; Grpc::Decoder grpc_decoder_; std::vector decoded_grpc_frames_; bool add_served_by_header_{}; @@ -239,33 +251,44 @@ using FakeStreamPtr = std::unique_ptr; // SharedConnectionWrapper that lives from when the Connection is added to the accepted connection // queue and then through the lifetime of the Fake{Raw,Http}Connection that manages the Connection // through active use. -class SharedConnectionWrapper : public Network::ConnectionCallbacks { +class SharedConnectionWrapper : public Network::ConnectionCallbacks, + public LinkedObject { public: using DisconnectCallback = std::function; SharedConnectionWrapper(Network::Connection& connection, bool allow_unexpected_disconnects) : connection_(connection), allow_unexpected_disconnects_(allow_unexpected_disconnects) { connection_.addConnectionCallbacks(*this); + addDisconnectCallback([this] { + lock_.AssertReaderHeld(); // Locked in onEvent(). + RELEASE_ASSERT(parented_ || allow_unexpected_disconnects_, + "An queued upstream connection was torn down without being associated " + "with a fake connection. Either manage the connection via " + "waitForRawConnection() or waitForHttpConnection(), or " + "set_allow_unexpected_disconnects(true).\n See " + "https://github.com/envoyproxy/envoy/blob/master/test/integration/README.md#" + "unparented-upstream-connections"); + }); } Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); return disconnect_callback_manager_.add(callback); } // Avoid directly removing by caller, since CallbackManager is not thread safe. void removeDisconnectCallback(Common::CallbackHandle* handle) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); handle->remove(); } // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { // Throughout this entire function, we know that the connection_ cannot disappear, since this - // callback is invoked prior to connection_ deferred delete. We also know by locking below, that - // elsewhere where we also hold lock_, that the connection cannot disappear inside the locked - // scope. - Thread::LockGuard lock(lock_); + // callback is invoked prior to connection_ deferred delete. We also know by locking below, + // that elsewhere where we also hold lock_, that the connection cannot disappear inside the + // locked scope. + absl::MutexLock lock(&lock_); if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { disconnected_ = true; @@ -277,7 +300,14 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks { void onBelowWriteBufferLowWatermark() override {} bool connected() { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); + return connectedLockHeld(); + } + + bool connectedLockHeld() { + lock_.AssertReaderHeld(); // TODO(mattklein123): This can't be annotated because the lock + // is acquired via the base connection reference. Fix this to + // remove the reference. return !disconnected_; } @@ -295,28 +325,28 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks { testing::AssertionResult executeOnDispatcher(std::function f, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { - Thread::LockGuard lock(lock_); + absl::MutexLock lock(&lock_); if (disconnected_) { return testing::AssertionSuccess(); } - Thread::CondVar callback_ready_event; - std::atomic unexpected_disconnect = false; + bool callback_ready_event = false; + bool unexpected_disconnect = false; connection_.dispatcher().post( - [this, f, &callback_ready_event, &unexpected_disconnect]() -> void { + [this, f, &lock = lock_, &callback_ready_event, &unexpected_disconnect]() -> void { // The use of connected() here, vs. !disconnected_, is because we want to use the lock_ - // acquisition to briefly serialize. This avoids us entering this completion and issuing a - // notifyOne() until the wait() is ready to receive it below. + // acquisition to briefly serialize. This avoids us entering this completion and issuing + // a notifyOne() until the wait() is ready to receive it below. if (connected()) { f(connection_); } else { unexpected_disconnect = true; } - callback_ready_event.notifyOne(); + absl::MutexLock lock_guard(&lock); + callback_ready_event = true; }); Event::TestTimeSystem& time_system = dynamic_cast(connection_.dispatcher().timeSource()); - Thread::CondVar::WaitStatus status = time_system.waitFor(lock_, callback_ready_event, timeout); - if (status == Thread::CondVar::WaitStatus::Timeout) { + if (!time_system.waitFor(lock_, absl::Condition(&callback_ready_event), timeout, true)) { return testing::AssertionFailure() << "Timed out while executing on dispatcher."; } if (unexpected_disconnect && !allow_unexpected_disconnects_) { @@ -330,69 +360,30 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks { return testing::AssertionSuccess(); } + absl::Mutex& lock() { return lock_; } + + void setParented() { + absl::MutexLock lock(&lock_); + parented_ = true; + } + private: Network::Connection& connection_; - Thread::MutexBasicLockable lock_; + absl::Mutex lock_; Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_); + bool parented_ ABSL_GUARDED_BY(lock_){}; bool disconnected_ ABSL_GUARDED_BY(lock_){}; const bool allow_unexpected_disconnects_; }; using SharedConnectionWrapperPtr = std::unique_ptr; -class QueuedConnectionWrapper; -using QueuedConnectionWrapperPtr = std::unique_ptr; - -/** - * Wraps a raw Network::Connection in a safe way, such that the connection can - * be placed in a queue for an arbitrary amount of time. It handles disconnects - * that take place in the queued state by failing the test. Once a - * QueuedConnectionWrapper object is instantiated by FakeHttpConnection or - * FakeRawConnection, it no longer plays a role. - * TODO(htuch): We can simplify the storage lifetime by destructing if/when - * removeConnectionCallbacks is added. - */ -class QueuedConnectionWrapper : public LinkedObject { -public: - QueuedConnectionWrapper(Network::Connection& connection, bool allow_unexpected_disconnects) - : shared_connection_(connection, allow_unexpected_disconnects), parented_(false), - allow_unexpected_disconnects_(allow_unexpected_disconnects) { - shared_connection_.addDisconnectCallback([this] { - Thread::LockGuard lock(lock_); - RELEASE_ASSERT(parented_ || allow_unexpected_disconnects_, - "An queued upstream connection was torn down without being associated " - "with a fake connection. Either manage the connection via " - "waitForRawConnection() or waitForHttpConnection(), or " - "set_allow_unexpected_disconnects(true).\n See " - "https://github.com/envoyproxy/envoy/blob/master/test/integration/README.md#" - "unparented-upstream-connections"); - }); - } - - void set_parented() { - Thread::LockGuard lock(lock_); - parented_ = true; - } - - SharedConnectionWrapper& shared_connection() { return shared_connection_; } - -private: - SharedConnectionWrapper shared_connection_; - Thread::MutexBasicLockable lock_; - bool parented_ ABSL_GUARDED_BY(lock_); - const bool allow_unexpected_disconnects_; -}; - /** * Base class for both fake raw connections and fake HTTP connections. */ class FakeConnectionBase : public Logger::Loggable { public: - virtual ~FakeConnectionBase() { - ASSERT(initialized_); - ASSERT(disconnect_callback_handle_ != nullptr); - shared_connection_.removeDisconnectCallback(disconnect_callback_handle_); - } + virtual ~FakeConnectionBase() { ASSERT(initialized_); } ABSL_MUST_USE_RESULT testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); @@ -401,44 +392,35 @@ class FakeConnectionBase : public Logger::Loggable { testing::AssertionResult readDisable(bool disable, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - // By default waitForDisconnect and waitForHalfClose assume the next event is - // a disconnect and return an AssertionFailure if an unexpected event occurs. - // If a caller truly wishes to wait until disconnect, set - // ignore_spurious_events = true. ABSL_MUST_USE_RESULT testing::AssertionResult - waitForDisconnect(bool ignore_spurious_events = false, - std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); ABSL_MUST_USE_RESULT testing::AssertionResult - waitForHalfClose(bool ignore_spurious_events = false, - std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); ABSL_MUST_USE_RESULT virtual testing::AssertionResult initialize() { initialized_ = true; - disconnect_callback_handle_ = - shared_connection_.addDisconnectCallback([this] { connection_event_.notifyOne(); }); return testing::AssertionSuccess(); } ABSL_MUST_USE_RESULT testing::AssertionResult enableHalfClose(bool enabled, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - SharedConnectionWrapper& shared_connection() { return shared_connection_; } // The same caveats apply here as in SharedConnectionWrapper::connection(). Network::Connection& connection() const { return shared_connection_.connection(); } bool connected() const { return shared_connection_.connected(); } protected: FakeConnectionBase(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system) - : shared_connection_(shared_connection), time_system_(time_system) {} + : shared_connection_(shared_connection), lock_(shared_connection.lock()), + time_system_(time_system) {} - Common::CallbackHandle* disconnect_callback_handle_; SharedConnectionWrapper& shared_connection_; bool initialized_{}; - Thread::CondVar connection_event_; - Thread::MutexBasicLockable lock_; + absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better + // guarded by annotations. bool half_closed_ ABSL_GUARDED_BY(lock_){}; Event::TestTimeSystem& time_system_; }; @@ -456,14 +438,9 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction headers_with_underscores_action); - // By default waitForNewStream assumes the next event is a new stream and - // returns AssertionFailure if an unexpected event occurs. If a caller truly - // wishes to wait for a new stream, set ignore_spurious_events = true. Returns - // the new stream via the stream argument. ABSL_MUST_USE_RESULT testing::AssertionResult waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream, - bool ignore_spurious_events = false, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); // Http::ServerConnectionCallbacks @@ -498,7 +475,7 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo }; Http::ServerConnectionPtr codec_; - std::list new_streams_; + std::list new_streams_ ABSL_GUARDED_BY(lock_); }; using FakeHttpConnectionPtr = std::unique_ptr; @@ -563,7 +540,7 @@ class FakeRawConnection : public FakeConnectionBase { FakeRawConnection& parent_; }; - std::string data_; + std::string data_ ABSL_GUARDED_BY(lock_); }; using FakeRawConnectionPtr = std::unique_ptr; @@ -755,18 +732,17 @@ class FakeUpstream : Logger::Loggable, ConditionalInitializer server_initialized_; // Guards any objects which can be altered both in the upstream thread and the // main test thread. - Thread::MutexBasicLockable lock_; + absl::Mutex lock_; Thread::ThreadPtr thread_; - Thread::CondVar upstream_event_; Api::ApiPtr api_; Event::TestTimeSystem& time_system_; Event::DispatcherPtr dispatcher_; Network::ConnectionHandlerPtr handler_; - std::list new_connections_ ABSL_GUARDED_BY(lock_); + std::list new_connections_ ABSL_GUARDED_BY(lock_); // When a QueuedConnectionWrapper is popped from new_connections_, ownership is transferred to // consumed_connections_. This allows later the Connection destruction (when the FakeUpstream is // deleted) on the same thread that allocated the connection. - std::list consumed_connections_ ABSL_GUARDED_BY(lock_); + std::list consumed_connections_ ABSL_GUARDED_BY(lock_); bool allow_unexpected_disconnects_; bool read_disable_on_new_connection_; const bool enable_half_close_; diff --git a/test/integration/filter_manager_integration_test.cc b/test/integration/filter_manager_integration_test.cc index 0d1c55afa896..d6d43ba0a66a 100644 --- a/test/integration/filter_manager_integration_test.cc +++ b/test/integration/filter_manager_integration_test.cc @@ -515,7 +515,7 @@ TEST_P(InjectDataWithTcpProxyFilterIntegrationTest, UsageOfInjectDataMethodsShou ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->write("there!", true)); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); tcp_client->waitForData("there!"); tcp_client->waitForDisconnect(); diff --git a/test/integration/h1_fuzz.cc b/test/integration/h1_fuzz.cc index 3fe7886bd970..5067240254e5 100644 --- a/test/integration/h1_fuzz.cc +++ b/test/integration/h1_fuzz.cc @@ -69,7 +69,7 @@ void H1FuzzIntegrationTest::replay(const test::integration::CaptureFuzzTestCase& AssertionResult result = fake_upstream_connection->close(); RELEASE_ASSERT(result, result.message()); } - AssertionResult result = fake_upstream_connection->waitForDisconnect(true); + AssertionResult result = fake_upstream_connection->waitForDisconnect(); RELEASE_ASSERT(result, result.message()); } tcp_client->close(); diff --git a/test/integration/h2_fuzz.cc b/test/integration/h2_fuzz.cc index c0eeae08152e..a7c7e3195c33 100644 --- a/test/integration/h2_fuzz.cc +++ b/test/integration/h2_fuzz.cc @@ -244,7 +244,7 @@ void H2FuzzIntegrationTest::replay(const test::integration::H2CaptureFuzzTestCas AssertionResult result = fake_upstream_connection->close(); RELEASE_ASSERT(result, result.message()); } - AssertionResult result = fake_upstream_connection->waitForDisconnect(true); + AssertionResult result = fake_upstream_connection->waitForDisconnect(); RELEASE_ASSERT(result, result.message()); } if (tcp_client->connected()) { diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index 1c950e2499b1..2073cb2a6825 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -317,7 +317,7 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); // Endpoint doesn't respond to the health check - ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect(true)); + ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); @@ -391,7 +391,7 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); // No response from the endpoint - ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect(true)); + ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); // Receive updates until the one we expect arrives waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 2cc24c148bc7..19037786f98d 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -1318,8 +1318,7 @@ void Http2RingHashIntegrationTest::sendMultipleRequests( // As data and streams are interwoven, make sure waitForNewStream() // ignores incoming data and waits for actual stream establishment. upstream_requests.emplace_back(); - ASSERT_TRUE( - fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_requests.back(), true)); + ASSERT_TRUE(fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_requests.back())); upstream_requests.back()->setAddServedByHeader(true); fake_upstream_connections_.push_back(std::move(fake_upstream_connection)); } diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 979e3fd8c47b..3d8d3537640c 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -279,7 +279,7 @@ BaseIntegrationTest::BaseIntegrationTest(const InstanceConstSharedPtrFn& upstrea // notification and clear the pool connection if necessary. A real fix would require adding fairly // complex test hooks to the server and/or spin waiting on stats, neither of which I think are // necessary right now. - timeSystem().advanceTimeWait(std::chrono::milliseconds(10)); + timeSystem().advanceTimeWait(std::chrono::milliseconds(10), true); ON_CALL(*mock_buffer_factory_, create_(_, _, _)) .WillByDefault(Invoke([](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { @@ -504,7 +504,7 @@ void BaseIntegrationTest::createGeneratedApiTestServer( absl::StrCat("Lds update failed. Details\n", getListenerDetails(test_server_->server()))); } - time_system_.advanceTimeWait(std::chrono::milliseconds(10)); + time_system_.advanceTimeWait(std::chrono::milliseconds(10), true); } registerTestServerPorts(port_names); @@ -706,7 +706,7 @@ AssertionResult BaseIntegrationTest::waitForPortAvailable(uint32_t port, nullptr, true); return AssertionSuccess(); } catch (const EnvoyException&) { - timeSystem().advanceTimeWait(std::chrono::milliseconds(100)); + timeSystem().advanceTimeWait(std::chrono::milliseconds(100), true); } } diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 9efe9bab87b8..6c526fbb7425 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -1432,7 +1432,7 @@ TEST_P(IntegrationTest, Response204WithBody) { // should still see a response. upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "204"}}, false); upstream_request_->encodeData(512, true); - ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); response->waitForEndStream(); diff --git a/test/integration/redirect_integration_test.cc b/test/integration/redirect_integration_test.cc index 0b03e662e9e8..7deca7a13c15 100644 --- a/test/integration/redirect_integration_test.cc +++ b/test/integration/redirect_integration_test.cc @@ -48,8 +48,8 @@ class RedirectIntegrationTest : public HttpProtocolIntegrationTest { FakeStreamPtr new_stream = nullptr; auto wait_new_stream_fn = [this, &new_stream](FakeHttpConnectionPtr& connection) -> AssertionResult { - AssertionResult result = connection->waitForNewStream(*dispatcher_, new_stream, false, - std::chrono::milliseconds(50)); + AssertionResult result = + connection->waitForNewStream(*dispatcher_, new_stream, std::chrono::milliseconds(50)); if (result) { ASSERT(new_stream); } diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index 39d6a4779740..5533308b5dc4 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -121,7 +121,7 @@ TEST_P(TcpProxyIntegrationTest, TcpProxyDownstreamDisconnect) { ASSERT_TRUE(fake_upstream_connection->waitForData(10)); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->write("", true)); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); tcp_client->waitForDisconnect(); } @@ -369,7 +369,7 @@ TEST_P(TcpProxyIntegrationTest, ShutdownWithOpenConnections) { test_server_.reset(); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->close()); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); tcp_client->waitForHalfClose(); tcp_client->close(); @@ -397,7 +397,7 @@ TEST_P(TcpProxyIntegrationTest, TestIdletimeoutWithNoData) { initialize(); IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); - tcp_client->waitForDisconnect(true); + tcp_client->waitForDisconnect(); } TEST_P(TcpProxyIntegrationTest, TestIdletimeoutWithLargeOutstandingData) { @@ -427,8 +427,8 @@ TEST_P(TcpProxyIntegrationTest, TestIdletimeoutWithLargeOutstandingData) { ASSERT_TRUE(tcp_client->write(data)); ASSERT_TRUE(fake_upstream_connection->write(data)); - tcp_client->waitForDisconnect(true); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + tcp_client->waitForDisconnect(); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); } TEST_P(TcpProxyIntegrationTest, TestNoCloseOnHealthFailure) { @@ -475,7 +475,7 @@ TEST_P(TcpProxyIntegrationTest, TestNoCloseOnHealthFailure) { ASSERT_TRUE(fake_upstream_health_connection->waitForData(8)); ASSERT_TRUE(fake_upstream_health_connection->close()); - ASSERT_TRUE(fake_upstream_health_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_health_connection->waitForDisconnect()); // By waiting we know the previous health check attempt completed (with a failure since we closed // the connection on it) @@ -492,10 +492,10 @@ TEST_P(TcpProxyIntegrationTest, TestNoCloseOnHealthFailure) { test_server_.reset(); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->close()); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); ASSERT_TRUE(fake_upstream_health_connection_reconnect->waitForHalfClose()); ASSERT_TRUE(fake_upstream_health_connection_reconnect->close()); - ASSERT_TRUE(fake_upstream_health_connection_reconnect->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_health_connection_reconnect->waitForDisconnect()); tcp_client->waitForHalfClose(); tcp_client->close(); } @@ -545,14 +545,14 @@ TEST_P(TcpProxyIntegrationTest, TestCloseOnHealthFailure) { ASSERT_TRUE(fake_upstream_health_connection->waitForData(8)); fake_upstreams_[0]->set_allow_unexpected_disconnects(true); ASSERT_TRUE(fake_upstream_health_connection->close()); - ASSERT_TRUE(fake_upstream_health_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_health_connection->waitForDisconnect()); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); tcp_client->waitForHalfClose(); ASSERT_TRUE(fake_upstream_connection->close()); tcp_client->close(); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); } class TcpProxyMetadataMatchIntegrationTest : public TcpProxyIntegrationTest { @@ -636,7 +636,7 @@ void TcpProxyMetadataMatchIntegrationTest::expectEndpointToMatchRoute() { ASSERT_TRUE(fake_upstream_connection->waitForData(10)); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->write("", true)); - ASSERT_TRUE(fake_upstream_connection->waitForDisconnect(true)); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); tcp_client->waitForDisconnect(); test_server_->waitForCounterGe("cluster.cluster_0.lb_subsets_selected", 1); @@ -647,9 +647,9 @@ void TcpProxyMetadataMatchIntegrationTest::expectEndpointNotToMatchRoute() { IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); ASSERT_TRUE(tcp_client->write("hello", false, false)); - // TODO(yskopets): 'tcp_client->waitForDisconnect(true);' gets stuck indefinitely on Linux builds, + // TODO(yskopets): 'tcp_client->waitForDisconnect();' gets stuck indefinitely on Linux builds, // e.g. on 'envoy-linux (bazel compile_time_options)' and 'envoy-linux (bazel release)' - // tcp_client->waitForDisconnect(true); + // tcp_client->waitForDisconnect(); test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_none_healthy", 1); test_server_->waitForCounterEq("cluster.cluster_0.lb_subsets_selected", 0); diff --git a/test/integration/tcp_tunneling_integration_test.cc b/test/integration/tcp_tunneling_integration_test.cc index bc270853ae65..333e98650f04 100644 --- a/test/integration/tcp_tunneling_integration_test.cc +++ b/test/integration/tcp_tunneling_integration_test.cc @@ -396,7 +396,7 @@ TEST_P(TcpTunnelingIntegrationTest, ResetStreamTest) { // Reset the stream. upstream_request_->encodeResetStream(); - tcp_client->waitForDisconnect(true); + tcp_client->waitForDisconnect(); } TEST_P(TcpTunnelingIntegrationTest, TestIdletimeoutWithLargeOutstandingData) { @@ -429,7 +429,7 @@ TEST_P(TcpTunnelingIntegrationTest, TestIdletimeoutWithLargeOutstandingData) { ASSERT_TRUE(tcp_client->write(data)); upstream_request_->encodeData(data, false); - tcp_client->waitForDisconnect(true); + tcp_client->waitForDisconnect(); ASSERT_TRUE(upstream_request_->waitForReset()); } diff --git a/test/integration/utility.cc b/test/integration/utility.cc index c969a5b8a2ef..7f4e386f875e 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -140,7 +140,7 @@ RawConnectionDriver::~RawConnectionDriver() = default; void RawConnectionDriver::waitForConnection() { while (!callbacks_->connected() && !callbacks_->closed()) { - Event::GlobalTimeSystem().timeSystem().advanceTimeWait(std::chrono::milliseconds(10)); + Event::GlobalTimeSystem().timeSystem().advanceTimeWait(std::chrono::milliseconds(10), true); dispatcher_.run(Event::Dispatcher::RunType::NonBlock); } } diff --git a/test/integration/vhds_integration_test.cc b/test/integration/vhds_integration_test.cc index 879fbcf8b90b..efa74c603977 100644 --- a/test/integration/vhds_integration_test.cc +++ b/test/integration/vhds_integration_test.cc @@ -209,7 +209,7 @@ TEST_P(VhdsInitializationTest, InitializeVhdsAfterRdsHasBeenInitialized) { {TestUtility::parseYaml(RdsConfigWithVhosts)}, "2"); - auto result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_, true); + auto result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_); RELEASE_ASSERT(result, result.message()); vhds_stream_->startGrpcStream(); @@ -301,7 +301,7 @@ class VhdsIntegrationTest : public HttpIntegrationTest, sendSotwDiscoveryResponse( Config::TypeUrl::get().RouteConfiguration, {rdsConfig()}, "1"); - result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_, true); + result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_); RELEASE_ASSERT(result, result.message()); vhds_stream_->startGrpcStream(); diff --git a/test/mocks/common.h b/test/mocks/common.h index cf5a915825b2..3535d2f91506 100644 --- a/test/mocks/common.h +++ b/test/mocks/common.h @@ -56,14 +56,16 @@ class MockTimeSystem : public Event::TestTimeSystem { Event::CallbackScheduler& cb_scheduler) override { return real_time_.createScheduler(base_scheduler, cb_scheduler); } - void advanceTimeWait(const Duration& duration) override { real_time_.advanceTimeWait(duration); } - void advanceTimeAsync(const Duration& duration) override { - real_time_.advanceTimeAsync(duration); + void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override { + real_time_.advanceTimeWaitImpl(duration, always_sleep); } - Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar, - const Duration& duration) noexcept - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override { - return real_time_.waitFor(mutex, condvar, duration); // NO_CHECK_FORMAT(real_time) + void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override { + real_time_.advanceTimeAsyncImpl(duration, always_sleep); + } + bool waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, const Duration& duration, + bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override { + return real_time_.waitFor(mutex, condition, duration, // NO_CHECK_FORMAT(real_time) + always_sleep); } MOCK_METHOD(SystemTime, systemTime, ()); MOCK_METHOD(MonotonicTime, monotonicTime, ()); diff --git a/test/test_common/simulated_time_system.cc b/test/test_common/simulated_time_system.cc index d361beddf471..b0135a73f26e 100644 --- a/test/test_common/simulated_time_system.cc +++ b/test/test_common/simulated_time_system.cc @@ -216,21 +216,27 @@ MonotonicTime SimulatedTimeSystemHelper::monotonicTime() { return monotonic_time_; } -void SimulatedTimeSystemHelper::advanceTimeAsync(const Duration& duration) { +void SimulatedTimeSystemHelper::advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) { only_one_thread_.checkOneThread(); absl::MutexLock lock(&mutex_); MonotonicTime monotonic_time = monotonic_time_ + std::chrono::duration_cast(duration); setMonotonicTimeLockHeld(monotonic_time); + if (always_sleep) { + std::this_thread::sleep_for(duration); + } } -void SimulatedTimeSystemHelper::advanceTimeWait(const Duration& duration) { +void SimulatedTimeSystemHelper::advanceTimeWaitImpl(const Duration& duration, bool always_sleep) { only_one_thread_.checkOneThread(); absl::MutexLock lock(&mutex_); MonotonicTime monotonic_time = monotonic_time_ + std::chrono::duration_cast(duration); setMonotonicTimeLockHeld(monotonic_time); waitForNoPendingLockHeld(); + if (always_sleep) { + std::this_thread::sleep_for(duration); + } } void SimulatedTimeSystemHelper::waitForNoPendingLockHeld() const @@ -240,24 +246,18 @@ void SimulatedTimeSystemHelper::waitForNoPendingLockHeld() const &pending_alarms_)); } -Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasicLockable& mutex, - Thread::CondVar& condvar, - const Duration& duration) noexcept +bool SimulatedTimeSystemHelper::waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, + const Duration& duration, bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) { only_one_thread_.checkOneThread(); - // TODO(#10568): This real-time polling delay should not be necessary. Without - // it, test/extensions/filters/http/cache:cache_filter_integration_test fails - // about 40% of the time. - const Duration real_time_poll_delay( - std::min(std::chrono::duration_cast(std::chrono::milliseconds(50)), duration)); const MonotonicTime end_time = monotonicTime() + duration; bool timeout_not_reached = true; while (timeout_not_reached) { // First check to see if the condition is already satisfied without advancing sim time. - if (condvar.waitFor(mutex, real_time_poll_delay) == Thread::CondVar::WaitStatus::NoTimeout) { - return Thread::CondVar::WaitStatus::NoTimeout; + if (condition.Eval()) { + return true; } // This function runs with the caller-provided mutex held. We need to @@ -265,7 +265,7 @@ Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasi // callbacks completing. To avoid potential deadlock we must drop // the caller's mutex before taking ours. We also must care to avoid // break/continue/return/throw during this non-RAII lock operation. - mutex.unlock(); + mutex.Unlock(); { absl::MutexLock lock(&mutex_); if (monotonic_time_ < end_time) { @@ -275,8 +275,12 @@ Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasi const AlarmRegistration& alarm_registration = *alarms_.begin(); next_wakeup = std::min(alarm_registration.time_, next_wakeup); } + const auto sleep_duration = next_wakeup - monotonic_time_; setMonotonicTimeLockHeld(next_wakeup); waitForNoPendingLockHeld(); + if (always_sleep) { + std::this_thread::sleep_for(sleep_duration); + } } else { // If we reached our end_time, break the loop and return timeout. We // don't break immediately as we have to drop mutex_ and re-take mutex, @@ -284,9 +288,9 @@ Thread::CondVar::WaitStatus SimulatedTimeSystemHelper::waitFor(Thread::MutexBasi timeout_not_reached = false; } } - mutex.lock(); + mutex.Lock(); } - return Thread::CondVar::WaitStatus::Timeout; + return false; } void SimulatedTimeSystemHelper::alarmActivateLockHeld(Alarm& alarm) ABSL_NO_THREAD_SAFETY_ANALYSIS { diff --git a/test/test_common/simulated_time_system.h b/test/test_common/simulated_time_system.h index e8a369e4f9cc..7d0fa78e8392 100644 --- a/test/test_common/simulated_time_system.h +++ b/test/test_common/simulated_time_system.h @@ -29,11 +29,10 @@ class SimulatedTimeSystemHelper : public TestTimeSystem { SchedulerPtr createScheduler(Scheduler& base_scheduler, CallbackScheduler& cb_scheduler) override; // TestTimeSystem - void advanceTimeWait(const Duration& duration) override; - void advanceTimeAsync(const Duration& duration) override; - Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar, - const Duration& duration) noexcept - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override; + void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override; + void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override; + bool waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, const Duration& duration, + bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override; // TimeSource SystemTime systemTime() override; diff --git a/test/test_common/simulated_time_system_test.cc b/test/test_common/simulated_time_system_test.cc index 13a435148aff..fbacd2c87813 100644 --- a/test/test_common/simulated_time_system_test.cc +++ b/test/test_common/simulated_time_system_test.cc @@ -289,19 +289,25 @@ TEST_P(SimulatedTimeSystemTest, WaitFor) { EXPECT_EQ(start_system_time_, time_system_.systemTime()); // Run an event loop in the background to activate timers. - std::atomic done(false); - auto thread = Thread::threadFactoryForTest().createThread([this, &done]() { - while (!done) { + absl::Mutex mutex; + bool done(false); + auto thread = Thread::threadFactoryForTest().createThread([this, &mutex, &done]() { + for (;;) { + { + absl::MutexLock lock(&mutex); + if (done) { + return; + } + } + base_scheduler_.run(Dispatcher::RunType::Block); } }); - Thread::MutexBasicLockable mutex; - Thread::CondVar condvar; + TimerPtr timer = scheduler_->createTimer( - [&condvar, &mutex, &done]() { - Thread::LockGuard lock(mutex); + [&mutex, &done]() { + absl::MutexLock lock(&mutex); done = true; - condvar.notifyOne(); }, dispatcher_); timer->enableTimer(std::chrono::seconds(60)); @@ -310,9 +316,8 @@ TEST_P(SimulatedTimeSystemTest, WaitFor) { // activate the alarm. We'll get a fast automatic timeout in waitFor because // there are no pending timers. { - Thread::LockGuard lock(mutex); - EXPECT_EQ(Thread::CondVar::WaitStatus::Timeout, - time_system_.waitFor(mutex, condvar, std::chrono::seconds(50))); + absl::MutexLock lock(&mutex); + EXPECT_FALSE(time_system_.waitFor(mutex, absl::Condition(&done), std::chrono::seconds(50))); } EXPECT_FALSE(done); EXPECT_EQ(MonotonicTime(std::chrono::seconds(50)), time_system_.monotonicTime()); @@ -320,13 +325,8 @@ TEST_P(SimulatedTimeSystemTest, WaitFor) { // Waiting another 20 simulated seconds will activate the alarm after 10, // and the event-loop thread will call the corresponding callback quickly. { - Thread::LockGuard lock(mutex); - // We don't check for the return value of waitFor() as it can spuriously - // return timeout even if the condition is satisfied before entering into - // the waitFor(). - // - // TODO(jmarantz): just drop the return value in the API. - time_system_.waitFor(mutex, condvar, std::chrono::seconds(10)); + absl::MutexLock lock(&mutex); + EXPECT_TRUE(time_system_.waitFor(mutex, absl::Condition(&done), std::chrono::seconds(10))); } EXPECT_TRUE(done); EXPECT_EQ(MonotonicTime(std::chrono::seconds(60)), time_system_.monotonicTime()); @@ -335,9 +335,8 @@ TEST_P(SimulatedTimeSystemTest, WaitFor) { // the max duration and return a timeout. done = false; { - Thread::LockGuard lock(mutex); - EXPECT_EQ(Thread::CondVar::WaitStatus::Timeout, - time_system_.waitFor(mutex, condvar, std::chrono::seconds(20))); + absl::MutexLock lock(&mutex); + EXPECT_FALSE(time_system_.waitFor(mutex, absl::Condition(&done), std::chrono::seconds(20))); } EXPECT_FALSE(done); EXPECT_EQ(MonotonicTime(std::chrono::seconds(80)), time_system_.monotonicTime()); @@ -442,26 +441,32 @@ TEST_P(SimulatedTimeSystemTest, DuplicateTimer) { EXPECT_EQ("2", output_); // Now set an alarm which requires 10ms of progress and make sure waitFor works. - std::atomic done(false); - auto thread = Thread::threadFactoryForTest().createThread([this, &done]() { - while (!done) { + absl::Mutex mutex; + bool done(false); + auto thread = Thread::threadFactoryForTest().createThread([this, &mutex, &done]() { + for (;;) { + { + absl::MutexLock lock(&mutex); + if (done) { + return; + } + } + base_scheduler_.run(Dispatcher::RunType::Block); } }); - Thread::MutexBasicLockable mutex; - Thread::CondVar condvar; + TimerPtr timer = scheduler_->createTimer( - [&condvar, &mutex, &done]() { - Thread::LockGuard lock(mutex); + [&mutex, &done]() { + absl::MutexLock lock(&mutex); done = true; - condvar.notifyOne(); }, dispatcher_); timer->enableTimer(std::chrono::seconds(10)); { - Thread::LockGuard lock(mutex); - time_system_.waitFor(mutex, condvar, std::chrono::seconds(10)); + absl::MutexLock lock(&mutex); + EXPECT_TRUE(time_system_.waitFor(mutex, absl::Condition(&done), std::chrono::seconds(10))); } EXPECT_TRUE(done); diff --git a/test/test_common/test_time.cc b/test/test_common/test_time.cc index d5a50fd1de91..a415cf44be09 100644 --- a/test/test_common/test_time.cc +++ b/test/test_common/test_time.cc @@ -18,18 +18,19 @@ TestTimeSystem& GlobalTimeSystem::timeSystem() { return singleton_->timeSystem(make_real_time_system); } -void TestRealTimeSystem::advanceTimeWait(const Duration& duration) { +void TestRealTimeSystem::advanceTimeWaitImpl(const Duration& duration, bool) { only_one_thread_.checkOneThread(); std::this_thread::sleep_for(duration); } -void TestRealTimeSystem::advanceTimeAsync(const Duration& duration) { advanceTimeWait(duration); } +void TestRealTimeSystem::advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) { + advanceTimeWait(duration, always_sleep); +} -Thread::CondVar::WaitStatus TestRealTimeSystem::waitFor(Thread::MutexBasicLockable& lock, - Thread::CondVar& condvar, - const Duration& duration) noexcept { +bool TestRealTimeSystem::waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, + const Duration& duration, bool) noexcept { only_one_thread_.checkOneThread(); - return condvar.waitFor(lock, duration); + return mutex.AwaitWithTimeout(condition, absl::FromChrono(duration)); } SystemTime TestRealTimeSystem::systemTime() { return real_time_system_.systemTime(); } diff --git a/test/test_common/test_time.h b/test/test_common/test_time.h index 31880b73b5e2..38745cb6e094 100644 --- a/test/test_common/test_time.h +++ b/test/test_common/test_time.h @@ -12,11 +12,10 @@ namespace Event { class TestRealTimeSystem : public TestTimeSystem { public: // TestTimeSystem - void advanceTimeAsync(const Duration& duration) override; - void advanceTimeWait(const Duration& duration) override; - Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar, - const Duration& duration) noexcept - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override; + void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override; + void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override; + bool waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, const Duration& duration, + bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override; // Event::TimeSystem Event::SchedulerPtr createScheduler(Scheduler& base_scheduler, diff --git a/test/test_common/test_time_system.h b/test/test_common/test_time_system.h index bc5d38972879..9247d281f302 100644 --- a/test/test_common/test_time_system.h +++ b/test/test_common/test_time_system.h @@ -25,10 +25,11 @@ class TestTimeSystem : public Event::TimeSystem { * use this variant. * * @param duration The amount of time to sleep. + * @param fixfix */ - virtual void advanceTimeWait(const Duration& duration) PURE; - template void advanceTimeWait(const D& duration) { - advanceTimeWait(std::chrono::duration_cast(duration)); + virtual void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) PURE; + template void advanceTimeWait(const D& duration, bool always_sleep = false) { + advanceTimeWaitImpl(std::chrono::duration_cast(duration), always_sleep); } /** @@ -41,10 +42,11 @@ class TestTimeSystem : public Event::TimeSystem { * loop. Unit tests will often use this variant. * * @param duration The amount of time to sleep. + * @param fixfix */ - virtual void advanceTimeAsync(const Duration& duration) PURE; - template void advanceTimeAsync(const D& duration) { - advanceTimeAsync(std::chrono::duration_cast(duration)); + virtual void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) PURE; + template void advanceTimeAsync(const D& duration, bool always_sleep = false) { + advanceTimeAsyncImpl(std::chrono::duration_cast(duration), always_sleep); } /** @@ -54,18 +56,18 @@ class TestTimeSystem : public Event::TimeSystem { * @param mutex A mutex which must be held before calling this function. * @param condvar The condition to wait on. * @param duration The maximum amount of time to wait. + * @param fixfix * @return Thread::CondVar::WaitStatus whether the condition timed out or not. */ - virtual Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, - Thread::CondVar& condvar, - const Duration& duration) noexcept + virtual bool waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, + const Duration& duration, bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) PURE; template - Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar, - const D& duration) noexcept - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) { - return waitFor(mutex, condvar, std::chrono::duration_cast(duration)); + bool waitFor(absl::Mutex& mutex, const absl::Condition& condition, const D& duration, + bool always_sleep = false) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) { + return waitForImpl(mutex, condition, std::chrono::duration_cast(duration), + always_sleep); } }; @@ -102,17 +104,16 @@ class SingletonTimeSystemHelper { // subclass. template class DelegatingTestTimeSystemBase : public TestTimeSystem { public: - void advanceTimeAsync(const Duration& duration) override { - timeSystem().advanceTimeAsync(duration); + void advanceTimeAsyncImpl(const Duration& duration, bool always_sleep) override { + timeSystem().advanceTimeAsyncImpl(duration, always_sleep); } - void advanceTimeWait(const Duration& duration) override { - timeSystem().advanceTimeWait(duration); + void advanceTimeWaitImpl(const Duration& duration, bool always_sleep) override { + timeSystem().advanceTimeWaitImpl(duration, always_sleep); } - Thread::CondVar::WaitStatus waitFor(Thread::MutexBasicLockable& mutex, Thread::CondVar& condvar, - const Duration& duration) noexcept - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override { - return timeSystem().waitFor(mutex, condvar, duration); + bool waitForImpl(absl::Mutex& mutex, const absl::Condition& condition, const Duration& duration, + bool always_sleep) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) override { + return timeSystem().waitForImpl(mutex, condition, duration, always_sleep); } SchedulerPtr createScheduler(Scheduler& base_scheduler, diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index 13feb48e5df5..201c86987ffb 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -367,29 +367,27 @@ bool TestUtility::gaugesZeroed( } void ConditionalInitializer::setReady() { - Thread::LockGuard lock(mutex_); + absl::MutexLock lock(&mutex_); EXPECT_FALSE(ready_); ready_ = true; - cv_.notifyAll(); } void ConditionalInitializer::waitReady() { - Thread::LockGuard lock(mutex_); + absl::MutexLock lock(&mutex_); if (ready_) { ready_ = false; return; } - cv_.wait(mutex_); + mutex_.Await(absl::Condition(&ready_)); EXPECT_TRUE(ready_); ready_ = false; } void ConditionalInitializer::wait() { - Thread::LockGuard lock(mutex_); - while (!ready_) { - cv_.wait(mutex_); - } + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(&ready_)); + EXPECT_TRUE(ready_); } constexpr std::chrono::milliseconds TestUtility::DefaultTimeout; diff --git a/test/test_common/utility.h b/test/test_common/utility.h index df96fff5d7f6..1807bf919c86 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -792,9 +792,8 @@ class ConditionalInitializer { void wait(); private: - Thread::CondVar cv_; - Thread::MutexBasicLockable mutex_; - bool ready_{false}; + absl::Mutex mutex_; + bool ready_ ABSL_GUARDED_BY(mutex_){false}; }; namespace Http {