Add option for merging cluster updates (#3941)
This change introduces a new configuration parameter for clusters,
`update_merge_window`.

If this is set, all updates to a cluster's endpoints that happen within an
`update_merge_window` duration — health check state changes, weight changes,
and metadata changes — will be merged and delivered at once when the
duration ends. Membership changes (host adds/removes) are still delivered
immediately, since merging them is not currently safe.

This is useful for big clusters (> 1k endpoints) using the subset LB.

Partially addresses #3929.

Signed-off-by: Raul Gutierrez Segales <rgs@pinterest.com>
rgs1 authored and mattklein123 committed Jul 31, 2018
1 parent eb5ea98 commit 62441f9
Showing 6 changed files with 434 additions and 11 deletions.
11 changes: 11 additions & 0 deletions api/envoy/api/v2/cds.proto
@@ -437,6 +437,17 @@ message Cluster {
ZoneAwareLbConfig zone_aware_lb_config = 2;
LocalityWeightedLbConfig locality_weighted_lb_config = 3;
}
// If set, all health check/weight/metadata updates that happen within this duration will be
// merged and delivered in one shot when the duration expires. The start of the duration is when
// the first update happens. This is useful for big clusters, with potentially noisy deploys
// that might trigger excessive CPU usage due to a constant stream of healthcheck state changes
// or metadata updates. By default, this is not configured and updates apply immediately. Also,
// the first set of updates to be seen apply immediately as well (e.g.: a new cluster).
//
// Note: merging does not apply to cluster membership changes (e.g.: adds/removes); this is
// because merging those updates isn't currently safe. See
// https://github.com/envoyproxy/envoy/pull/3941.
google.protobuf.Duration update_merge_window = 4;
}

// Common configuration for all load balancer implementations.
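As the rst reference in the version history notes, the new field lives in `Cluster.CommonLbConfig`. A minimal, illustrative cluster config fragment (hypothetical cluster name and window value, not part of this patch) might look like:

```yaml
clusters:
- name: big_subset_lb_cluster   # hypothetical name
  connect_timeout: 0.25s
  type: EDS
  lb_policy: ROUND_ROBIN
  common_lb_config:
    # Merge health check/weight/metadata updates that arrive within 3s.
    update_merge_window: 3s
```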
4 changes: 4 additions & 0 deletions docs/root/configuration/cluster_manager/cluster_stats.rst
@@ -19,6 +19,10 @@ statistics. Any ``:`` character in the stats name is replaced with ``_``.
cluster_added, Counter, Total clusters added (either via static config or CDS)
cluster_modified, Counter, Total clusters modified (via CDS)
cluster_removed, Counter, Total clusters removed (via CDS)
cluster_updated, Counter, Total cluster updates
cluster_updated_via_merge, Counter, Total cluster updates applied as merged updates
update_merge_cancelled, Counter, Total merged updates that got cancelled and delivered early
update_out_of_merge_window, Counter, Total updates which arrived out of a merge window
active_clusters, Gauge, Number of currently active (warmed) clusters
warming_clusters, Gauge, Number of currently warming (not active) clusters

4 changes: 3 additions & 1 deletion docs/root/intro/version_history.rst
@@ -5,10 +5,13 @@ Version history
===============
* access log: added :ref:`response flag filter <envoy_api_msg_config.filter.accesslog.v2.ResponseFlagFilter>`
to filter based on the presence of Envoy response flags.
* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION.
* admin: added :http:get:`/hystrix_event_stream` as an endpoint for monitoring envoy's statistics
through `Hystrix dashboard <https://github.com/Netflix-Skunkworks/hystrix-dashboard/wiki>`_.
* grpc-json: added support for building HTTP response from
`google.api.HttpBody <https://github.com/googleapis/googleapis/blob/master/google/api/httpbody.proto>`_.
* cluster: added :ref:`option <envoy_api_field_Cluster.CommonLbConfig.update_merge_window>` to merge
health check/weight/metadata updates within the given duration.
* config: v1 disabled by default. v1 support remains available until October via flipping --v2-config-only=false.
* config: v1 disabled by default. v1 support remains available until October via setting :option:`--allow-deprecated-v1-api`.
* health check: added support for :ref:`custom health check <envoy_api_field_core.HealthCheck.custom_health_check>`.
@@ -51,7 +54,6 @@ Version history
* thrift_proxy: introduced thrift routing, moved configuration to correct location
* upstream: added configuration option to the subset load balancer to take locality weights into account when
selecting a host from a subset.
* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION.

1.7.0
===============
112 changes: 110 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
@@ -180,7 +180,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })),
system_time_source_(system_time_source) {
system_time_source_(system_time_source), dispatcher_(main_thread_dispatcher) {
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls);
const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_outlier_detection()) {
@@ -330,7 +330,35 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
const HostVector& hosts_removed) {
// This fires when a cluster is about to have an updated member set. We need to send this
// out to all of the thread local configurations.
postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);

// Should we save this update and merge it with other updates?
//
// Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
// only those updates that signal a change in host healthcheck state, weight or metadata.
//
// We've discussed merging updates related to hosts being added/removed, but it's really
// tricky to merge those given that downstream consumers of these updates expect to see the
// full list of updates, not a condensed one. This is because they use the broadcasted
// HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire list
// of removals, these maps will leak those HostSharedPtrs.
//
// See https://github.com/envoyproxy/envoy/pull/3941 for more context.
bool scheduled = false;
const bool merging_enabled = cluster.info()->lbConfig().has_update_merge_window();
// Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes.
const bool is_mergeable = !hosts_added.size() && !hosts_removed.size();

if (merging_enabled) {
// If this update is not mergeable, cancel any scheduled merged update,
// since this update will be delivered immediately.
scheduled = scheduleUpdate(cluster, priority, is_mergeable);
}

// If an update was not scheduled for later, deliver it immediately.
if (!scheduled) {
cm_stats_.cluster_updated_.inc();
postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);
}
});

// Finally, if the cluster has any hosts, post updates cross-thread so the per-thread load
@@ -343,6 +371,83 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
}
}

bool ClusterManagerImpl::scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable) {
const auto& update_merge_window = cluster.info()->lbConfig().update_merge_window();
const auto timeout = DurationUtil::durationToMilliseconds(update_merge_window);

// Find pending updates for this cluster.
auto& updates_by_prio = updates_map_[cluster.info()->name()];
if (!updates_by_prio) {
updates_by_prio.reset(new PendingUpdatesByPriorityMap());
}

// Find pending updates for this priority.
auto& updates = (*updates_by_prio)[priority];
if (!updates) {
updates.reset(new PendingUpdates());
}

// Has an update_merge_window gone by since the last update? If so, don't schedule
// the update so it can be applied immediately. Ditto if this is not a mergeable update.
const auto delta = std::chrono::steady_clock::now() - updates->last_updated_;
const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
const bool out_of_merge_window = delta_ms > timeout;
if (out_of_merge_window || !mergeable) {
// If there was a pending update, we cancel the pending merged update.
//
// Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
// a timer is enabled. This race condition is fine, since we'll disable the timer here and
// deliver the update immediately.

// Why wasn't the update scheduled for later delivery? We keep some stats that help
// explain why merging did not happen. There are two things we track here:

// 1) Was this update out of a merge window?
if (mergeable && out_of_merge_window) {
cm_stats_.update_out_of_merge_window_.inc();
}

// 2) Were there previous updates that we are cancelling (and delivering immediately)?
if (updates->disableTimer()) {
cm_stats_.update_merge_cancelled_.inc();
}

updates->last_updated_ = std::chrono::steady_clock::now();
return false;
}

// If there's no timer, create one.
if (updates->timer_ == nullptr) {
updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
applyUpdates(cluster, priority, *updates);
});
}

// Ensure there's a timer set to deliver these updates.
if (!updates->timer_enabled_) {
updates->enableTimer(timeout);
}

return true;
}

void ClusterManagerImpl::applyUpdates(const Cluster& cluster, uint32_t priority,
PendingUpdates& updates) {
// Deliver pending updates.

// Remember that these merged updates are _only_ for updates related to
// HC/weight/metadata changes. That's why added/removed are empty. All
// adds/removals were already immediately broadcasted.
static const HostVector hosts_added;
static const HostVector hosts_removed;

postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);

cm_stats_.cluster_updated_via_merge_.inc();
updates.timer_enabled_ = false;
updates.last_updated_ = std::chrono::steady_clock::now();
}

bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) {
// First we need to see if this new config is new or an update to an existing dynamic cluster.
@@ -468,6 +573,9 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (removed) {
cm_stats_.cluster_removed_.inc();
updateGauges();
// Drop any pending merged updates for this cluster. Erasing the entry destroys
// its timers, so there is no need to disable them manually.
updates_map_.erase(cluster_name);
}

return removed;
48 changes: 46 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
@@ -138,6 +138,10 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
COUNTER(cluster_added) \
COUNTER(cluster_modified) \
COUNTER(cluster_removed) \
COUNTER(cluster_updated) \
COUNTER(cluster_updated_via_merge) \
COUNTER(update_merge_cancelled) \
COUNTER(update_out_of_merge_window) \
GAUGE (active_clusters) \
GAUGE (warming_clusters)
// clang-format on
@@ -209,6 +213,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

ClusterManagerFactory& clusterManagerFactory() override { return factory_; }

protected:
virtual void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added,
const HostVector& hosts_removed);

private:
/**
* Thread local cached cluster data. Each thread local cluster gets updates from the parent
@@ -361,14 +370,47 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
// This map is ordered so that config dumping is consistent.
typedef std::map<std::string, ClusterDataPtr> ClusterMap;

struct PendingUpdates {
void enableTimer(const uint64_t timeout) {
ASSERT(!timer_enabled_);
if (timer_ != nullptr) {
timer_->enableTimer(std::chrono::milliseconds(timeout));
timer_enabled_ = true;
}
}
bool disableTimer() {
const bool was_enabled = timer_enabled_;
if (timer_ != nullptr) {
timer_->disableTimer();
timer_enabled_ = false;
}
return was_enabled;
}

Event::TimerPtr timer_;
// TODO(rgs1): this should be part of Event::Timer's interface.
bool timer_enabled_{};
// This is default constructed to the clock's epoch:
// https://en.cppreference.com/w/cpp/chrono/time_point/time_point
//
// This will usually be the computer's boot time, which means that given a not very large
// `Cluster.CommonLbConfig.update_merge_window`, the first update will trigger immediately
// (the expected behavior).
MonotonicTime last_updated_;
};
using PendingUpdatesPtr = std::unique_ptr<PendingUpdates>;
using PendingUpdatesByPriorityMap = std::unordered_map<uint32_t, PendingUpdatesPtr>;
using PendingUpdatesByPriorityMapPtr = std::unique_ptr<PendingUpdatesByPriorityMap>;
using ClusterUpdatesMap = std::unordered_map<std::string, PendingUpdatesByPriorityMapPtr>;

void applyUpdates(const Cluster& cluster, uint32_t priority, PendingUpdates& updates);
bool scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable);
void createOrUpdateThreadLocalCluster(ClusterData& cluster);
ProtobufTypes::MessagePtr dumpClusterConfigs();
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info,
bool added_via_api, ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added, const HostVector& hosts_removed);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateGauges();

@@ -394,6 +436,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Grpc::AsyncClientManagerPtr async_client_manager_;
Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;
SystemTimeSource& system_time_source_;
ClusterUpdatesMap updates_map_;
Event::Dispatcher& dispatcher_;
};

} // namespace Upstream
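The comment on `last_updated_` above implies a useful property: a default-constructed `MonotonicTime` is the steady clock's epoch (commonly boot time), so the very first update for a cluster sees a delta roughly equal to the host's uptime, falls outside any realistic merge window, and is applied immediately. A standalone sketch of that reasoning (illustrative helper, not Envoy code; assumes the host has been up longer than the window):

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// A default-constructed steady_clock::time_point is the clock's epoch
// (typically boot time on Linux), so the first delta is effectively the
// host's uptime and exceeds any sane merge window.
bool firstUpdateOutOfWindow(std::uint64_t merge_window_ms) {
  const std::chrono::steady_clock::time_point last_updated{};  // epoch
  const std::uint64_t delta_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::steady_clock::now() - last_updated)
          .count();
  return delta_ms > merge_window_ms;
}
```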
