diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 77b1b5b6d733..3ae7afe1afa8 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -21,6 +21,7 @@ Removed Config or Runtime New Features ------------ +* dispatcher: supports a stack of `Envoy::ScopeTrackedObject` instead of a single tracked object. This will allow Envoy to dump more debug information on crash. Deprecated ---------- diff --git a/include/envoy/event/BUILD b/include/envoy/event/BUILD index a4c865a2d225..75a7b759a9f7 100644 --- a/include/envoy/event/BUILD +++ b/include/envoy/event/BUILD @@ -13,11 +13,17 @@ envoy_cc_library( hdrs = ["deferred_deletable.h"], ) +envoy_cc_library( + name = "dispatcher_thread_deletable", + hdrs = ["dispatcher_thread_deletable.h"], +) + envoy_cc_library( name = "dispatcher_interface", hdrs = ["dispatcher.h"], deps = [ ":deferred_deletable", + ":dispatcher_thread_deletable", ":file_event_interface", ":schedulable_cb_interface", ":signal_interface", diff --git a/include/envoy/event/dispatcher.h b/include/envoy/event/dispatcher.h index 599617e87d28..f6ffdc2ccc8a 100644 --- a/include/envoy/event/dispatcher.h +++ b/include/envoy/event/dispatcher.h @@ -8,6 +8,7 @@ #include "envoy/common/scope_tracker.h" #include "envoy/common/time.h" +#include "envoy/event/dispatcher_thread_deletable.h" #include "envoy/event/file_event.h" #include "envoy/event/schedulable_cb.h" #include "envoy/event/signal.h" @@ -86,15 +87,18 @@ class DispatcherBase { virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function cb) PURE; /** - * Sets a tracked object, which is currently operating in this Dispatcher. - * This should be cleared with another call to setTrackedObject() when the object is done doing - * work. Calling setTrackedObject(nullptr) results in no object being tracked. + * Appends a tracked object to the current stack of tracked objects operating + * in the dispatcher. * - * This is optimized for performance, to avoid allocation where we do scoped object tracking. - * - * @return The previously tracked object or nullptr if there was none. + * It's recommended to use ScopeTrackerScopeState to manage the object's tracking. If directly + * invoking, there needs to be a subsequent call to popTrackedObject(). */ - virtual const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) PURE; + virtual void pushTrackedObject(const ScopeTrackedObject* object) PURE; + + /** + * Removes the top of the stack of tracked object and asserts that it was expected. + */ + virtual void popTrackedObject(const ScopeTrackedObject* expected_object) PURE; /** * Validates that an operation is thread-safe with respect to this dispatcher; i.e. that the @@ -242,6 +246,12 @@ class Dispatcher : public DispatcherBase { */ virtual void post(PostCb callback) PURE; + /** + * Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on + * the dispatcher's thread before dispatcher destroy. This is safe cross thread. + */ + virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE; + /** * Runs the event loop. This will not return until exit() is called either from within a callback * or from a different thread. @@ -269,6 +279,11 @@ class Dispatcher : public DispatcherBase { * Updates approximate monotonic time to current value. */ virtual void updateApproximateMonotonicTime() PURE; + + /** + * Shutdown the dispatcher by clear dispatcher thread deletable. + */ + virtual void shutdown() PURE; }; using DispatcherPtr = std::unique_ptr; diff --git a/include/envoy/event/dispatcher_thread_deletable.h b/include/envoy/event/dispatcher_thread_deletable.h new file mode 100644 index 000000000000..bf5b1808e0e3 --- /dev/null +++ b/include/envoy/event/dispatcher_thread_deletable.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +namespace Envoy { +namespace Event { + +/** + * If an object derives from this class, it can be passed to the destination dispatcher who + * guarantees to delete it in that dispatcher thread. The common use case is to ensure config + * related objects are deleted in the main thread. + */ +class DispatcherThreadDeletable { +public: + virtual ~DispatcherThreadDeletable() = default; +}; + +using DispatcherThreadDeletableConstPtr = std::unique_ptr; + +} // namespace Event +} // namespace Envoy diff --git a/include/envoy/server/fatal_action_config.h b/include/envoy/server/fatal_action_config.h index c8768dced40a..1e5914ac2592 100644 --- a/include/envoy/server/fatal_action_config.h +++ b/include/envoy/server/fatal_action_config.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "envoy/common/pure.h" #include "envoy/config/bootstrap/v3/bootstrap.pb.h" @@ -17,11 +18,10 @@ class FatalAction { public: virtual ~FatalAction() = default; /** - * Callback function to run when we are crashing. - * @param current_object the object we were working on when we started - * crashing. + * Callback function to run when Envoy is crashing. + * @param tracked_objects a span of objects Envoy was working on when Envoy started crashing. */ - virtual void run(const ScopeTrackedObject* current_object) PURE; + virtual void run(absl::Span tracked_objects) PURE; /** * @return whether the action is async-signal-safe. diff --git a/source/common/common/scope_tracker.h b/source/common/common/scope_tracker.h index bed58c3fa8c0..4426bbaca5cc 100644 --- a/source/common/common/scope_tracker.h +++ b/source/common/common/scope_tracker.h @@ -3,24 +3,35 @@ #include "envoy/common/scope_tracker.h" #include "envoy/event/dispatcher.h" +#include "common/common/assert.h" + namespace Envoy { -// A small class for tracking the scope of the object which is currently having +// A small class for managing the scope of a tracked object which is currently having // work done in this thread. // -// When created, it sets the tracked object in the dispatcher, and when destroyed it points the -// dispatcher at the previously tracked object. +// When created, it appends the tracked object to the dispatcher's stack of tracked objects, and +// when destroyed it pops the dispatcher's stack of tracked object, which should be the object it +// registered. class ScopeTrackerScopeState { public: ScopeTrackerScopeState(const ScopeTrackedObject* object, Event::Dispatcher& dispatcher) - : dispatcher_(dispatcher) { - latched_object_ = dispatcher_.setTrackedObject(object); + : registered_object_(object), dispatcher_(dispatcher) { + dispatcher_.pushTrackedObject(registered_object_); + } + + ~ScopeTrackerScopeState() { + // If ScopeTrackerScopeState is always used for managing tracked objects, + // then the object popped off should be the object we registered. + dispatcher_.popTrackedObject(registered_object_); } - ~ScopeTrackerScopeState() { dispatcher_.setTrackedObject(latched_object_); } + // Make this object stack-only, it doesn't make sense for it + // to be on the heap since it's tracking a stack of active operations. + void* operator new(std::size_t) = delete; private: - const ScopeTrackedObject* latched_object_; + const ScopeTrackedObject* registered_object_; Event::Dispatcher& dispatcher_; }; diff --git a/source/common/event/BUILD b/source/common/event/BUILD index c14a6ee08e99..09d640b6b817 100644 --- a/source/common/event/BUILD +++ b/source/common/event/BUILD @@ -112,6 +112,9 @@ envoy_cc_library( "file_event_impl.h", "schedulable_cb_impl.h", ], + external_deps = [ + "abseil_inlined_vector", + ], deps = [ ":libevent_lib", ":libevent_scheduler_lib", diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 558d82b9230d..281caa0dd8c5 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -7,10 +7,12 @@ #include #include "envoy/api/api.h" +#include "envoy/common/scope_tracker.h" #include "envoy/network/listen_socket.h" #include "envoy/network/listener.h" #include "common/buffer/buffer_impl.h" +#include "common/common/assert.h" #include "common/common/lock_guard.h" #include "common/common/thread.h" #include "common/event/file_event_impl.h" @@ -44,6 +46,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, buffer_factory_(factory != nullptr ? factory : std::make_shared()), scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)), + thread_local_delete_cb_( + base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })), deferred_delete_cb_(base_scheduler_.createSchedulableCallback( [this]() -> void { clearDeferredDeleteList(); })), post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })), @@ -55,7 +59,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this)); } -DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); } +DispatcherImpl::~DispatcherImpl() { + ENVOY_LOG(debug, "destroying dispatcher {}", name_); + FatalErrorHandler::removeFatalErrorHandler(*this); + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable + // ASSERT(deletable_in_dispatcher_thread_.empty()) +} void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog, std::chrono::milliseconds min_touch_interval) { @@ -236,9 +245,23 @@ void DispatcherImpl::post(std::function callback) { } } +void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) { + bool need_schedule; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + need_schedule = deletables_in_dispatcher_thread_.empty(); + deletables_in_dispatcher_thread_.emplace_back(std::move(deletable)); + // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072 + // ASSERT(!shutdown_called_, "inserted after shutdown"); + } + + if (need_schedule) { + thread_local_delete_cb_->scheduleCallbackCurrentIteration(); + } +} + void DispatcherImpl::run(RunType type) { run_tid_ = api_.threadFactory().currentThreadId(); - // Flush all post callbacks before we run the event loop. We do this because there are post // callbacks that have to get run before the initial event loop starts running. libevent does // not guarantee that events are run in any particular order. So even if we post() and call @@ -251,12 +274,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const { return approximate_monotonic_time_; } +void DispatcherImpl::shutdown() { + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below + // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post + // callbacks and dispatcher thread deletable objects. + ASSERT(isThreadSafe()); + auto deferred_deletables_size = current_to_delete_->size(); + std::list>::size_type post_callbacks_size; + { + Thread::LockGuard lock(post_lock_); + post_callbacks_size = post_callbacks_.size(); + } + + std::list local_deletables; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + local_deletables = std::move(deletables_in_dispatcher_thread_); + } + auto thread_local_deletables_size = local_deletables.size(); + while (!local_deletables.empty()) { + local_deletables.pop_front(); + } + ASSERT(!shutdown_called_); + shutdown_called_ = true; + ENVOY_LOG( + trace, + "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ", + __FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size); +} + void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); } void DispatcherImpl::updateApproximateMonotonicTimeInternal() { approximate_monotonic_time_ = api_.timeSource().monotonicTime(); } +void DispatcherImpl::runThreadLocalDelete() { + std::list to_be_delete; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + to_be_delete = std::move(deletables_in_dispatcher_thread_); + ASSERT(deletables_in_dispatcher_thread_.empty()); + } + while (!to_be_delete.empty()) { + // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when + // executing complicated destruction. + touchWatchdog(); + // Delete in FIFO order. + to_be_delete.pop_front(); + } +} void DispatcherImpl::runPostCallbacks() { // Clear the deferred delete list before running post callbacks to reduce non-determinism in // callback processing, and more easily detect if a scheduled post callback refers to one of the @@ -287,6 +354,16 @@ void DispatcherImpl::runPostCallbacks() { } } +void DispatcherImpl::onFatalError(std::ostream& os) const { + // Dump the state of the tracked objects in the dispatcher if thread safe. This generally + // results in dumping the active state only for the thread which caused the fatal error. + if (isThreadSafe()) { + for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) { + (*iter)->dumpState(os); + } + } +} + void DispatcherImpl::runFatalActionsOnTrackedObject( const FatalAction::FatalActionPtrList& actions) const { // Only run if this is the dispatcher of the current thread and @@ -296,7 +373,7 @@ void DispatcherImpl::runFatalActionsOnTrackedObject( } for (const auto& action : actions) { - action->run(current_object_); + action->run(tracked_object_stack_); } } @@ -306,5 +383,23 @@ void DispatcherImpl::touchWatchdog() { } } +void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) { + ASSERT(isThreadSafe()); + ASSERT(object != nullptr); + tracked_object_stack_.push_back(object); + ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth); +} + +void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) { + ASSERT(isThreadSafe()); + ASSERT(expected_object != nullptr); + RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!"); + + const ScopeTrackedObject* top = tracked_object_stack_.back(); + tracked_object_stack_.pop_back(); + ASSERT(top == expected_object, + "Popped the top of the tracked object stack, but it wasn't the expected object!"); +} + } // namespace Event } // namespace Envoy diff --git a/source/common/event/dispatcher_impl.h b/source/common/event/dispatcher_impl.h index bd3b698af11f..1d61f3f8a2fd 100644 --- a/source/common/event/dispatcher_impl.h +++ b/source/common/event/dispatcher_impl.h @@ -20,9 +20,16 @@ #include "common/event/libevent_scheduler.h" #include "common/signal/fatal_error_handler.h" +#include "absl/container/inlined_vector.h" + namespace Envoy { namespace Event { +// The tracked object stack likely won't grow larger than this initial +// reservation; this should make appends constant time since the stack +// shouldn't have to grow larger. +inline constexpr size_t ExpectedMaxTrackedObjectStackDepth = 10; + /** * libevent implementation of Event::Dispatcher. */ @@ -72,27 +79,17 @@ class DispatcherImpl : Logger::Loggable, void exit() override; SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override; void post(std::function callback) override; + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override; void run(RunType type) override; Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; } - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { - const ScopeTrackedObject* return_object = current_object_; - current_object_ = object; - return return_object; - } + void pushTrackedObject(const ScopeTrackedObject* object) override; + void popTrackedObject(const ScopeTrackedObject* expected_object) override; MonotonicTime approximateMonotonicTime() const override; void updateApproximateMonotonicTime() override; + void shutdown() override; // FatalErrorInterface - void onFatalError(std::ostream& os) const override { - // Dump the state of the tracked object if it is in the current thread. This generally results - // in dumping the active state only for the thread which caused the fatal error. - if (isThreadSafe()) { - if (current_object_) { - current_object_->dumpState(os); - } - } - } - + void onFatalError(std::ostream& os) const override; void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override; @@ -125,6 +122,8 @@ class DispatcherImpl : Logger::Loggable, TimerPtr createTimerInternal(TimerCb cb); void updateApproximateMonotonicTimeInternal(); void runPostCallbacks(); + void runThreadLocalDelete(); + // Helper used to touch the watchdog after most schedulable, fd, and timer callbacks. void touchWatchdog(); @@ -143,14 +142,26 @@ class DispatcherImpl : Logger::Loggable, Buffer::WatermarkFactorySharedPtr buffer_factory_; LibeventScheduler base_scheduler_; SchedulerPtr scheduler_; + + SchedulableCallbackPtr thread_local_delete_cb_; + Thread::MutexBasicLockable thread_local_deletable_lock_; + // `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate. + std::list + deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_); + bool shutdown_called_{false}; + SchedulableCallbackPtr deferred_delete_cb_; + SchedulableCallbackPtr post_cb_; + Thread::MutexBasicLockable post_lock_; + std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); + std::vector to_delete_1_; std::vector to_delete_2_; std::vector* current_to_delete_; - Thread::MutexBasicLockable post_lock_; - std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); - const ScopeTrackedObject* current_object_{}; + + absl::InlinedVector + tracked_object_stack_; bool deferred_deleting_{}; MonotonicTime approximate_monotonic_time_; WatchdogRegistrationPtr watchdog_registration_; diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 9128b296ccb7..6d1152ef9f45 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() { // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. if (LinkedObject::inserted()) { + ASSERT(dispatcher_->isThreadSafe()); dispatcher_->deferredDelete( LinkedObject::removeFromList(parent_.active_streams_)); } diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 361fdfb173ac..ffe65b5ed62f 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) { } void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { + ASSERT(dispatcher().isThreadSafe()); // Map send calls after local closure to no-ops. The send call could have been queued prior to // remote reset or closure, and/or closure could have occurred synchronously in response to a // previous send. In these cases the router will have already cleaned up stream state. This @@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { } void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) { + ASSERT(dispatcher().isThreadSafe()); // See explanation in sendData. if (local_closed_) { return; @@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() { } void AsyncStreamImpl::cleanup() { + ASSERT(dispatcher().isThreadSafe()); local_closed_ = remote_closed_ = true; // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 4ce8ec63dd38..4a7a9260bbac 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -210,6 +210,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } void ConnectionImpl::setTransportSocketIsReadable() { + ASSERT(dispatcher_.isThreadSafe()); // Remember that the transport requested read resumption, in case the resumption event is not // scheduled immediately or is "lost" because read was disabled. transport_wants_read_ = true; @@ -300,6 +301,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { + ASSERT(dispatcher_.isThreadSafe()); if (inDelayedClose() || !filterChainWantsData()) { return; } @@ -419,6 +421,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) { bool ConnectionImpl::readEnabled() const { // Calls to readEnabled on a closed socket are considered to be an error. ASSERT(state() == State::Open); + ASSERT(dispatcher_.isThreadSafe()); return read_disable_count_ == 0; } @@ -436,6 +439,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) { void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) { ASSERT(!end_stream || enable_half_close_); + ASSERT(dispatcher_.isThreadSafe()); if (write_end_stream_) { // It is an API violation to write more data after writing end_stream, but a duplicate diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 18025dfca211..4d44e88846fa 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -919,9 +919,16 @@ ClusterImplBase::ClusterImplBase( auto socket_matcher = std::make_unique( cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope); - info_ = std::make_unique(cluster, factory_context.clusterManager().bindConfig(), - runtime, std::move(socket_matcher), - std::move(stats_scope), added_via_api, factory_context); + auto& dispatcher = factory_context.dispatcher(); + info_ = std::shared_ptr( + new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime, + std::move(socket_matcher), std::move(stats_scope), added_via_api, + factory_context), + [&dispatcher](const ClusterInfoImpl* self) { + ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name()); + dispatcher.deleteInDispatcherThread( + std::unique_ptr(self)); + }); if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) && !raw_factory_pointer->supportsAlpn()) { @@ -1098,7 +1105,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { for (size_t priority = 0; priority < host_sets.size(); ++priority) { const auto& host_set = host_sets[priority]; // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet? - HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts())); + HostVectorConstSharedPtr hosts_copy = std::make_shared(host_set->hosts()); HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone(); prioritySet().updateHosts(priority, @@ -1289,10 +1296,10 @@ void PriorityStateManager::registerHostForPriority( auto metadata = lb_endpoint.has_metadata() ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata()) : nullptr; - const HostSharedPtr host(new HostImpl( + const auto host = std::make_shared( parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(), - locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source)); + locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source); registerHostForPriority(host, locality_lb_endpoint); } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 3ff38c4b770d..7a2401425161 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -517,7 +517,9 @@ class PrioritySetImpl : public PrioritySet { /** * Implementation of ClusterInfo that reads from JSON. */ -class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable { +class ClusterInfoImpl : public ClusterInfo, + public Event::DispatcherThreadDeletable, + protected Logger::Loggable { public: using HttpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl; diff --git a/source/server/config_validation/server.cc b/source/server/config_validation/server.cc index e4130e383ff2..a2bfd270dbb2 100644 --- a/source/server/config_validation/server.cc +++ b/source/server/config_validation/server.cc @@ -116,6 +116,7 @@ void ValidationInstance::shutdown() { config_.clusterManager()->shutdown(); } thread_local_.shutdownThread(); + dispatcher_->shutdown(); } } // namespace Server diff --git a/source/server/server.cc b/source/server/server.cc index 26795a8b2aed..e4757d638654 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -139,6 +139,7 @@ InstanceImpl::~InstanceImpl() { ENVOY_LOG(debug, "destroying listener manager"); listener_manager_.reset(); ENVOY_LOG(debug, "destroyed listener manager"); + dispatcher_->shutdown(); } Upstream::ClusterManager& InstanceImpl::clusterManager() { return *config_.clusterManager(); } diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 760b7ca630bc..30c519c50109 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -134,6 +134,7 @@ void WorkerImpl::threadRoutine(GuardDog& guard_dog) { dispatcher_->run(Event::Dispatcher::RunType::Block); ENVOY_LOG(debug, "worker exited dispatch loop"); guard_dog.stopWatching(watch_dog_); + dispatcher_->shutdown(); // We must close all active connections before we actually exit the thread. This prevents any // destructors from running on the main thread which might reference thread locals. Destroying diff --git a/test/common/common/BUILD b/test/common/common/BUILD index f0e0b68986ca..2a451dd88615 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -361,3 +361,15 @@ envoy_cc_test( srcs = ["interval_value_test.cc"], deps = ["//source/common/common:interval_value"], ) + +envoy_cc_test( + name = "scope_tracker_test", + srcs = ["scope_tracker_test.cc"], + deps = [ + "//source/common/api:api_lib", + "//source/common/common:scope_tracker", + "//source/common/event:dispatcher_lib", + "//test/mocks:common_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/common/common/scope_tracker_test.cc b/test/common/common/scope_tracker_test.cc new file mode 100644 index 000000000000..1c5451660f04 --- /dev/null +++ b/test/common/common/scope_tracker_test.cc @@ -0,0 +1,37 @@ +#include + +#include "common/api/api_impl.h" +#include "common/common/scope_tracker.h" +#include "common/event/dispatcher_impl.h" + +#include "test/mocks/common.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +using testing::_; + +TEST(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { + Api::ApiPtr api(Api::createApiForTest()); + Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); + MockScopedTrackedObject tracked_object; + { + ScopeTrackerScopeState scope(&tracked_object, *dispatcher); + // Check that the tracked_object is on the tracked object stack + dispatcher->popTrackedObject(&tracked_object); + + // Restore it to the top, it should be removed in the dtor of scope. + dispatcher->pushTrackedObject(&tracked_object); + } + + // Check nothing is tracked now. + EXPECT_CALL(tracked_object, dumpState(_, _)).Times(0); + static_cast(dispatcher.get())->onFatalError(std::cerr); +} + +} // namespace +} // namespace Envoy diff --git a/test/common/event/dispatcher_impl_test.cc b/test/common/event/dispatcher_impl_test.cc index 8c612144db50..2bca020fb4b4 100644 --- a/test/common/event/dispatcher_impl_test.cc +++ b/test/common/event/dispatcher_impl_test.cc @@ -1,10 +1,13 @@ #include +#include "envoy/common/scope_tracker.h" #include "envoy/thread/thread.h" #include "common/api/api_impl.h" #include "common/api/os_sys_calls_impl.h" #include "common/common/lock_guard.h" +#include "common/common/scope_tracker.h" +#include "common/common/utility.h" #include "common/event/deferred_task.h" #include "common/event/dispatcher_impl.h" #include "common/event/timer_impl.h" @@ -234,6 +237,15 @@ class TestDeferredDeletable : public DeferredDeletable { std::function on_destroy_; }; +class TestDispatcherThreadDeletable : public DispatcherThreadDeletable { +public: + TestDispatcherThreadDeletable(std::function on_destroy) : on_destroy_(on_destroy) {} + ~TestDispatcherThreadDeletable() override { on_destroy_(); } + +private: + std::function on_destroy_; +}; + TEST(DeferredDeleteTest, DeferredDelete) { InSequence s; Api::ApiPtr api = Api::createApiForTest(); @@ -476,6 +488,100 @@ TEST_F(DispatcherImplTest, RunPostCallbacksLocking) { } } +TEST_F(DispatcherImplTest, DispatcherThreadDeleted) { + dispatcher_->deleteInDispatcherThread(std::make_unique( + [this, id = api_->threadFactory().currentThreadId()]() { + ASSERT(id != api_->threadFactory().currentThreadId()); + { + Thread::LockGuard lock(mu_); + ASSERT(!work_finished_); + work_finished_ = true; + } + cv_.notifyOne(); + })); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } +} + +TEST(DispatcherThreadDeletedImplTest, DispatcherThreadDeletedAtNextCycle) { + Api::ApiPtr api_(Api::createApiForTest()); + DispatcherPtr dispatcher(api_->allocateDispatcher("test_thread")); + std::vector> watchers; + watchers.reserve(3); + for (int i = 0; i < 3; ++i) { + watchers.push_back(std::make_unique()); + } + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[0]->ready(); })); + EXPECT_CALL(*watchers[0], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[1]->ready(); })); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[2]->ready(); })); + EXPECT_CALL(*watchers[1], ready()); + EXPECT_CALL(*watchers[2], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); +} + +class DispatcherShutdownTest : public testing::Test { +protected: + DispatcherShutdownTest() + : api_(Api::createApiForTest()), dispatcher_(api_->allocateDispatcher("test_thread")) {} + + Api::ApiPtr api_; + DispatcherPtr dispatcher_; +}; + +TEST_F(DispatcherShutdownTest, ShutdownClearThreadLocalDeletables) { + ReadyWatcher watcher; + + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); +} + +TEST_F(DispatcherShutdownTest, ShutdownDoesnotClearDeferredListOrPostCallback) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + ReadyWatcher post_watcher; + + { + InSequence s; + + dispatcher_->deferredDelete(std::make_unique( + [&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->post([&post_watcher]() { post_watcher.ready(); }); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); + + ::testing::Mock::VerifyAndClearExpectations(&watcher); + EXPECT_CALL(deferred_watcher, ready()); + dispatcher_.reset(); + } +} + +TEST_F(DispatcherShutdownTest, DestroyClearAllList) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + dispatcher_->deferredDelete( + std::make_unique([&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + { + InSequence s; + EXPECT_CALL(deferred_watcher, ready()); + EXPECT_CALL(watcher, ready()); + dispatcher_.reset(); + } +} + TEST_F(DispatcherImplTest, Timer) { timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(0)); }); timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(50)); }); @@ -533,9 +639,71 @@ TEST_F(DispatcherImplTest, IsThreadSafe) { EXPECT_FALSE(dispatcher_->isThreadSafe()); } +TEST_F(DispatcherImplTest, ShouldDumpNothingIfNoTrackedObjects) { + std::array buffer; + OutputBufferStream ostream{buffer.data(), buffer.size()}; + + // Call on FatalError to trigger dumps of tracked objects. + dispatcher_->post([this, &ostream]() { + Thread::LockGuard lock(mu_); + static_cast(dispatcher_.get())->onFatalError(ostream); + work_finished_ = true; + cv_.notifyOne(); + }); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } + + // Check ostream still empty. + EXPECT_EQ(ostream.contents(), ""); +} + +class MessageTrackedObject : public ScopeTrackedObject { +public: + MessageTrackedObject(absl::string_view sv) : sv_(sv) {} + void dumpState(std::ostream& os, int /*indent_level*/) const override { os << sv_; } + +private: + absl::string_view sv_; +}; + +TEST_F(DispatcherImplTest, ShouldDumpTrackedObjectsInFILO) { + std::array buffer; + OutputBufferStream ostream{buffer.data(), buffer.size()}; + + // Call on FatalError to trigger dumps of tracked objects. + dispatcher_->post([this, &ostream]() { + Thread::LockGuard lock(mu_); + + // Add several tracked objects to the dispatcher + MessageTrackedObject first{"first"}; + ScopeTrackerScopeState first_state{&first, *dispatcher_}; + MessageTrackedObject second{"second"}; + ScopeTrackerScopeState second_state{&second, *dispatcher_}; + MessageTrackedObject third{"third"}; + ScopeTrackerScopeState third_state{&third, *dispatcher_}; + + static_cast(dispatcher_.get())->onFatalError(ostream); + work_finished_ = true; + cv_.notifyOne(); + }); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } + + // Check the dump includes and registered objects in a FILO order. + EXPECT_EQ(ostream.contents(), "thirdsecondfirst"); +} + class TestFatalAction : public Server::Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { ++times_ran_; } + void run(absl::Span /*tracked_objects*/) override { + ++times_ran_; + } bool isAsyncSignalSafe() const override { return true; } int getNumTimesRan() { return times_ran_; } diff --git a/test/common/event/scaled_range_timer_manager_impl_test.cc b/test/common/event/scaled_range_timer_manager_impl_test.cc index c6f7476c8af5..29e6a19aa529 100644 --- a/test/common/event/scaled_range_timer_manager_impl_test.cc +++ b/test/common/event/scaled_range_timer_manager_impl_test.cc @@ -1,5 +1,6 @@ #include +#include "envoy/common/scope_tracker.h" #include "envoy/event/timer.h" #include "common/event/dispatcher_impl.h" @@ -24,9 +25,14 @@ class ScopeTrackingDispatcher : public WrappedDispatcher { ScopeTrackingDispatcher(DispatcherPtr dispatcher) : WrappedDispatcher(*dispatcher), dispatcher_(std::move(dispatcher)) {} - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { + void pushTrackedObject(const ScopeTrackedObject* object) override { scope_ = object; - return impl_.setTrackedObject(object); + return impl_.pushTrackedObject(object); + } + + void popTrackedObject(const ScopeTrackedObject* expected_object) override { + scope_ = nullptr; + return impl_.popTrackedObject(expected_object); } const ScopeTrackedObject* scope_{nullptr}; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index cd9944e0d7cc..04df5b2070e6 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -2656,7 +2656,8 @@ TEST_F(HttpConnectionManagerImplTest, RequestTimeoutCallbackDisarmsAndReturns408 EXPECT_CALL(response_encoder_, encodeData(_, true)).WillOnce(AddBufferToString(&response_body)); conn_manager_->newStream(response_encoder_); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); request_timer->invokeCallback(); return Http::okStatus(); })); @@ -2886,7 +2887,8 @@ TEST_F(HttpConnectionManagerImplTest, RequestHeaderTimeoutCallbackDisarmsAndRetu EXPECT_CALL(*request_header_timer, enableTimer(request_headers_timeout_, _)); conn_manager_->newStream(response_encoder_); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); return Http::okStatus(); })); diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc index 76495244f508..b7dfd0298ab9 100644 --- a/test/common/http/conn_manager_impl_test_2.cc +++ b/test/common/http/conn_manager_impl_test_2.cc @@ -2348,9 +2348,10 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { { RequestHeaderMapPtr headers{ new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "POST"}}}; - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)) - .Times(2) - .WillOnce(Invoke([](const ScopeTrackedObject* object) -> const ScopeTrackedObject* { + + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)) + .Times(1) + .WillOnce(Invoke([](const ScopeTrackedObject* object) -> void { ASSERT(object != nullptr); // On the first call, this should be the active stream. std::stringstream out; object->dumpState(out); @@ -2358,9 +2359,8 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { EXPECT_THAT(state, testing::HasSubstr("filter_manager_callbacks_.requestHeaders(): null")); EXPECT_THAT(state, testing::HasSubstr("protocol_: 1")); - return nullptr; - })) - .WillRepeatedly(Return(nullptr)); + })); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) .WillOnce(Invoke([](HeaderMap&, bool) -> FilterHeadersStatus { return FilterHeadersStatus::StopIteration; @@ -2371,9 +2371,9 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { // Send trailers to that stream, and verify by this point headers are in logged state. { RequestTrailerMapPtr trailers{new TestRequestTrailerMapImpl{{"foo", "bar"}}}; - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)) - .Times(2) - .WillOnce(Invoke([](const ScopeTrackedObject* object) -> const ScopeTrackedObject* { + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)) + .Times(1) + .WillOnce(Invoke([](const ScopeTrackedObject* object) -> void { ASSERT(object != nullptr); // On the first call, this should be the active stream. std::stringstream out; object->dumpState(out); @@ -2381,9 +2381,8 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { EXPECT_THAT(state, testing::HasSubstr("filter_manager_callbacks_.requestHeaders(): \n")); EXPECT_THAT(state, testing::HasSubstr("':authority', 'host'\n")); EXPECT_THAT(state, testing::HasSubstr("protocol_: 1")); - return nullptr; - })) - .WillRepeatedly(Return(nullptr)); + })); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*decoder_filters_[0], decodeComplete()); EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_)) .WillOnce(Return(FilterTrailersStatus::StopIteration)); diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 85d755e864cb..0328cd4a0425 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -198,7 +198,8 @@ TEST_F(FilterManagerTest, MatchTreeSkipActionDecodingHeaders) { TEST_F(FilterManagerTest, MatchTreeSkipActionRequestAndResponseHeaders) { initialize(); - EXPECT_CALL(dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(dispatcher_, popTrackedObject(_)); // This stream filter will skip further callbacks once it sees both the request and response // header. As such, it should see the decoding callbacks but none of the encoding callbacks. @@ -251,4 +252,4 @@ TEST_F(FilterManagerTest, MatchTreeSkipActionRequestAndResponseHeaders) { } } // namespace } // namespace Http -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 63361228acbe..cfbccc6c4b8d 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1958,6 +1958,7 @@ class FakeReadFilter : public Network::ReadFilter { class MockTransportConnectionImplTest : public testing::Test { public: MockTransportConnectionImplTest() : stream_info_(dispatcher_.timeSource(), nullptr) { + EXPECT_CALL(dispatcher_, isThreadSafe()).WillRepeatedly(Return(true)); EXPECT_CALL(dispatcher_.buffer_factory_, create_(_, _, _)) .WillRepeatedly(Invoke([](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { @@ -1978,7 +1979,8 @@ class MockTransportConnectionImplTest : public testing::Test { TransportSocketPtr(transport_socket_), stream_info_, true); connection_->addConnectionCallbacks(callbacks_); // File events will trigger setTrackedObject on the dispatcher. - EXPECT_CALL(dispatcher_, setTrackedObject(_)).WillRepeatedly(Return(nullptr)); + EXPECT_CALL(dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } ~MockTransportConnectionImplTest() override { connection_->close(ConnectionCloseType::NoFlush); } diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index a540901ec284..26525ebf3536 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -116,8 +116,9 @@ class RouterTestBase : public testing::Test { // Make the "system time" non-zero, because 0 is considered invalid by DateUtil. test_time_.setMonotonicTime(std::chrono::milliseconds(50)); - // Allow any number of setTrackedObject calls for the dispatcher strict mock. - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + // Allow any number of (append|pop)TrackedObject calls for the dispatcher strict mock. + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } void expectResponseTimerCreate() { @@ -294,7 +295,8 @@ class RouterTestBase : public testing::Test { [&](Http::ResponseDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder_ = &decoder; - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(testing::AtLeast(2)); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(testing::AtLeast(1)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(testing::AtLeast(1)); callbacks.onPoolReady(original_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, upstream_stream_info_, Http::Protocol::Http10); return nullptr; @@ -2262,7 +2264,8 @@ TEST_F(RouterTest, GrpcOk) { EXPECT_EQ(1U, callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)); Http::ResponseHeaderMapPtr response_headers( new Http::TestResponseHeaderMapImpl{{":status", "200"}}); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_.host_->outlier_detector_, @@ -2270,7 +2273,8 @@ TEST_F(RouterTest, GrpcOk) { response_decoder->decodeHeaders(std::move(response_headers), false); EXPECT_TRUE(verifyHostUpstreamStats(0, 0)); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)); Http::ResponseTrailerMapPtr response_trailers( new Http::TestResponseTrailerMapImpl{{"grpc-status", "0"}}); response_decoder->decodeTrailers(std::move(response_trailers)); diff --git a/test/common/router/router_upstream_log_test.cc b/test/common/router/router_upstream_log_test.cc index 7821291c80a5..8b66f739140a 100644 --- a/test/common/router/router_upstream_log_test.cc +++ b/test/common/router/router_upstream_log_test.cc @@ -99,7 +99,8 @@ class RouterUpstreamLogTest : public testing::Test { ShadowWriterPtr(new MockShadowWriter()), router_proto); router_ = std::make_shared(*config_); router_->setDecoderFilterCallbacks(callbacks_); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(testing::AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(testing::AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(testing::AnyNumber()); upstream_locality_.set_zone("to_az"); context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"}); diff --git a/test/common/signal/fatal_action_test.cc b/test/common/signal/fatal_action_test.cc index 9a286c9f9fb1..1e276f016e4a 100644 --- a/test/common/signal/fatal_action_test.cc +++ b/test/common/signal/fatal_action_test.cc @@ -1,3 +1,6 @@ +#include + +#include "envoy/common/scope_tracker.h" #include "envoy/server/fatal_action_config.h" #include "common/signal/fatal_action.h" @@ -23,9 +26,10 @@ class TestFatalErrorHandler : public FatalErrorHandlerInterface { void onFatalError(std::ostream& /*os*/) const override {} void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override { - // Call the Fatal Actions with nullptr + // Call the Fatal Actions with a non-empty vector so it runs the action. + std::vector tracked_objects{nullptr}; for (const Server::Configuration::FatalActionPtr& action : actions) { - action->run(nullptr); + action->run(tracked_objects); } } }; @@ -33,7 +37,9 @@ class TestFatalErrorHandler : public FatalErrorHandlerInterface { class TestFatalAction : public Server::Configuration::FatalAction { public: TestFatalAction(bool is_safe, int* const counter) : is_safe_(is_safe), counter_(counter) {} - void run(const ScopeTrackedObject* /*current_object*/) override { ++(*counter_); } + void run(absl::Span /*tracked_objects*/) override { + ++(*counter_); + } bool isAsyncSignalSafe() const override { return is_safe_; } private: diff --git a/test/common/signal/signals_test.cc b/test/common/signal/signals_test.cc index 3ecf49f6695b..f2e5ddde8c8d 100644 --- a/test/common/signal/signals_test.cc +++ b/test/common/signal/signals_test.cc @@ -1,6 +1,9 @@ #include #include +#include + +#include "envoy/common/scope_tracker.h" #include "common/signal/fatal_error_handler.h" #include "common/signal/signal_action.h" @@ -28,21 +31,27 @@ extern void resetFatalActionStateForTest(); // Use this test handler instead of a mock, because fatal error handlers must be // signal-safe and a mock might allocate memory. class TestFatalErrorHandler : public FatalErrorHandlerInterface { +public: void onFatalError(std::ostream& os) const override { os << "HERE!"; } void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override { // Run the actions for (const auto& action : actions) { - action->run(nullptr); + action->run(tracked_objects_); } } + +private: + std::vector tracked_objects_{nullptr}; }; // Use this to test fatal actions get called, as well as the order they run. class EchoFatalAction : public Server::Configuration::FatalAction { public: EchoFatalAction(absl::string_view echo_msg) : echo_msg_(echo_msg) {} - void run(const ScopeTrackedObject* /*current_object*/) override { std::cerr << echo_msg_; } + void run(absl::Span /*tracked_objects*/) override { + std::cerr << echo_msg_; + } bool isAsyncSignalSafe() const override { return true; } private: @@ -52,7 +61,9 @@ class EchoFatalAction : public Server::Configuration::FatalAction { // Use this to test failing while in a signal handler. class SegfaultFatalAction : public Server::Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { raise(SIGSEGV); } + void run(absl::Span /*tracked_objects*/) override { + raise(SIGSEGV); + } bool isAsyncSignalSafe() const override { return false; } }; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index df0cfd1aba77..194d41180f59 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -869,8 +869,10 @@ TEST_F(ClusterManagerImplTest, HttpHealthChecker) { createClientConnection_( PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _)) .WillOnce(Return(connection)); + EXPECT_CALL(factory_.dispatcher_, deleteInDispatcherThread(_)); create(parseBootstrapFromV3Yaml(yaml)); factory_.tls_.shutdownThread(); + factory_.dispatcher_.to_delete_.clear(); } TEST_F(ClusterManagerImplTest, UnknownCluster) { diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index db1511775067..5e0e1b107aaf 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -202,11 +202,11 @@ class LogicalDnsClusterTest : public Event::TestUsingSimulatedTime, public testi Network::DnsResolver::ResolveCb dns_callback_; NiceMock tls_; Event::MockTimer* resolve_timer_; - std::shared_ptr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; NiceMock dispatcher_; + std::shared_ptr cluster_; NiceMock local_info_; NiceMock admin_; Singleton::ManagerImpl singleton_manager_{Thread::threadFactoryForTest()}; diff --git a/test/common/upstream/original_dst_cluster_test.cc b/test/common/upstream/original_dst_cluster_test.cc index 26f15b226453..9ecd87ede145 100644 --- a/test/common/upstream/original_dst_cluster_test.cc +++ b/test/common/upstream/original_dst_cluster_test.cc @@ -94,11 +94,11 @@ class OriginalDstClusterTest : public Event::TestUsingSimulatedTime, public test Stats::TestUtil::TestStore stats_store_; Ssl::MockContextManager ssl_context_manager_; + NiceMock dispatcher_; OriginalDstClusterSharedPtr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; - NiceMock dispatcher_; Event::MockTimer* cleanup_timer_; NiceMock random_; NiceMock local_info_; diff --git a/test/extensions/filters/http/fault/fault_filter_test.cc b/test/extensions/filters/http/fault/fault_filter_test.cc index 63707e4d96e9..570ef954e94d 100644 --- a/test/extensions/filters/http/fault/fault_filter_test.cc +++ b/test/extensions/filters/http/fault/fault_filter_test.cc @@ -122,15 +122,16 @@ class FaultFilterTest : public testing::Test { const std::string v2_empty_fault_config_yaml = "{}"; - void SetUpTest(const envoy::extensions::filters::http::fault::v3::HTTPFault fault) { + void setUpTest(const envoy::extensions::filters::http::fault::v3::HTTPFault fault) { config_ = std::make_shared(fault, runtime_, "prefix.", stats_, time_system_); filter_ = std::make_unique(config_); filter_->setDecoderFilterCallbacks(decoder_filter_callbacks_); filter_->setEncoderFilterCallbacks(encoder_filter_callbacks_); - EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } - void SetUpTest(const std::string& yaml) { SetUpTest(convertYamlStrToProtoConfig(yaml)); } + void setUpTest(const std::string& yaml) { setUpTest(convertYamlStrToProtoConfig(yaml)); } void expectDelayTimer(uint64_t duration_ms) { timer_ = new Event::MockTimer(&decoder_filter_callbacks_.dispatcher_); @@ -228,7 +229,7 @@ TEST_F(FaultFilterTest, AbortWithHttpStatus) { fault.mutable_abort()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_abort()->set_http_status(429); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -274,7 +275,7 @@ TEST_F(FaultFilterTest, AbortWithHttpStatus) { } TEST_F(FaultFilterTest, HeaderAbortWithHttpStatus) { - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-request", "429"); @@ -329,7 +330,7 @@ TEST_F(FaultFilterTest, AbortWithGrpcStatus) { fault.mutable_abort()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_abort()->set_grpc_status(5); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -377,7 +378,7 @@ TEST_F(FaultFilterTest, AbortWithGrpcStatus) { TEST_F(FaultFilterTest, HeaderAbortWithGrpcStatus) { decoder_filter_callbacks_.is_grpc_request_ = true; - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-grpc-request", "5"); @@ -427,7 +428,7 @@ TEST_F(FaultFilterTest, HeaderAbortWithGrpcStatus) { } TEST_F(FaultFilterTest, HeaderAbortWithHttpAndGrpcStatus) { - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-request", "429"); request_headers_.addCopy("x-envoy-fault-abort-grpc-request", "5"); @@ -478,7 +479,7 @@ TEST_F(FaultFilterTest, HeaderAbortWithHttpAndGrpcStatus) { } TEST_F(FaultFilterTest, FixedDelayZeroDuration) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); // Delay related calls EXPECT_CALL( @@ -506,7 +507,7 @@ TEST_F(FaultFilterTest, FixedDelayZeroDuration) { } TEST_F(FaultFilterTest, Overflow) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); // Delay related calls EXPECT_CALL( @@ -532,7 +533,7 @@ TEST_F(FaultFilterTest, Overflow) { // Verifies that we don't increment the active_faults gauge when not applying a fault. TEST_F(FaultFilterTest, Passthrough) { envoy::extensions::filters::http::fault::v3::HTTPFault fault; - SetUpTest(fault); + setUpTest(fault); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); @@ -545,7 +546,7 @@ TEST_F(FaultFilterTest, FixedDelayDeprecatedPercentAndNonZeroDuration) { fault.mutable_delay()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_delay()->mutable_fixed_delay()->set_seconds(5); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -588,7 +589,7 @@ TEST_F(FaultFilterTest, FixedDelayDeprecatedPercentAndNonZeroDuration) { } TEST_F(FaultFilterTest, DelayForDownstreamCluster) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -624,7 +625,8 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) { EXPECT_CALL(decoder_filter_callbacks_, continueDecoding()); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); - EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, popTrackedObject(_)); timer_->invokeCallback(); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); @@ -636,7 +638,7 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) { } TEST_F(FaultFilterTest, FixedDelayAndAbortDownstream) { - SetUpTest(fixed_delay_and_abort_yaml); + setUpTest(fixed_delay_and_abort_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -702,7 +704,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortDownstream) { } TEST_F(FaultFilterTest, FixedDelayAndAbort) { - SetUpTest(fixed_delay_and_abort_yaml); + setUpTest(fixed_delay_and_abort_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -758,7 +760,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbort) { } TEST_F(FaultFilterTest, FixedDelayAndAbortDownstreamNodes) { - SetUpTest(fixed_delay_and_abort_nodes_yaml); + setUpTest(fixed_delay_and_abort_nodes_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -812,13 +814,13 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortDownstreamNodes) { } TEST_F(FaultFilterTest, NoDownstreamMatch) { - SetUpTest(fixed_delay_and_abort_nodes_yaml); + setUpTest(fixed_delay_and_abort_nodes_yaml); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); } TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchSuccess) { - SetUpTest(fixed_delay_and_abort_match_headers_yaml); + setUpTest(fixed_delay_and_abort_match_headers_yaml); request_headers_.addCopy("x-foo1", "Bar"); request_headers_.addCopy("x-foo2", "RandomValue"); @@ -875,7 +877,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchSuccess) { } TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchFail) { - SetUpTest(fixed_delay_and_abort_match_headers_yaml); + setUpTest(fixed_delay_and_abort_match_headers_yaml); request_headers_.addCopy("x-foo1", "Bar"); request_headers_.addCopy("x-foo3", "Baz"); @@ -903,7 +905,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchFail) { } TEST_F(FaultFilterTest, TimerResetAfterStreamReset) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -954,7 +956,7 @@ TEST_F(FaultFilterTest, TimerResetAfterStreamReset) { } TEST_F(FaultFilterTest, FaultWithTargetClusterMatchSuccess) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("www1"); EXPECT_CALL(decoder_filter_callbacks_.route_->route_entry_, clusterName()) @@ -999,7 +1001,7 @@ TEST_F(FaultFilterTest, FaultWithTargetClusterMatchSuccess) { } TEST_F(FaultFilterTest, FaultWithTargetClusterMatchFail) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("mismatch"); EXPECT_CALL(decoder_filter_callbacks_.route_->route_entry_, clusterName()) @@ -1027,7 +1029,7 @@ TEST_F(FaultFilterTest, FaultWithTargetClusterMatchFail) { } TEST_F(FaultFilterTest, FaultWithTargetClusterNullRoute) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("www1"); EXPECT_CALL(*decoder_filter_callbacks_.route_, routeEntry()).WillRepeatedly(Return(nullptr)); @@ -1108,7 +1110,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { // route-level fault overrides listener-level fault { - SetUpTest(v2_empty_fault_config_yaml); // This is a valid listener level fault + setUpTest(v2_empty_fault_config_yaml); // This is a valid listener level fault TestPerFilterConfigFault(&delay_fault, nullptr); } @@ -1116,7 +1118,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { { config_->stats().aborts_injected_.reset(); config_->stats().delays_injected_.reset(); - SetUpTest(v2_empty_fault_config_yaml); + setUpTest(v2_empty_fault_config_yaml); TestPerFilterConfigFault(nullptr, &delay_fault); } @@ -1124,7 +1126,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { { config_->stats().aborts_injected_.reset(); config_->stats().delays_injected_.reset(); - SetUpTest(v2_empty_fault_config_yaml); + setUpTest(v2_empty_fault_config_yaml); TestPerFilterConfigFault(&delay_fault, &abort_fault); } } @@ -1135,7 +1137,7 @@ class FaultFilterRateLimitTest : public FaultFilterTest { envoy::extensions::filters::http::fault::v3::HTTPFault fault; fault.mutable_response_rate_limit()->mutable_fixed_limit()->set_limit_kbps(1); fault.mutable_response_rate_limit()->mutable_percentage()->set_numerator(100); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL( runtime_.snapshot_, diff --git a/test/extensions/filters/http/squash/squash_filter_test.cc b/test/extensions/filters/http/squash/squash_filter_test.cc index 5cb81564c4be..260ba53ca2a1 100644 --- a/test/extensions/filters/http/squash/squash_filter_test.cc +++ b/test/extensions/filters/http/squash/squash_filter_test.cc @@ -2,6 +2,7 @@ #include #include +#include "envoy/common/scope_tracker.h" #include "envoy/extensions/filters/http/squash/v3/squash.pb.h" #include "common/http/message_impl.h" @@ -328,7 +329,8 @@ TEST_F(SquashFilterTest, Timeout) { EXPECT_CALL(request_, cancel()); EXPECT_CALL(filter_callbacks_, continueDecoding()); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); attachmentTimeout_timer_->invokeCallback(); EXPECT_EQ(Envoy::Http::FilterDataStatus::Continue, filter_->decodeData(buffer, false)); @@ -357,7 +359,9 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachment) { // Expect the second get attachment request expectAsyncClientSend(); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); + retry_timer->invokeCallback(); EXPECT_CALL(filter_callbacks_, continueDecoding()); completeGetStatusRequest("attached"); @@ -377,7 +381,8 @@ TEST_F(SquashFilterTest, PollingAttachmentNoCluster) { // Expect the second get attachment request ON_CALL(factory_context_.cluster_manager_, getThreadLocalCluster("squash")) .WillByDefault(Return(nullptr)); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*retry_timer, enableTimer(config_->attachmentPollPeriod(), _)); retry_timer->invokeCallback(); } @@ -395,7 +400,8 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachmentOnFailure) { // Expect the second get attachment request expectAsyncClientSend(); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); retry_timer->invokeCallback(); EXPECT_CALL(filter_callbacks_, continueDecoding()); @@ -466,7 +472,8 @@ TEST_F(SquashFilterTest, TimerExpiresInline) { attachmentTimeout_timer_->scope_ = scope; attachmentTimeout_timer_->enabled_ = true; // timer expires inline - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); attachmentTimeout_timer_->invokeCallback(); })); diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index 6023882ab9d6..c1dcb3501a8b 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -673,5 +673,134 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { EXPECT_EQ(1, test_server_->counter("sds.client_cert.update_rejected")->value()); } +// Test CDS with SDS. A cluster provided by CDS raises new SDS request for upstream cert. +class SdsCdsIntegrationTest : public SdsDynamicIntegrationBaseTest { +public: + void initialize() override { + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Create the dynamic cluster. This cluster will be using sds. + dynamic_cluster_ = bootstrap.mutable_static_resources()->clusters(0); + dynamic_cluster_.set_name("dynamic"); + dynamic_cluster_.mutable_connect_timeout()->MergeFrom( + ProtobufUtil::TimeUtil::MillisecondsToDuration(500000)); + auto* transport_socket = dynamic_cluster_.mutable_transport_socket(); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; + auto* secret_config = + tls_context.mutable_common_tls_context()->add_tls_certificate_sds_secret_configs(); + setUpSdsConfig(secret_config, "client_cert"); + + transport_socket->set_name("envoy.transport_sockets.tls"); + transport_socket->mutable_typed_config()->PackFrom(tls_context); + + // Add cds cluster first. + auto* cds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + cds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + cds_cluster->set_name("cds_cluster"); + ConfigHelper::setHttp2(*cds_cluster); + // Then add sds cluster. + auto* sds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + sds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + sds_cluster->set_name("sds_cluster"); + ConfigHelper::setHttp2(*sds_cluster); + + const std::string cds_yaml = R"EOF( + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: cds_cluster + set_node_on_first_message_only: true +)EOF"; + auto* cds = bootstrap.mutable_dynamic_resources()->mutable_cds_config(); + TestUtility::loadFromYaml(cds_yaml, *cds); + }); + + HttpIntegrationTest::initialize(); + } + + void TearDown() override { + { + AssertionResult result = sds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = sds_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + sds_connection_.reset(); + } + cleanUpXdsConnection(); + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + void createUpstreams() override { + // Static cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP1); + // Cds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + // Sds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + } + + void sendCdsResponse() { + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {dynamic_cluster_}, {dynamic_cluster_}, {}, "55"); + } + + void sendSdsResponse2(const envoy::extensions::transport_sockets::tls::v3::Secret& secret, + FakeStream& sds_stream) { + API_NO_BOOST(envoy::api::v2::DiscoveryResponse) discovery_response; + discovery_response.set_version_info("1"); + discovery_response.set_type_url(Config::TypeUrl::get().Secret); + discovery_response.add_resources()->PackFrom(secret); + sds_stream.sendGrpcMessage(discovery_response); + } + envoy::config::cluster::v3::Cluster dynamic_cluster_; + FakeHttpConnectionPtr sds_connection_; + FakeStreamPtr sds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, SdsCdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(SdsCdsIntegrationTest, BasicSuccess) { + on_server_init_function_ = [this]() { + { + // CDS. + AssertionResult result = + fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + sendCdsResponse(); + } + { + // SDS. + AssertionResult result = + fake_upstreams_[2]->waitForHttpConnection(*dispatcher_, sds_connection_); + RELEASE_ASSERT(result, result.message()); + + result = sds_connection_->waitForNewStream(*dispatcher_, sds_stream_); + RELEASE_ASSERT(result, result.message()); + sds_stream_->startGrpcStream(); + sendSdsResponse2(getClientSecret(), *sds_stream_); + } + }; + initialize(); + + test_server_->waitForCounterGe( + "cluster.dynamic.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + // The 4 clusters are CDS,SDS,static and dynamic cluster. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); + + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, {}, {}, + {}, "42"); + // Successfully removed the dynamic cluster. + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); +} + } // namespace Ssl } // namespace Envoy diff --git a/test/mocks/event/mocks.cc b/test/mocks/event/mocks.cc index a8db4995abb3..06d39c6e020a 100644 --- a/test/mocks/event/mocks.cc +++ b/test/mocks/event/mocks.cc @@ -31,6 +31,7 @@ MockDispatcher::MockDispatcher(const std::string& name) : name_(name) { std::function above_overflow) -> Buffer::Instance* { return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); + ON_CALL(*this, isThreadSafe()).WillByDefault(Return(true)); } MockDispatcher::~MockDispatcher() = default; diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index 8e29e84c3b32..87627c6d5ce9 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -5,6 +5,7 @@ #include #include +#include "envoy/common/scope_tracker.h" #include "envoy/common/time.h" #include "envoy/event/deferred_deletable.h" #include "envoy/event/dispatcher.h" @@ -129,13 +130,16 @@ class MockDispatcher : public Dispatcher { MOCK_METHOD(void, exit, ()); MOCK_METHOD(SignalEvent*, listenForSignal_, (signal_t signal_num, SignalCb cb)); MOCK_METHOD(void, post, (std::function callback)); + MOCK_METHOD(void, deleteInDispatcherThread, (DispatcherThreadDeletableConstPtr deletable)); MOCK_METHOD(void, run, (RunType type)); - MOCK_METHOD(const ScopeTrackedObject*, setTrackedObject, (const ScopeTrackedObject* object)); + MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object)); + MOCK_METHOD(void, popTrackedObject, (const ScopeTrackedObject* expected_object)); MOCK_METHOD(bool, isThreadSafe, (), (const)); Buffer::WatermarkFactory& getWatermarkFactory() override { return buffer_factory_; } MOCK_METHOD(Thread::ThreadId, getCurrentThreadId, ()); MOCK_METHOD(MonotonicTime, approximateMonotonicTime, (), (const)); MOCK_METHOD(void, updateApproximateMonotonicTime, ()); + MOCK_METHOD(void, shutdown, ()); GlobalTimeSystem time_system_; std::list to_delete_; diff --git a/test/mocks/event/wrapped_dispatcher.h b/test/mocks/event/wrapped_dispatcher.h index 974d61b39be2..5036a5514351 100644 --- a/test/mocks/event/wrapped_dispatcher.h +++ b/test/mocks/event/wrapped_dispatcher.h @@ -94,11 +94,19 @@ class WrappedDispatcher : public Dispatcher { void post(std::function callback) override { impl_.post(std::move(callback)); } + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override { + impl_.deleteInDispatcherThread(std::move(deletable)); + } + void run(RunType type) override { impl_.run(type); } Buffer::WatermarkFactory& getWatermarkFactory() override { return impl_.getWatermarkFactory(); } - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { - return impl_.setTrackedObject(object); + void pushTrackedObject(const ScopeTrackedObject* object) override { + return impl_.pushTrackedObject(object); + } + + void popTrackedObject(const ScopeTrackedObject* expected_object) override { + return impl_.popTrackedObject(expected_object); } MonotonicTime approximateMonotonicTime() const override { @@ -109,6 +117,8 @@ class WrappedDispatcher : public Dispatcher { bool isThreadSafe() const override { return impl_.isThreadSafe(); } + void shutdown() override { impl_.shutdown(); } + protected: Dispatcher& impl_; }; diff --git a/test/mocks/router/router_filter_interface.cc b/test/mocks/router/router_filter_interface.cc index f1fb2ab0518c..9d175ee6cb7a 100644 --- a/test/mocks/router/router_filter_interface.cc +++ b/test/mocks/router/router_filter_interface.cc @@ -18,7 +18,8 @@ MockRouterFilterInterface::MockRouterFilterInterface() ON_CALL(*this, config()).WillByDefault(ReturnRef(config_)); ON_CALL(*this, cluster()).WillByDefault(Return(cluster_info_)); ON_CALL(*this, upstreamRequests()).WillByDefault(ReturnRef(requests_)); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); ON_CALL(*this, routeEntry()).WillByDefault(Return(&route_entry_)); ON_CALL(callbacks_, connection()).WillByDefault(Return(&client_connection_)); route_entry_.connect_config_.emplace(RouteEntry::ConnectConfig()); diff --git a/test/server/server_test.cc b/test/server/server_test.cc index 10cc3a019291..087f6d0b7848 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -1363,7 +1363,7 @@ TEST_P(ServerInstanceImplTest, WithUnknownBootstrapExtensions) { #ifndef WIN32 class SafeFatalAction : public Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { + void run(absl::Span /*tracked_objects*/) override { std::cerr << "Called SafeFatalAction" << std::endl; } @@ -1372,7 +1372,7 @@ class SafeFatalAction : public Configuration::FatalAction { class UnsafeFatalAction : public Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { + void run(absl::Span /*tracked_objects*/) override { std::cerr << "Called UnsafeFatalAction" << std::endl; }