diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index 43725cb30233..fc1c603b74ac 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -42,7 +42,7 @@ class GrpcMuxWatch { * Updates the set of resources that the watch is interested in. * @param resources set of resource names to watch for */ - virtual void update(const std::set& resources) PURE; + virtual void update(const absl::flat_hash_set& resources) PURE; }; using GrpcMuxWatchPtr = std::unique_ptr; @@ -100,13 +100,13 @@ class GrpcMux { * away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr. */ virtual GrpcMuxWatchPtr addWatch(const std::string& type_url, - const std::set& resources, + const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool use_namespace_matching) PURE; virtual void requestOnDemandUpdate(const std::string& type_url, - const std::set& for_update) PURE; + const absl::flat_hash_set& for_update) PURE; using TypeUrlMap = absl::flat_hash_map; static TypeUrlMap& typeUrlMap() { MUTABLE_CONSTRUCT_ON_FIRST_USE(TypeUrlMap, {}); } diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 19c8dd19f953..c8b0ae54c470 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -180,20 +180,20 @@ class Subscription { * to fetch throughout the lifetime of the Subscription object. * @param resources set of resource names to fetch. */ - virtual void start(const std::set& resource_names) PURE; + virtual void start(const absl::flat_hash_set& resource_names) PURE; /** * Update the resources to fetch. - * @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can - * be passed to std::set_difference, which must be given sorted collections. + * @param resources vector of resource names to fetch. */ - virtual void updateResourceInterest(const std::set& update_to_these_names) PURE; + virtual void + updateResourceInterest(const absl::flat_hash_set& update_to_these_names) PURE; /** * Creates a discovery request for resources. * @param add_these_names resource ids for inclusion in the discovery request. */ - virtual void requestOnDemandUpdate(const std::set& add_these_names) PURE; + virtual void requestOnDemandUpdate(const absl::flat_hash_set& add_these_names) PURE; }; using SubscriptionPtr = std::unique_ptr; diff --git a/source/common/common/utility.h b/source/common/common/utility.h index e4eba5b37117..c995fed2db38 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -752,4 +752,17 @@ class InlineString : public InlineStorage { char data_[]; }; +class SetUtil { +public: + // Use instead of std::set_difference for unordered absl::flat_hash_set containers. + template + static void setDifference(const absl::flat_hash_set& original_set, + const absl::flat_hash_set& remove_set, + absl::flat_hash_set& result_set) { + std::copy_if(original_set.begin(), original_set.end(), + std::inserter(result_set, result_set.begin()), + [&remove_set](const T& v) -> bool { return remove_set.count(v) == 0; }); + } +}; + } // namespace Envoy diff --git a/source/common/config/BUILD b/source/common/config/BUILD index 44aeac2c1cb8..1454159381ec 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -443,6 +443,7 @@ envoy_cc_library( "//source/common/common:assert_lib", "//source/common/common:cleanup_lib", "//source/common/common:minimal_logger_lib", + "//source/common/common:utility_lib", "//source/common/protobuf", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], diff --git a/source/common/config/delta_subscription_state.cc b/source/common/config/delta_subscription_state.cc index 7702081602a1..8821427c0c37 100644 --- a/source/common/config/delta_subscription_state.cc +++ b/source/common/config/delta_subscription_state.cc @@ -32,8 +32,9 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url, type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info), dispatcher_(dispatcher) {} -void DeltaSubscriptionState::updateSubscriptionInterest(const std::set& cur_added, - const std::set& cur_removed) { +void DeltaSubscriptionState::updateSubscriptionInterest( + const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) { for (const auto& a : cur_added) { setResourceWaitingForServer(a); // If interest in a resource is removed-then-added (all before a discovery request diff --git a/source/common/config/delta_subscription_state.h b/source/common/config/delta_subscription_state.h index 089b632c9a01..ac073d777705 100644 --- a/source/common/config/delta_subscription_state.h +++ b/source/common/config/delta_subscription_state.h @@ -29,9 +29,9 @@ class DeltaSubscriptionState : public Logger::Loggable { const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher); // Update which resources we're interested in subscribing to. - void updateSubscriptionInterest(const std::set& cur_added, - const std::set& cur_removed); - void addAliasesToResolve(const std::set& aliases); + void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed); + void addAliasesToResolve(const absl::flat_hash_set& aliases); // Whether there was a change in our subscription interest we have yet to inform the server of. bool subscriptionUpdatePending() const; @@ -98,8 +98,8 @@ class DeltaSubscriptionState : public Logger::Loggable { const bool supports_heartbeats_; TtlManager ttl_; // The keys of resource_versions_. Only tracked separately because std::map does not provide an - // iterator into just its keys, e.g. for use in std::set_difference. - std::set resource_names_; + // iterator into just its keys. + absl::flat_hash_set resource_names_; const std::string type_url_; UntypedConfigUpdateCallbacks& watch_map_; diff --git a/source/common/config/filesystem_subscription_impl.cc b/source/common/config/filesystem_subscription_impl.cc index 2e801b8609db..537fa8044260 100644 --- a/source/common/config/filesystem_subscription_impl.cc +++ b/source/common/config/filesystem_subscription_impl.cc @@ -27,13 +27,13 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl( } // Config::Subscription -void FilesystemSubscriptionImpl::start(const std::set&) { +void FilesystemSubscriptionImpl::start(const absl::flat_hash_set&) { started_ = true; // Attempt to read in case there is a file there already. refresh(); } -void FilesystemSubscriptionImpl::updateResourceInterest(const std::set&) { +void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set&) { // Bump stats for consistent behavior with other xDS. stats_.update_attempt_.inc(); } diff --git a/source/common/config/filesystem_subscription_impl.h b/source/common/config/filesystem_subscription_impl.h index cc66b54ad627..1c8d9d041ccf 100644 --- a/source/common/config/filesystem_subscription_impl.h +++ b/source/common/config/filesystem_subscription_impl.h @@ -27,9 +27,9 @@ class FilesystemSubscriptionImpl : public Config::Subscription, // Config::Subscription // We report all discovered resources in the watched file, so the resource names arguments are // unused, and updateResourceInterest is a no-op (other than updating a stat). - void start(const std::set&) override; - void updateResourceInterest(const std::set&) override; - void requestOnDemandUpdate(const std::set&) override { + void start(const absl::flat_hash_set&) override; + void updateResourceInterest(const absl::flat_hash_set&) override; + void requestOnDemandUpdate(const absl::flat_hash_set&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 2734867446fe..ffd1e31319ef 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -68,7 +68,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { } GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, - const std::set& resources, + const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool) { auto watch = diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index bd554891461d..4b46dbb67317 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -47,12 +47,13 @@ class GrpcMuxImpl : public GrpcMux, ScopedResume pause(const std::string& type_url) override; ScopedResume pause(const std::vector type_urls) override; - GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set& resources, + GrpcMuxWatchPtr addWatch(const std::string& type_url, + const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool use_namespace_matching = false) override; - void requestOnDemandUpdate(const std::string&, const std::set&) override { + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } @@ -80,11 +81,12 @@ class GrpcMuxImpl : public GrpcMux, void sendDiscoveryRequest(const std::string& type_url); struct GrpcMuxWatchImpl : public GrpcMuxWatch { - GrpcMuxWatchImpl(const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder, const std::string& type_url, - GrpcMuxImpl& parent) - : resources_(resources), callbacks_(callbacks), resource_decoder_(resource_decoder), - type_url_(type_url), parent_(parent), watches_(parent.apiStateFor(type_url).watches_) { + GrpcMuxWatchImpl(const absl::flat_hash_set& resources, + SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, + const std::string& type_url, GrpcMuxImpl& parent) + : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url), + parent_(parent), watches_(parent.apiStateFor(type_url).watches_) { + std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin())); watches_.emplace(watches_.begin(), this); } @@ -95,17 +97,19 @@ class GrpcMuxImpl : public GrpcMux, } } - void update(const std::set& resources) override { + void update(const absl::flat_hash_set& resources) override { watches_.remove(this); if (!resources_.empty()) { parent_.queueDiscoveryRequest(type_url_); } - resources_ = resources; + resources_.clear(); + std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin())); // move this watch to the beginning of the list watches_.emplace(watches_.begin(), this); parent_.queueDiscoveryRequest(type_url_); } + // Maintain deterministic wire ordering via ordered std::set. std::set resources_; SubscriptionCallbacks& callbacks_; OpaqueResourceDecoder& resource_decoder_; @@ -184,12 +188,12 @@ class NullGrpcMuxImpl : public GrpcMux, return std::make_unique([] {}); } - GrpcMuxWatchPtr addWatch(const std::string&, const std::set&, SubscriptionCallbacks&, - OpaqueResourceDecoder&, const bool) override { + GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set&, + SubscriptionCallbacks&, OpaqueResourceDecoder&, const bool) override { ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source"); } - void requestOnDemandUpdate(const std::string&, const std::set&) override { + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/config/grpc_subscription_impl.cc b/source/common/config/grpc_subscription_impl.cc index 87ed20fb4e45..263e1ca92b2d 100644 --- a/source/common/config/grpc_subscription_impl.cc +++ b/source/common/config/grpc_subscription_impl.cc @@ -25,7 +25,7 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux, use_namespace_matching_(use_namespace_matching) {} // Config::Subscription -void GrpcSubscriptionImpl::start(const std::set& resources) { +void GrpcSubscriptionImpl::start(const absl::flat_hash_set& resources) { if (init_fetch_timeout_.count() > 0) { init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, @@ -50,12 +50,13 @@ void GrpcSubscriptionImpl::start(const std::set& resources) { } void GrpcSubscriptionImpl::updateResourceInterest( - const std::set& update_to_these_names) { + const absl::flat_hash_set& update_to_these_names) { watch_->update(update_to_these_names); stats_.update_attempt_.inc(); } -void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set& for_update) { +void GrpcSubscriptionImpl::requestOnDemandUpdate( + const absl::flat_hash_set& for_update) { grpc_mux_->requestOnDemandUpdate(type_url_, for_update); stats_.update_attempt_.inc(); } @@ -137,7 +138,7 @@ GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl( init_fetch_timeout, is_aggregated, false), collection_locator_(collection_locator) {} -void GrpcCollectionSubscriptionImpl::start(const std::set& resource_names) { +void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set& resource_names) { ASSERT(resource_names.empty()); GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)}); } diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index 1c55177623e7..b595e191d914 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -27,9 +27,10 @@ class GrpcSubscriptionImpl : public Subscription, bool use_namespace_matching); // Config::Subscription - void start(const std::set& resource_names) override; - void updateResourceInterest(const std::set& update_to_these_names) override; - void requestOnDemandUpdate(const std::set& add_these_names) override; + void start(const absl::flat_hash_set& resource_names) override; + void + updateResourceInterest(const absl::flat_hash_set& update_to_these_names) override; + void requestOnDemandUpdate(const absl::flat_hash_set& add_these_names) override; // Config::SubscriptionCallbacks (all pass through to callbacks_!) void onConfigUpdate(const std::vector& resources, const std::string& version_info) override; @@ -71,7 +72,7 @@ class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl { Event::Dispatcher& dispatcher, std::chrono::milliseconds init_fetch_timeout, bool is_aggregated); - void start(const std::set& resource_names) override; + void start(const absl::flat_hash_set& resource_names) override; private: xds::core::v3::ResourceLocator collection_locator_; diff --git a/source/common/config/http_subscription_impl.cc b/source/common/config/http_subscription_impl.cc index 5e25aff9e935..054b4cc49946 100644 --- a/source/common/config/http_subscription_impl.cc +++ b/source/common/config/http_subscription_impl.cc @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl( } // Config::Subscription -void HttpSubscriptionImpl::start(const std::set& resource_names) { +void HttpSubscriptionImpl::start(const absl::flat_hash_set& resource_names) { if (init_fetch_timeout_.count() > 0) { init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr); @@ -53,14 +53,18 @@ void HttpSubscriptionImpl::start(const std::set& resource_names) { Protobuf::RepeatedPtrField resources_vector(resource_names.begin(), resource_names.end()); + // Sort to provide stable wire ordering. + std::sort(resources_vector.begin(), resources_vector.end()); request_.mutable_resource_names()->Swap(&resources_vector); initialize(); } void HttpSubscriptionImpl::updateResourceInterest( - const std::set& update_to_these_names) { + const absl::flat_hash_set& update_to_these_names) { Protobuf::RepeatedPtrField resources_vector(update_to_these_names.begin(), update_to_these_names.end()); + // Sort to provide stable wire ordering. + std::sort(resources_vector.begin(), resources_vector.end()); request_.mutable_resource_names()->Swap(&resources_vector); } diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index 9303b213ad9f..33263e0ef3eb 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -34,9 +34,10 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, ProtobufMessage::ValidationVisitor& validation_visitor); // Config::Subscription - void start(const std::set& resource_names) override; - void updateResourceInterest(const std::set& update_to_these_names) override; - void requestOnDemandUpdate(const std::set&) override { + void start(const absl::flat_hash_set& resource_names) override; + void + updateResourceInterest(const absl::flat_hash_set& update_to_these_names) override; + void requestOnDemandUpdate(const absl::flat_hash_set&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 4a53b329e36a..0ace0efe045d 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -130,7 +130,7 @@ void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) { void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); } GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, - const std::set& resources, + const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool use_namespace_matching) { @@ -154,14 +154,14 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch, - const std::set& resources, + const absl::flat_hash_set& resources, const bool creating_namespace_watch) { ASSERT(watch != nullptr); auto sub = subscriptions_.find(type_url); RELEASE_ASSERT(sub != subscriptions_.end(), fmt::format("Watch of {} has no subscription to update.", type_url)); // If this is a glob collection subscription, we need to compute actual context parameters. - std::set xdstp_resources; + absl::flat_hash_set xdstp_resources; // TODO(htuch): add support for resources beyond glob collections, the constraints below around // resource size and ID reflect the progress of the xdstp:// implementation. if (!resources.empty() && XdsResourceIdentifier::hasXdsTpScheme(*resources.begin())) { @@ -198,7 +198,7 @@ void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch, } void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url, - const std::set& for_update) { + const absl::flat_hash_set& for_update) { auto sub = subscriptions_.find(type_url); RELEASE_ASSERT(sub != subscriptions_.end(), fmt::format("Watch of {} has no subscription to update.", type_url)); diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index 98ce6002489b..392be536fe2c 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -38,13 +38,14 @@ class NewGrpcMuxImpl const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info); - GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set& resources, + GrpcMuxWatchPtr addWatch(const std::string& type_url, + const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool use_namespace_matching = false) override; void requestOnDemandUpdate(const std::string& type_url, - const std::set& for_update) override; + const absl::flat_hash_set& for_update) override; ScopedResume pause(const std::string& type_url) override; ScopedResume pause(const std::vector type_urls) override; @@ -101,7 +102,7 @@ class NewGrpcMuxImpl } } - void update(const std::set& resources) override { + void update(const absl::flat_hash_set& resources) override { parent_.updateWatch(type_url_, watch_, resources); } @@ -117,7 +118,8 @@ class NewGrpcMuxImpl // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. void updateWatch(const std::string& type_url, Watch* watch, - const std::set& resources, bool creating_namespace_watch = false); + const absl::flat_hash_set& resources, + bool creating_namespace_watch = false); void addSubscription(const std::string& type_url, const bool use_namespace_matching); diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index 2bba3c07173b..78934f23fec2 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -3,6 +3,7 @@ #include "envoy/service/discovery/v3/discovery.pb.h" #include "common/common/cleanup.h" +#include "common/common/utility.h" #include "common/config/decoded_resource_impl.h" #include "common/config/xds_resource.h" @@ -61,23 +62,20 @@ void WatchMap::removeDeferredWatches() { deferred_removed_during_update_ = nullptr; } -AddedRemoved WatchMap::updateWatchInterest(Watch* watch, - const std::set& update_to_these_names) { +AddedRemoved +WatchMap::updateWatchInterest(Watch* watch, + const absl::flat_hash_set& update_to_these_names) { if (update_to_these_names.empty()) { wildcard_watches_.insert(watch); } else { wildcard_watches_.erase(watch); } - std::vector newly_added_to_watch; - std::set_difference(update_to_these_names.begin(), update_to_these_names.end(), - watch->resource_names_.begin(), watch->resource_names_.end(), - std::inserter(newly_added_to_watch, newly_added_to_watch.begin())); + absl::flat_hash_set newly_added_to_watch; + SetUtil::setDifference(update_to_these_names, watch->resource_names_, newly_added_to_watch); - std::vector newly_removed_from_watch; - std::set_difference(watch->resource_names_.begin(), watch->resource_names_.end(), - update_to_these_names.begin(), update_to_these_names.end(), - std::inserter(newly_removed_from_watch, newly_removed_from_watch.begin())); + absl::flat_hash_set newly_removed_from_watch; + SetUtil::setDifference(watch->resource_names_, update_to_these_names, newly_removed_from_watch); watch->resource_names_ = update_to_these_names; @@ -226,9 +224,10 @@ void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const Envo } } -std::set WatchMap::findAdditions(const std::vector& newly_added_to_watch, - Watch* watch) { - std::set newly_added_to_subscription; +absl::flat_hash_set +WatchMap::findAdditions(const absl::flat_hash_set& newly_added_to_watch, + Watch* watch) { + absl::flat_hash_set newly_added_to_subscription; for (const auto& name : newly_added_to_watch) { auto entry = watch_interest_.find(name); if (entry == watch_interest_.end()) { @@ -242,9 +241,10 @@ std::set WatchMap::findAdditions(const std::vector& ne return newly_added_to_subscription; } -std::set -WatchMap::findRemovals(const std::vector& newly_removed_from_watch, Watch* watch) { - std::set newly_removed_from_subscription; +absl::flat_hash_set +WatchMap::findRemovals(const absl::flat_hash_set& newly_removed_from_watch, + Watch* watch) { + absl::flat_hash_set newly_removed_from_subscription; for (const auto& name : newly_removed_from_watch) { auto entry = watch_interest_.find(name); RELEASE_ASSERT( diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h index d0d9e822b28e..768117e93be6 100644 --- a/source/common/config/watch_map.h +++ b/source/common/config/watch_map.h @@ -17,10 +17,10 @@ namespace Envoy { namespace Config { struct AddedRemoved { - AddedRemoved(std::set&& added, std::set&& removed) + AddedRemoved(absl::flat_hash_set&& added, absl::flat_hash_set&& removed) : added_(std::move(added)), removed_(std::move(removed)) {} - std::set added_; - std::set removed_; + absl::flat_hash_set added_; + absl::flat_hash_set removed_; }; struct Watch { @@ -28,7 +28,7 @@ struct Watch { : callbacks_(callbacks), resource_decoder_(resource_decoder) {} SubscriptionCallbacks& callbacks_; OpaqueResourceDecoder& resource_decoder_; - std::set resource_names_; // must be sorted set, for set_difference. + absl::flat_hash_set resource_names_; // Needed only for state-of-the-world. // Whether the most recent update contained any resources this watch cares about. // If true, a new update that also contains no resources can skip this watch. @@ -73,7 +73,7 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable& update_to_these_names); + const absl::flat_hash_set& update_to_these_names); // Expects that the watch to be removed has already had all of its resource names removed via // updateWatchInterest(). @@ -96,13 +96,13 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable findAdditions(const std::vector& newly_added_to_watch, - Watch* watch); + absl::flat_hash_set + findAdditions(const absl::flat_hash_set& newly_added_to_watch, Watch* watch); // Given a list of names that an individual watch no longer cares about, returns those names that // in fact the entire subscription no longer cares about. - std::set findRemovals(const std::vector& newly_removed_from_watch, - Watch* watch); + absl::flat_hash_set + findRemovals(const absl::flat_hash_set& newly_removed_from_watch, Watch* watch); // Returns the union of watch_interest_[resource_name] and wildcard_watches_. absl::flat_hash_set watchesInterestedIn(const std::string& resource_name); diff --git a/test/common/common/utility_test.cc b/test/common/common/utility_test.cc index 6f4f8a2a628b..48b82909bfce 100644 --- a/test/common/common/utility_test.cc +++ b/test/common/common/utility_test.cc @@ -20,6 +20,8 @@ #include "gtest/gtest.h" using testing::ContainerEq; +using testing::ElementsAre; +using testing::WhenSorted; #ifdef WIN32 using testing::HasSubstr; using testing::Not; @@ -977,4 +979,27 @@ TEST(ErrorDetailsTest, WindowsFormatMessage) { } #endif +TEST(SetUtil, All) { + { + absl::flat_hash_set result; + SetUtil::setDifference({1, 2, 3}, {1, 3}, result); + EXPECT_THAT(result, WhenSorted(ElementsAre(2))); + } + { + absl::flat_hash_set result; + SetUtil::setDifference({1, 2, 3}, {4, 5}, result); + EXPECT_THAT(result, WhenSorted(ElementsAre(1, 2, 3))); + } + { + absl::flat_hash_set result; + SetUtil::setDifference({}, {4, 5}, result); + EXPECT_THAT(result, WhenSorted(ElementsAre())); + } + { + absl::flat_hash_set result; + SetUtil::setDifference({1, 2, 3}, {}, result); + EXPECT_THAT(result, WhenSorted(ElementsAre(1, 2, 3))); + } +} + } // namespace Envoy diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 757620e15ac5..56b8e4b5ac98 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -229,6 +229,7 @@ envoy_cc_test_library( hdrs = ["delta_subscription_test_harness.h"], deps = [ ":subscription_test_harness", + "//source/common/common:utility_lib", "//source/common/config:new_grpc_mux_lib", "//source/common/config:version_converter_lib", "//source/common/grpc:common_lib", diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h index c4179e43420c..2e9766c1c196 100644 --- a/test/common/config/delta_subscription_test_harness.h +++ b/test/common/config/delta_subscription_test_harness.h @@ -78,7 +78,7 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { subscription_started_ = true; last_cluster_names_ = cluster_names; expectSendMessage(last_cluster_names_, ""); - subscription_->start(cluster_names); + subscription_->start(flattenResources(cluster_names)); } void expectSendMessage(const std::set& cluster_names, const std::string& version, @@ -173,7 +173,7 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { std::inserter(unsub, unsub.begin())); expectSendMessage(sub, unsub, Grpc::Status::WellKnownGrpcStatus::Ok, "", {}); - subscription_->updateResourceInterest(cluster_names); + subscription_->updateResourceInterest(flattenResources(cluster_names)); last_cluster_names_ = cluster_names; } diff --git a/test/common/config/filesystem_subscription_test_harness.h b/test/common/config/filesystem_subscription_test_harness.h index d19da3bbed6f..9c372725207e 100644 --- a/test/common/config/filesystem_subscription_test_harness.h +++ b/test/common/config/filesystem_subscription_test_harness.h @@ -54,11 +54,11 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { void startSubscription(const std::set& cluster_names) override { std::ifstream config_file(path_); file_at_start_ = config_file.good(); - subscription_.start(cluster_names); + subscription_.start(flattenResources(cluster_names)); } void updateResourceInterest(const std::set& cluster_names) override { - subscription_.updateResourceInterest(cluster_names); + subscription_.updateResourceInterest(flattenResources(cluster_names)); } void updateFile(const std::string& json, bool run_dispatcher = true) { diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index f0cc781000bf..b500c8aaf1c1 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -98,7 +98,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); last_cluster_names_ = cluster_names; expectSendMessage(last_cluster_names_, "", true); - subscription_->start(cluster_names); + subscription_->start(flattenResources(cluster_names)); } void deliverConfigUpdate(const std::vector& cluster_names, @@ -154,7 +154,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { } expectSendMessage(both, version_); expectSendMessage(cluster_names, version_); - subscription_->updateResourceInterest(cluster_names); + subscription_->updateResourceInterest(flattenResources(cluster_names)); last_cluster_names_ = cluster_names; } diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 91ab6905a8b8..cdb0266c5a92 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -111,13 +111,13 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { version_ = ""; cluster_names_ = cluster_names; expectSendMessage(cluster_names, ""); - subscription_->start(cluster_names); + subscription_->start(flattenResources(cluster_names)); } void updateResourceInterest(const std::set& cluster_names) override { cluster_names_ = cluster_names; expectSendMessage(cluster_names, version_); - subscription_->updateResourceInterest(cluster_names); + subscription_->updateResourceInterest(flattenResources(cluster_names)); timer_cb_(); } diff --git a/test/common/config/subscription_test_harness.h b/test/common/config/subscription_test_harness.h index 57342a11af92..5b0fe24a49d2 100644 --- a/test/common/config/subscription_test_harness.h +++ b/test/common/config/subscription_test_harness.h @@ -109,6 +109,14 @@ class SubscriptionTestHarness : public Event::TestUsingSimulatedTime { virtual void doSubscriptionTearDown() {} + // Helper util to convert to absl::flat_hash_set when calling Subscription interface methods. + absl::flat_hash_set flattenResources(const std::set& resources) { + absl::flat_hash_set flat_resources; + std::copy(resources.begin(), resources.end(), + std::inserter(flat_resources, flat_resources.begin())); + return flat_resources; + } + Stats::TestUtil::TestStore stats_store_; SubscriptionStats stats_; ControlPlaneStats control_plane_stats_; diff --git a/test/common/config/watch_map_test.cc b/test/common/config/watch_map_test.cc index b6b7e22a974c..34ae11c16e3c 100644 --- a/test/common/config/watch_map_test.cc +++ b/test/common/config/watch_map_test.cc @@ -136,7 +136,7 @@ TEST(WatchMapTest, Basic) { } { // The watch is interested in Alice and Bob... - std::set update_to({"alice", "bob"}); + absl::flat_hash_set update_to({"alice", "bob"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch, update_to); EXPECT_EQ(update_to, added_removed.added_); EXPECT_TRUE(added_removed.removed_.empty()); @@ -159,10 +159,10 @@ TEST(WatchMapTest, Basic) { } { // The watch is now interested in Bob, Carol, Dave, Eve... - std::set update_to({"bob", "carol", "dave", "eve"}); + absl::flat_hash_set update_to({"bob", "carol", "dave", "eve"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch, update_to); - EXPECT_EQ(std::set({"carol", "dave", "eve"}), added_removed.added_); - EXPECT_EQ(std::set({"alice"}), added_removed.removed_); + EXPECT_EQ(absl::flat_hash_set({"carol", "dave", "eve"}), added_removed.added_); + EXPECT_EQ(absl::flat_hash_set({"alice"}), added_removed.removed_); // ...the update is going to contain Alice, Carol, Dave... Protobuf::RepeatedPtrField updated_resources; @@ -209,7 +209,7 @@ TEST(WatchMapTest, Overlap) { // First watch becomes interested. { - std::set update_to({"alice", "dummy"}); + absl::flat_hash_set update_to({"alice", "dummy"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); EXPECT_EQ(update_to, added_removed.added_); // add to subscription EXPECT_TRUE(added_removed.removed_.empty()); @@ -222,7 +222,7 @@ TEST(WatchMapTest, Overlap) { } // Second watch becomes interested. { - std::set update_to({"alice", "dummy"}); + absl::flat_hash_set update_to({"alice", "dummy"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); EXPECT_TRUE(added_removed.added_.empty()); // nothing happens EXPECT_TRUE(added_removed.removed_.empty()); @@ -251,7 +251,8 @@ TEST(WatchMapTest, Overlap) { { AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, {"dummy"}); EXPECT_TRUE(added_removed.added_.empty()); - EXPECT_EQ(std::set({"alice"}), added_removed.removed_); // remove from subscription + EXPECT_EQ(absl::flat_hash_set({"alice"}), + added_removed.removed_); // remove from subscription } } @@ -348,7 +349,7 @@ TEST(WatchMapTest, AddRemoveAdd) { // First watch becomes interested. { - std::set update_to({"alice", "dummy"}); + absl::flat_hash_set update_to({"alice", "dummy"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, update_to); EXPECT_EQ(update_to, added_removed.added_); // add to subscription EXPECT_TRUE(added_removed.removed_.empty()); @@ -363,7 +364,7 @@ TEST(WatchMapTest, AddRemoveAdd) { { AddedRemoved added_removed = watch_map.updateWatchInterest(watch1, {"dummy"}); EXPECT_TRUE(added_removed.added_.empty()); - EXPECT_EQ(std::set({"alice"}), + EXPECT_EQ(absl::flat_hash_set({"alice"}), added_removed.removed_); // remove from subscription // (The xDS client should have responded to updateWatchInterest()'s return value by removing @@ -371,9 +372,10 @@ TEST(WatchMapTest, AddRemoveAdd) { } // Second watch becomes interested. { - std::set update_to({"alice", "dummy"}); + absl::flat_hash_set update_to({"alice", "dummy"}); AddedRemoved added_removed = watch_map.updateWatchInterest(watch2, update_to); - EXPECT_EQ(std::set({"alice"}), added_removed.added_); // add to subscription + EXPECT_EQ(absl::flat_hash_set({"alice"}), + added_removed.added_); // add to subscription EXPECT_TRUE(added_removed.removed_.empty()); // Both watches receive the update. For watch2, this is obviously desired. diff --git a/test/common/router/scoped_rds_test.cc b/test/common/router/scoped_rds_test.cc index 4d0a9e4a85c9..7ef4868b25af 100644 --- a/test/common/router/scoped_rds_test.cc +++ b/test/common/router/scoped_rds_test.cc @@ -134,22 +134,22 @@ class ScopedRdsTest : public ScopedRoutesTestBase { API_NO_BOOST(envoy::api::v2::RouteConfiguration)().GetDescriptor()->full_name())), _, _, _, _)) .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](const envoy::config::core::v3::ConfigSource&, - absl::string_view, Stats::Scope&, - Envoy::Config::SubscriptionCallbacks& callbacks, - Envoy::Config::OpaqueResourceDecoder&, bool) { - auto ret = std::make_unique>(); - rds_subscription_by_config_subscription_[ret.get()] = &callbacks; - EXPECT_CALL(*ret, start(_)) - .WillOnce(Invoke( - [this, config_sub_addr = ret.get()](const std::set& resource_names) { + .WillRepeatedly( + Invoke([this](const envoy::config::core::v3::ConfigSource&, absl::string_view, + Stats::Scope&, Envoy::Config::SubscriptionCallbacks& callbacks, + Envoy::Config::OpaqueResourceDecoder&, bool) { + auto ret = std::make_unique>(); + rds_subscription_by_config_subscription_[ret.get()] = &callbacks; + EXPECT_CALL(*ret, start(_)) + .WillOnce(Invoke([this, config_sub_addr = ret.get()]( + const absl::flat_hash_set& resource_names) { EXPECT_EQ(resource_names.size(), 1); auto iter = rds_subscription_by_config_subscription_.find(config_sub_addr); EXPECT_NE(iter, rds_subscription_by_config_subscription_.end()); rds_subscription_by_name_[*resource_names.begin()] = iter->second; })); - return ret; - })); + return ret; + })); ON_CALL(context_init_manager_, add(_)).WillByDefault(Invoke([this](const Init::Target& target) { target_handles_.push_back(target.createHandle("test")); diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index cc4c36df2b7c..80e2eed8b505 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -62,9 +62,11 @@ class MockUntypedConfigUpdateCallbacks : public UntypedConfigUpdateCallbacks { class MockSubscription : public Subscription { public: - MOCK_METHOD(void, start, (const std::set& resources)); - MOCK_METHOD(void, updateResourceInterest, (const std::set& update_to_these_names)); - MOCK_METHOD(void, requestOnDemandUpdate, (const std::set& add_these_names)); + MOCK_METHOD(void, start, (const absl::flat_hash_set& resources)); + MOCK_METHOD(void, updateResourceInterest, + (const absl::flat_hash_set& update_to_these_names)); + MOCK_METHOD(void, requestOnDemandUpdate, + (const absl::flat_hash_set& add_these_names)); }; class MockSubscriptionFactory : public SubscriptionFactory { @@ -105,19 +107,20 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD(ScopedResume, pause, (const std::vector type_urls), (override)); MOCK_METHOD(void, addSubscription, - (const std::set& resources, const std::string& type_url, + (const absl::flat_hash_set& resources, const std::string& type_url, SubscriptionCallbacks& callbacks, SubscriptionStats& stats, std::chrono::milliseconds init_fetch_timeout)); MOCK_METHOD(void, updateResourceInterest, - (const std::set& resources, const std::string& type_url)); + (const absl::flat_hash_set& resources, const std::string& type_url)); MOCK_METHOD(GrpcMuxWatchPtr, addWatch, - (const std::string& type_url, const std::set& resources, + (const std::string& type_url, const absl::flat_hash_set& resources, SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, const bool use_prefix_matching)); MOCK_METHOD(void, requestOnDemandUpdate, - (const std::string& type_url, const std::set& add_these_names)); + (const std::string& type_url, + const absl::flat_hash_set& add_these_names)); }; class MockGrpcStreamCallbacks