Skip to content

Commit

Permalink
discovery: refactor configuration ingestion.
Browse files Browse the repository at this point in the history
Previously, the gRPC muxes required that decoding an opaque resource to
obtain its name, then dispatch to the relevant subscription, which would
again decode the opaque resource. This is pretty horrible efficiency
wise, in particular when upgrading from v2 -> v3.

In this patch, we introduce a DecodedResource wrapper and
OpaqueResourceDecoder. The config ingestion module, e.g. GrpcMuxImpl,
uses the OpaqueResourceDecoder to produce a typed DecodedResource,
performing the decode once. This DecodedResource is then dispatched
to the watching subscription.

This provides > 20% speedup on the v2 -> v3 tax for eds_speed_test,
decreasing from an overhead of 3.2x to 2.5x. It's also likely to unlock
further optimizations as we now have a wrapper resource and simplifies
subscription implementations, as they no longer need to deal with delta
vs. SotW resource decoding in different ways.

Risk level: Medium (configuration ingestion path changes).
Testing: New unit tests for
  DecodedResourceImpl/OpaqueResourceDecoderImpl, updated existing unit
  tests to work with new interfaces.

Partial solution to envoyproxy#11362

Signed-off-by: Harvey Tuch <htuch@google.com>
  • Loading branch information
htuch committed Jun 18, 2020
1 parent 5faff2a commit bb069ea
Show file tree
Hide file tree
Showing 74 changed files with 1,433 additions and 1,093 deletions.
4 changes: 3 additions & 1 deletion include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ class GrpcMux {
* resources for type_url will result in callbacks.
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks) PURE;
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
105 changes: 99 additions & 6 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,109 @@ enum class ConfigUpdateFailureReason {
UpdateRejected
};

/**
* A wrapper for xDS resources that have been deserialized from the wire.
*/
class DecodedResource {
public:
virtual ~DecodedResource() = default;

/**
* @return const std::string& resource name.
*/
virtual const std::string& name() const PURE;

/**
* @return const std::vector<std::string& resource aliases.
*/
virtual const std::vector<std::string>& aliases() const PURE;

/**
* @return const std::string& resource version.
*/
virtual const std::string& version() const PURE;

/**
* @return const Protobuf::Message& resource message reference. If hasResource() is false, this
* will be the empty message.
*/
virtual const Protobuf::Message& resource() const PURE;

/**
* @return bool does the xDS discovery response have a set resource payload?
*/
virtual bool hasResource() const PURE;
};

using DecodedResourcePtr = std::unique_ptr<DecodedResource>;
using DecodedResourceRef = std::reference_wrapper<DecodedResource>;

class OpaqueResourceDecoder {
public:
virtual ~OpaqueResourceDecoder() = default;

/**
* @param resource some opaque resource (ProtobufWkt::Any).
* @return ProtobufTypes::MessagePtr decoded protobuf message in the opaque resource, e.g. the
* RouteConfiguration for an Any containing envoy.config.route.v3.RouteConfiguration.
*/
virtual ProtobufTypes::MessagePtr decodeResource(const ProtobufWkt::Any& resource) PURE;

/**
* @param resource some opaque resourec (Protobuf::Message).
* @return std::String the resource name in a Protobuf::Message returned by decodeResource(), e.g.
* the route config name for a envoy.config.route.v3.RouteConfiguration message.
*/
virtual std::string resourceName(const Protobuf::Message& resource) PURE;
};

/**
* Subscription to DecodedResources.
*/
class SubscriptionCallbacks {
public:
virtual ~SubscriptionCallbacks() = default;

/**
* Called when a state-of-the-world configuration update is received. (State-of-the-world is
* everything other than delta gRPC - filesystem, HTTP, non-delta gRPC).
* @param resources vector of fetched resources corresponding to the configuration update.
* @param version_info supplies the version information as supplied by the xDS discovery response.
* @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration
* is accepted. Accepted configurations have their version_info reflected in subsequent
* requests.
*/
virtual void onConfigUpdate(const std::vector<DecodedResourceRef>& resources,
const std::string& version_info) PURE;

/**
* Called when a delta configuration update is received.
* @param added_resources resources newly added since the previous fetch.
* @param removed_resources names of resources that this fetch instructed to be removed.
* @param system_version_info aggregate response data "version", for debugging.
* @throw EnvoyException with reason if the config changes are rejected. Otherwise the changes
* are accepted. Accepted changes have their version_info reflected in subsequent requests.
*/
virtual void onConfigUpdate(const std::vector<DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) PURE;

/**
* Called when either the Subscription is unable to fetch a config update or when onConfigUpdate
* invokes an exception.
* @param reason supplies the update failure reason.
* @param e supplies any exception data on why the fetch failed. May be nullptr.
*/
virtual void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) PURE;
};

/**
* Invoked when raw config received from xDS wire.
*/
class UntypedConfigUpdateCallbacks {
public:
virtual ~UntypedConfigUpdateCallbacks() = default;

/**
* Called when a state-of-the-world configuration update is received. (State-of-the-world is
* everything other than delta gRPC - filesystem, HTTP, non-delta gRPC).
Expand Down Expand Up @@ -61,12 +160,6 @@ class SubscriptionCallbacks {
* @param e supplies any exception data on why the fetch failed. May be nullptr.
*/
virtual void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) PURE;

/**
* Obtain the "name" of a v2 API resource in a google.protobuf.Any, e.g. the route config name for
* a RouteConfiguration, based on the underlying resource type.
*/
virtual std::string resourceName(const ProtobufWkt::Any& resource) PURE;
};

/**
Expand Down
5 changes: 4 additions & 1 deletion include/envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ class SubscriptionFactory {
* @param scope stats scope for any stats tracked by the subscription.
* @param callbacks the callbacks needed by all Subscription objects, to deliver config updates.
* The callbacks must not result in the deletion of the Subscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
*
* @return SubscriptionPtr subscription object corresponding for config and type_url.
*/
virtual SubscriptionPtr
subscriptionFromConfigSource(const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks) PURE;
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
};

} // namespace Config
Expand Down
15 changes: 9 additions & 6 deletions include/envoy/router/route_config_update_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ class RouteConfigUpdateReceiver {
virtual bool onRdsUpdate(const envoy::config::route::v3::RouteConfiguration& rc,
const std::string& version_info) PURE;

using VirtualHostRefVector =
std::vector<std::reference_wrapper<const envoy::config::route::v3::VirtualHost>>;

/**
* Called on updates via VHDS.
* @param added_resources supplies Resources (each containing a VirtualHost) that have been
* added.
* @param added_vhosts supplies VirtualHosts that have been added.
* @param added_resource_ids set of resources IDs (names + aliases) added.
* @param removed_resources supplies names of VirtualHosts that have been removed.
* @param version_info supplies RouteConfiguration version.
* @return bool whether RouteConfiguration has been updated.
*/
virtual bool onVhdsUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) PURE;
virtual bool onVhdsUpdate(const VirtualHostRefVector& added_vhosts,
const std::set<std::string>& added_resource_ids,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) PURE;

/**
* @return std::string& the name of RouteConfiguration.
Expand Down
24 changes: 24 additions & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "decoded_resource_lib",
hdrs = ["decoded_resource_impl.h"],
deps = [
"//include/envoy/config:subscription_interface",
"//source/common/protobuf:utility_lib",
],
)

envoy_cc_library(
name = "delta_subscription_state_lib",
srcs = ["delta_subscription_state.cc"],
Expand All @@ -68,6 +77,7 @@ envoy_cc_library(
":api_version_lib",
":pausable_ack_queue_lib",
":utility_lib",
":watch_map_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//source/common/common:assert_lib",
Expand All @@ -86,6 +96,7 @@ envoy_cc_library(
srcs = ["filesystem_subscription_impl.cc"],
hdrs = ["filesystem_subscription_impl.h"],
deps = [
":decoded_resource_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/filesystem:filesystem_interface",
Expand Down Expand Up @@ -122,6 +133,7 @@ envoy_cc_library(
hdrs = ["grpc_mux_impl.h"],
deps = [
":api_version_lib",
":decoded_resource_lib",
":grpc_stream_lib",
":utility_lib",
"//include/envoy/config:grpc_mux_interface",
Expand Down Expand Up @@ -175,6 +187,7 @@ envoy_cc_library(
],
deps = [
":api_version_lib",
":decoded_resource_lib",
":version_converter_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
Expand Down Expand Up @@ -209,6 +222,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "opaque_resource_decoder_lib",
hdrs = ["opaque_resource_decoder_impl.h"],
deps = [
"//include/envoy/config:subscription_interface",
"//source/common/protobuf:utility_lib",
],
)

envoy_cc_library(
name = "pausable_ack_queue_lib",
srcs = ["pausable_ack_queue.cc"],
Expand Down Expand Up @@ -357,6 +379,7 @@ envoy_cc_library(
srcs = ["watch_map.cc"],
hdrs = ["watch_map.h"],
deps = [
":decoded_resource_lib",
"//include/envoy/config:subscription_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
Expand All @@ -369,6 +392,7 @@ envoy_cc_library(
name = "subscription_base_interface",
hdrs = ["subscription_base.h"],
deps = [
":opaque_resource_decoder_lib",
":resource_name_lib",
"//include/envoy/config:subscription_interface",
],
Expand Down
62 changes: 62 additions & 0 deletions source/common/config/decoded_resource_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include "envoy/config/subscription.h"

#include "common/protobuf/utility.h"

namespace Envoy {
namespace Config {

namespace {

std::vector<std::string>
repeatedPtrFieldToVector(const Protobuf::RepeatedPtrField<std::string>& xs) {
std::vector<std::string> ys;
std::copy(xs.begin(), xs.end(), std::back_inserter(ys));
return ys;
}

} // namespace

class DecodedResourceImpl : public DecodedResource {
public:
DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder, const ProtobufWkt::Any& resource,
const std::string& version)
: DecodedResourceImpl(resource_decoder, {}, Protobuf::RepeatedPtrField<std::string>(),
resource, true, version) {}
DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder,
const envoy::service::discovery::v3::Resource& resource)
: DecodedResourceImpl(resource_decoder, resource.name(), resource.aliases(),
resource.resource(), resource.has_resource(), resource.version()) {}
DecodedResourceImpl(ProtobufTypes::MessagePtr resource, const std::string& name,
const std::vector<std::string>& aliases, const std::string& version)
: resource_(std::move(resource)), has_resource_(true), name_(name), aliases_(aliases),
version_(version) {}

// Config::DecodedResource
const std::string& name() const override { return name_; }
const std::vector<std::string>& aliases() const override { return aliases_; }
const std::string& version() const override { return version_; };
const Protobuf::Message& resource() const override { return *resource_; };
bool hasResource() const override { return has_resource_; }

private:
DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder, absl::optional<std::string> name,
const Protobuf::RepeatedPtrField<std::string>& aliases,
const ProtobufWkt::Any& resource, bool has_resource,
const std::string& version)
: resource_(resource_decoder.decodeResource(resource)), has_resource_(has_resource),
name_(name ? *name : resource_decoder.resourceName(*resource_)),
aliases_(repeatedPtrFieldToVector(aliases)), version_(version) {}

const ProtobufTypes::MessagePtr resource_;
const bool has_resource_;
const std::string name_;
const std::vector<std::string> aliases_;
const std::string version_;
};

using DecodedResourceImplPtr = std::unique_ptr<DecodedResourceImpl>;

} // namespace Config
} // namespace Envoy
10 changes: 5 additions & 5 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace Envoy {
namespace Config {

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
SubscriptionCallbacks& callbacks,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info)
: type_url_(std::move(type_url)), callbacks_(callbacks), local_info_(local_info) {}
: type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info) {}

void DeltaSubscriptionState::updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed) {
Expand Down Expand Up @@ -81,7 +81,7 @@ void DeltaSubscriptionState::handleGoodResponse(
fmt::format("duplicate name {} found in the union of added+removed resources", name));
}
}
callbacks_.onConfigUpdate(message.resources(), message.removed_resources(),
watch_map_.onConfigUpdate(message.resources(), message.removed_resources(),
message.system_version_info());
for (const auto& resource : message.resources()) {
setResourceVersion(resource.name(), resource.version());
Expand All @@ -108,11 +108,11 @@ void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAc
ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}

void DeltaSubscriptionState::handleEstablishmentFailure() {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
nullptr);
}

Expand Down
6 changes: 3 additions & 3 deletions source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "common/common/logger.h"
#include "common/config/api_version.h"
#include "common/config/pausable_ack_queue.h"
#include "common/config/watch_map.h"

namespace Envoy {
namespace Config {
Expand All @@ -21,7 +22,7 @@ namespace Config {
// being multiplexed together by ADS.
class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
public:
DeltaSubscriptionState(std::string type_url, SubscriptionCallbacks& callbacks,
DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map_,
const LocalInfo::LocalInfo& local_info);

// Update which resources we're interested in subscribing to.
Expand Down Expand Up @@ -86,8 +87,7 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
std::set<std::string> resource_names_;

const std::string type_url_;
// callbacks_ is expected to be a WatchMap.
SubscriptionCallbacks& callbacks_;
UntypedConfigUpdateCallbacks& watch_map_;
const LocalInfo::LocalInfo& local_info_;
std::chrono::milliseconds init_fetch_timeout_;

Expand Down
Loading

0 comments on commit bb069ea

Please sign in to comment.