Skip to content

Commit

Permalink
ads: pause and resume v3 apis (envoyproxy#11300)
Browse files Browse the repository at this point in the history
Pause and resune V3 Api Verions as well as V2 when using ads.

Currently only V2 Api is being paused, this causes envoy to send a separate discovery request for every resource on CDS/LDS/SRDS updates.

Risk Level: Medium?
Testing:

Fixes: envoyproxy#11267

Signed-off-by: Sebastian Schepens <sebastian.schepens@mercadolibre.com>
  • Loading branch information
sschepens authored and songhu committed Jun 25, 2020
1 parent e04a24c commit 4d669b7
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 75 deletions.
25 changes: 25 additions & 0 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,30 @@ class GrpcMux {
*/
virtual void pause(const std::string& type_url) PURE;

/**
* Pause discovery requests for given API types. This is useful when we're processing an update
* for LDS or CDS and don't want a flood of updates for RDS or EDS respectively. Discovery
* requests may later be resumed with resume().
* @param type_urls type URLs corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
*/
virtual void pause(const std::vector<std::string> type_urls) PURE;

/**
* Resume discovery requests for a given API type. This will send a discovery request if one would
* have been sent during the pause.
* @param type_url type URL corresponding to xDS API e.g. type.googleapis.com/envoy.api.v2.Cluster
*/
virtual void resume(const std::string& type_url) PURE;

/**
* Resume discovery requests for given API types. This will send a discovery request if one would
* have been sent during the pause.
* @param type_urls type URLs corresponding to xDS API e.g.
* type.googleapis.com/envoy.api.v2.Cluster
*/
virtual void resume(const std::vector<std::string> type_urls) PURE;

/**
* Retrieves the current pause state as set by pause()/resume().
* @param type_url type URL corresponding to xDS API, e.g.
Expand All @@ -80,6 +97,14 @@ class GrpcMux {
*/
virtual bool paused(const std::string& type_url) const PURE;

/**
* Retrieves the current pause state as set by pause()/resume().
* @param type_urls type URLs corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster
* @return bool whether any of the APIs is paused.
*/
virtual bool paused(const std::vector<std::string> type_urls) const PURE;

/**
* Start a configuration subscription asynchronously for some API type and resources.
* @param type_url type URL corresponding to xDS API, e.g.
Expand Down
21 changes: 21 additions & 0 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ void GrpcMuxImpl::pause(const std::string& type_url) {
api_state.paused_ = true;
}

void GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
pause(type_url);
}
}

void GrpcMuxImpl::resume(const std::string& type_url) {
ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
Expand All @@ -115,6 +121,12 @@ void GrpcMuxImpl::resume(const std::string& type_url) {
}
}

void GrpcMuxImpl::resume(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
resume(type_url);
}
}

bool GrpcMuxImpl::paused(const std::string& type_url) const {
auto entry = api_state_.find(type_url);
if (entry == api_state_.end()) {
Expand All @@ -123,6 +135,15 @@ bool GrpcMuxImpl::paused(const std::string& type_url) const {
return entry->second.paused_;
}

bool GrpcMuxImpl::paused(const std::vector<std::string> type_urls) const {
for (const auto& type_url : type_urls) {
if (paused(type_url)) {
return true;
}
}
return false;
}

void GrpcMuxImpl::onDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
ControlPlaneStats& control_plane_stats) {
Expand Down
6 changes: 6 additions & 0 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class GrpcMuxImpl : public GrpcMux,

// GrpcMux
void pause(const std::string& type_url) override;
void pause(const std::vector<std::string> type_urls) override;
void resume(const std::string& type_url) override;
void resume(const std::vector<std::string> type_urls) override;
bool paused(const std::string& type_url) const override;
bool paused(const std::vector<std::string> type_urls) const override;

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks) override;
Expand Down Expand Up @@ -142,8 +145,11 @@ class NullGrpcMuxImpl : public GrpcMux,
public:
void start() override {}
void pause(const std::string&) override {}
void pause(const std::vector<std::string>) override {}
void resume(const std::string&) override {}
void resume(const std::vector<std::string>) override {}
bool paused(const std::string&) const override { return false; }
bool paused(const std::vector<std::string>) const override { return false; }

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&,
SubscriptionCallbacks&) override {
Expand Down
21 changes: 21 additions & 0 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,36 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,

void NewGrpcMuxImpl::pause(const std::string& type_url) { pausable_ack_queue_.pause(type_url); }

void NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
pause(type_url);
}
}

void NewGrpcMuxImpl::resume(const std::string& type_url) {
pausable_ack_queue_.resume(type_url);
trySendDiscoveryRequests();
}

void NewGrpcMuxImpl::resume(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
resume(type_url);
}
}

bool NewGrpcMuxImpl::paused(const std::string& type_url) const {
return pausable_ack_queue_.paused(type_url);
}

bool NewGrpcMuxImpl::paused(const std::vector<std::string> type_urls) const {
for (const auto& type_url : type_urls) {
if (paused(type_url)) {
return true;
}
}
return false;
}

void NewGrpcMuxImpl::onDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
ControlPlaneStats&) {
Expand Down
4 changes: 4 additions & 0 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ class NewGrpcMuxImpl
SubscriptionCallbacks& callbacks) override;

void pause(const std::string& type_url) override;
void pause(const std::vector<std::string> type_urls) override;
void resume(const std::string& type_url) override;
void resume(const std::vector<std::string> type_urls) override;
bool paused(const std::string& type_url) const override;
bool paused(const std::vector<std::string> type_urls) const override;

void onDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
ControlPlaneStats& control_plane_stats) override;
Expand Down
10 changes: 5 additions & 5 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,17 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
std::unique_ptr<Cleanup> resume_rds;
// if local init manager is initialized, the parent init manager may have gone away.
if (localInitManager().state() == Init::Manager::State::Initialized) {
const auto type_url = Envoy::Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>(
envoy::config::core::v3::ApiVersion::V2);
const auto type_urls =
Envoy::Config::getAllVersionTypeUrls<envoy::config::route::v3::RouteConfiguration>();
noop_init_manager =
std::make_unique<Init::ManagerImpl>(fmt::format("SRDS {}:{}", name_, version_info));
// Pause RDS to not send a burst of RDS requests until we start all the new subscriptions.
// In the case if factory_context_.init_manager() is uninitialized, RDS is already paused
// either by Server init or LDS init.
if (factory_context_.clusterManager().adsMux()) {
factory_context_.clusterManager().adsMux()->pause(type_url);
factory_context_.clusterManager().adsMux()->pause(type_urls);
}
resume_rds = std::make_unique<Cleanup>([this, &noop_init_manager, version_info, type_url] {
resume_rds = std::make_unique<Cleanup>([this, &noop_init_manager, version_info, type_urls] {
// For new RDS subscriptions created after listener warming up, we don't wait for them to
// warm up.
Init::WatcherImpl noop_watcher(
Expand All @@ -258,7 +258,7 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
// Note in the case of partial acceptance, accepted RDS subscriptions should be started
// despite of any error.
if (factory_context_.clusterManager().adsMux()) {
factory_context_.clusterManager().adsMux()->resume(type_url);
factory_context_.clusterManager().adsMux()->resume(type_urls);
}
});
}
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ void CdsApiImpl::onConfigUpdate(
const std::string& system_version_info) {
std::unique_ptr<Cleanup> maybe_eds_resume;
if (cm_.adsMux()) {
const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V2);
cm_.adsMux()->pause(type_url);
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::endpoint::v3::ClusterLoadAssignment>();
cm_.adsMux()->pause(type_urls);
maybe_eds_resume =
std::make_unique<Cleanup>([this, type_url] { cm_.adsMux()->resume(type_url); });
std::make_unique<Cleanup>([this, type_urls] { cm_.adsMux()->resume(type_urls); });
}

ENVOY_LOG(info, "cds: add {} cluster(s), remove {} cluster(s)", added_resources.size(),
Expand Down
25 changes: 13 additions & 12 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,21 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
secondary_init_clusters_.empty());
if (!secondary_init_clusters_.empty()) {
if (!started_secondary_initialize_) {
const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V2);
ENVOY_LOG(info, "cm init: initializing secondary clusters");
// If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
// should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
// avoid double pause ClusterLoadAssignment.
if (cm_.adsMux() == nullptr || cm_.adsMux()->paused(type_url)) {
initializeSecondaryClusters();
} else {
cm_.adsMux()->pause(type_url);
Cleanup eds_resume([this, type_url] { cm_.adsMux()->resume(type_url); });
initializeSecondaryClusters();
std::unique_ptr<Cleanup> maybe_eds_resume;
if (cm_.adsMux()) {
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::endpoint::v3::ClusterLoadAssignment>();
if (!cm_.adsMux()->paused(type_urls)) {
cm_.adsMux()->pause(type_urls);
maybe_eds_resume =
std::make_unique<Cleanup>([this, type_urls] { cm_.adsMux()->resume(type_urls); });
}
}
initializeSecondaryClusters();
}
return;
}
Expand Down Expand Up @@ -798,13 +800,12 @@ void ClusterManagerImpl::updateClusterCounts() {
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const auto type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V2);
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
ads_mux_->pause(type_url);
ads_mux_->pause(type_urls);
} else if (previous_warming > 0 && warming_clusters_.empty()) {
ads_mux_->resume(type_url);
ads_mux_->resume(type_urls);
}
}
cm_stats_.active_clusters_.set(active_clusters_.size());
Expand Down
12 changes: 6 additions & 6 deletions source/server/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ void LdsApiImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
std::unique_ptr<Cleanup> maybe_eds_resume;
std::unique_ptr<Cleanup> maybe_rds_resume;
if (cm_.adsMux()) {
const auto type_url = Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>(
envoy::config::core::v3::ApiVersion::V2);
cm_.adsMux()->pause(type_url);
maybe_eds_resume =
std::make_unique<Cleanup>([this, type_url] { cm_.adsMux()->resume(type_url); });
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::route::v3::RouteConfiguration>();
cm_.adsMux()->pause(type_urls);
maybe_rds_resume =
std::make_unique<Cleanup>([this, type_urls] { cm_.adsMux()->resume(type_urls); });
}

bool any_applied = false;
Expand Down
8 changes: 4 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,13 @@ RunHelper::RunHelper(Instance& instance, const Options& options, Event::Dispatch
return;
}

const auto type_url = Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>(
envoy::config::core::v3::ApiVersion::V2);
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::route::v3::RouteConfiguration>();
// Pause RDS to ensure that we don't send any requests until we've
// subscribed to all the RDS resources. The subscriptions happen in the init callbacks,
// so we pause RDS until we've completed all the callbacks.
if (cm.adsMux()) {
cm.adsMux()->pause(type_url);
cm.adsMux()->pause(type_urls);
}

ENVOY_LOG(info, "all clusters initialized. initializing init manager");
Expand All @@ -615,7 +615,7 @@ RunHelper::RunHelper(Instance& instance, const Options& options, Event::Dispatch
// Now that we're execute all the init callbacks we can resume RDS
// as we've subscribed to all the statically defined RDS resources.
if (cm.adsMux()) {
cm.adsMux()->resume(type_url);
cm.adsMux()->resume(type_urls);
}
});
}
Expand Down
Loading

0 comments on commit 4d669b7

Please sign in to comment.