Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: switch from std::set to absl::flat_hash_set for resource names. #14739

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GrpcMuxWatch {
* Updates the set of resources that the watch is interested in.
* @param resources set of resource names to watch for
*/
virtual void update(const std::set<std::string>& resources) PURE;
virtual void update(const absl::flat_hash_set<std::string>& resources) PURE;
};

using GrpcMuxWatchPtr = std::unique_ptr<GrpcMuxWatch>;
Expand Down Expand Up @@ -100,13 +100,13 @@ class GrpcMux {
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) PURE;

virtual void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) PURE;
const absl::flat_hash_set<std::string>& for_update) PURE;

using TypeUrlMap = absl::flat_hash_map<std::string, std::string>;
static TypeUrlMap& typeUrlMap() { MUTABLE_CONSTRUCT_ON_FIRST_USE(TypeUrlMap, {}); }
Expand Down
10 changes: 5 additions & 5 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,20 @@ class Subscription {
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
*/
virtual void start(const std::set<std::string>& resource_names) PURE;
virtual void start(const absl::flat_hash_set<std::string>& resource_names) PURE;

/**
* Update the resources to fetch.
* @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can
* be passed to std::set_difference, which must be given sorted collections.
* @param resources vector of resource names to fetch.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;
virtual void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) PURE;

/**
* Creates a discovery request for resources.
* @param add_these_names resource ids for inclusion in the discovery request.
*/
virtual void requestOnDemandUpdate(const std::set<std::string>& add_these_names) PURE;
virtual void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
13 changes: 13 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,4 +752,17 @@ class InlineString : public InlineStorage {
char data_[];
};

class SetUtil {
public:
// Use instead of std::set_difference for unordered absl::flat_hash_set containers.
template <typename T>
static void setDifference(const absl::flat_hash_set<T>& original_set,
const absl::flat_hash_set<T>& remove_set,
absl::flat_hash_set<T>& result_set) {
std::copy_if(original_set.begin(), original_set.end(),
std::inserter(result_set, result_set.begin()),
[&remove_set](const T& v) -> bool { return remove_set.count(v) == 0; });
}
};

} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
dispatcher_(dispatcher) {}

void DeltaSubscriptionState::updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed) {
void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) {
for (const auto& a : cur_added) {
setResourceWaitingForServer(a);
// If interest in a resource is removed-then-added (all before a discovery request
Expand Down
10 changes: 5 additions & 5 deletions source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher);

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed);
void addAliasesToResolve(const std::set<std::string>& aliases);
void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed);
void addAliasesToResolve(const absl::flat_hash_set<std::string>& aliases);

// Whether there was a change in our subscription interest we have yet to inform the server of.
bool subscriptionUpdatePending() const;
Expand Down Expand Up @@ -98,8 +98,8 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
const bool supports_heartbeats_;
TtlManager ttl_;
// The keys of resource_versions_. Only tracked separately because std::map does not provide an
// iterator into just its keys, e.g. for use in std::set_difference.
std::set<std::string> resource_names_;
// iterator into just its keys.
absl::flat_hash_set<std::string> resource_names_;

const std::string type_url_;
UntypedConfigUpdateCallbacks& watch_map_;
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::start(const absl::flat_hash_set<std::string>&) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
}

void FilesystemSubscriptionImpl::updateResourceInterest(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set<std::string>&) {
// Bump stats for consistent behavior with other xDS.
stats_.update_attempt_.inc();
}
Expand Down
6 changes: 3 additions & 3 deletions source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
void start(const absl::flat_hash_set<std::string>&) override;
void updateResourceInterest(const absl::flat_hash_set<std::string>&) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}

GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, const bool) {
auto watch =
Expand Down
28 changes: 16 additions & 12 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ class GrpcMuxImpl : public GrpcMux,
ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
GrpcMuxWatchPtr addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down Expand Up @@ -80,11 +81,12 @@ class GrpcMuxImpl : public GrpcMux,
void sendDiscoveryRequest(const std::string& type_url);

struct GrpcMuxWatchImpl : public GrpcMuxWatch {
GrpcMuxWatchImpl(const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, const std::string& type_url,
GrpcMuxImpl& parent)
: resources_(resources), callbacks_(callbacks), resource_decoder_(resource_decoder),
type_url_(type_url), parent_(parent), watches_(parent.apiStateFor(type_url).watches_) {
GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder,
const std::string& type_url, GrpcMuxImpl& parent)
: callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
parent_(parent), watches_(parent.apiStateFor(type_url).watches_) {
std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()));
watches_.emplace(watches_.begin(), this);
}

Expand All @@ -95,17 +97,19 @@ class GrpcMuxImpl : public GrpcMux,
}
}

void update(const std::set<std::string>& resources) override {
void update(const absl::flat_hash_set<std::string>& resources) override {
watches_.remove(this);
if (!resources_.empty()) {
parent_.queueDiscoveryRequest(type_url_);
}
resources_ = resources;
resources_.clear();
std::copy(resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()));
// move this watch to the beginning of the list
watches_.emplace(watches_.begin(), this);
parent_.queueDiscoveryRequest(type_url_);
}

// Maintain deterministic wire ordering via ordered std::set.
std::set<std::string> resources_;
SubscriptionCallbacks& callbacks_;
OpaqueResourceDecoder& resource_decoder_;
Expand Down Expand Up @@ -184,12 +188,12 @@ class NullGrpcMuxImpl : public GrpcMux,
return std::make_unique<Cleanup>([] {});
}

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&, SubscriptionCallbacks&,
OpaqueResourceDecoder&, const bool) override {
GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, const bool) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
9 changes: 5 additions & 4 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
use_namespace_matching_(use_namespace_matching) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -50,12 +50,13 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
}

void GrpcSubscriptionImpl::updateResourceInterest(
const std::set<std::string>& update_to_these_names) {
const absl::flat_hash_set<std::string>& update_to_these_names) {
watch_->update(update_to_these_names);
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set<std::string>& for_update) {
void GrpcSubscriptionImpl::requestOnDemandUpdate(
const absl::flat_hash_set<std::string>& for_update) {
grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
stats_.update_attempt_.inc();
}
Expand Down Expand Up @@ -137,7 +138,7 @@ GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
init_fetch_timeout, is_aggregated, false),
collection_locator_(collection_locator) {}

void GrpcCollectionSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
ASSERT(resource_names.empty());
GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
}
Expand Down
9 changes: 5 additions & 4 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class GrpcSubscriptionImpl : public Subscription,
bool use_namespace_matching);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
void start(const absl::flat_hash_set<std::string>& resource_names) override;
void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
Expand Down Expand Up @@ -71,7 +72,7 @@ class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

void start(const std::set<std::string>& resource_names) override;
void start(const absl::flat_hash_set<std::string>& resource_names) override;

private:
xds::core::v3::ResourceLocator collection_locator_;
Expand Down
8 changes: 6 additions & 2 deletions source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void HttpSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand All @@ -53,14 +53,18 @@ void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {

Protobuf::RepeatedPtrField<std::string> resources_vector(resource_names.begin(),
resource_names.end());
// Sort to provide stable wire ordering.
std::sort(resources_vector.begin(), resources_vector.end());
request_.mutable_resource_names()->Swap(&resources_vector);
initialize();
}

void HttpSubscriptionImpl::updateResourceInterest(
const std::set<std::string>& update_to_these_names) {
const absl::flat_hash_set<std::string>& update_to_these_names) {
Protobuf::RepeatedPtrField<std::string> resources_vector(update_to_these_names.begin(),
update_to_these_names.end());
// Sort to provide stable wire ordering.
std::sort(resources_vector.begin(), resources_vector.end());
request_.mutable_resource_names()->Swap(&resources_vector);
}

Expand Down
7 changes: 4 additions & 3 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
void start(const absl::flat_hash_set<std::string>& resource_names) override;
void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Expand Down
8 changes: 4 additions & 4 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }

GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) {
Expand All @@ -154,14 +154,14 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
const absl::flat_hash_set<std::string>& resources,
const bool creating_namespace_watch) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
// If this is a glob collection subscription, we need to compute actual context parameters.
std::set<std::string> xdstp_resources;
absl::flat_hash_set<std::string> xdstp_resources;
// TODO(htuch): add support for resources beyond glob collections, the constraints below around
// resource size and ID reflect the progress of the xdstp:// implementation.
if (!resources.empty() && XdsResourceIdentifier::hasXdsTpScheme(*resources.begin())) {
Expand Down Expand Up @@ -198,7 +198,7 @@ void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
}

void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) {
const absl::flat_hash_set<std::string>& for_update) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
Expand Down
Loading