Skip to content

Commit

Permalink
rds: split subscription out from RdsRouteConfigProviderImpl (#3960)
Browse files Browse the repository at this point in the history
Signed-off-by: Lizan Zhou <zlizan@google.com>
  • Loading branch information
lizan authored Jul 27, 2018
1 parent 1dfde38 commit 280baee
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 93 deletions.
131 changes: 85 additions & 46 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,19 @@ StaticRouteConfigProviderImpl::StaticRouteConfigProviderImpl(

// TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_
// initialization needs to be fixed.
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
RdsRouteConfigSubscription::RdsRouteConfigSubscription(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context,
const std::string& stat_prefix, RouteConfigProviderManagerImpl& route_config_provider_manager)
: factory_context_(factory_context), tls_(factory_context.threadLocal().allocateSlot()),
route_config_name_(rds.route_config_name()),
const std::string& stat_prefix,
Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager)
: route_config_name_(rds.route_config_name()),
scope_(factory_context.scope().createScope(stat_prefix + "rds." + route_config_name_ + ".")),
stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_))}),
route_config_provider_manager_(route_config_provider_manager),
manager_identifier_(manager_identifier),
manager_identifier_(manager_identifier), time_source_(factory_context.systemTimeSource()),
last_updated_(factory_context.systemTimeSource().currentTime()) {
::Envoy::Config::Utility::checkLocalInfo("rds", factory_context.localInfo());

ConfigConstSharedPtr initial_config(new NullConfigImpl());
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ThreadLocalConfig>(initial_config);
});
subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource<
envoy::api::v2::RouteConfiguration>(
rds.config_source(), factory_context.localInfo().node(), factory_context.dispatcher(),
Expand All @@ -77,27 +73,22 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
},
"envoy.api.v2.RouteDiscoveryService.FetchRoutes",
"envoy.api.v2.RouteDiscoveryService.StreamRoutes");
config_source_ = MessageUtil::getJsonStringFromMessage(rds.config_source(), true);
}

RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() {
RdsRouteConfigSubscription::~RdsRouteConfigSubscription() {
// If we get destroyed during initialization, make sure we signal that we "initialized".
runInitializeCallbackIfAny();

// The ownership of RdsRouteConfigProviderImpl is shared among all HttpConnectionManagers that
// hold a shared_ptr to it. The RouteConfigProviderManager holds weak_ptrs to the
// RdsRouteConfigProviders. Therefore, the map entry for the RdsRouteConfigProvider has to get
// cleaned by the RdsRouteConfigProvider's destructor.
route_config_provider_manager_.route_config_providers_.erase(manager_identifier_);
route_config_provider_manager_.route_config_subscriptions_.erase(manager_identifier_);
}

Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() {
return tls_->getTyped<ThreadLocalConfig>().config_;
}

void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources,
void RdsRouteConfigSubscription::onConfigUpdate(const ResourceVector& resources,
const std::string& version_info) {
last_updated_ = factory_context_.systemTimeSource().currentTime();
last_updated_ = time_source_.currentTime();

if (resources.empty()) {
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_);
Expand All @@ -115,35 +106,79 @@ void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources,
throw EnvoyException(fmt::format("Unexpected RDS configuration (expecting {}): {}",
route_config_name_, route_config.name()));
}

const uint64_t new_hash = MessageUtil::hash(route_config);
if (!config_info_ || new_hash != config_info_.value().last_config_hash_) {
ConfigConstSharedPtr new_config(new ConfigImpl(route_config, factory_context_, false));
config_info_ = {new_hash, version_info};
route_config_proto_ = route_config;
stats_.config_reload_.inc();
ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_,
new_hash);
tls_->runOnAllThreads(
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; });
route_config_proto_ = route_config;
for (auto* provider : route_config_providers_) {
provider->onConfigUpdate();
}
}

runInitializeCallbackIfAny();
}

void RdsRouteConfigProviderImpl::onConfigUpdateFailed(const EnvoyException*) {
void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) {
// We need to allow server startup to continue, even if we have a bad
// config.
runInitializeCallbackIfAny();
}

void RdsRouteConfigProviderImpl::runInitializeCallbackIfAny() {
void RdsRouteConfigSubscription::registerInitTarget(Init::Manager& init_manager) {
init_manager.registerTarget(*this);
}

void RdsRouteConfigSubscription::runInitializeCallbackIfAny() {
if (initialize_callback_) {
initialize_callback_();
initialize_callback_ = nullptr;
}
}

void RdsRouteConfigProviderImpl::registerInitTarget(Init::Manager& init_manager) {
init_manager.registerTarget(*this);
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
RdsRouteConfigSubscriptionSharedPtr&& subscription,
Server::Configuration::FactoryContext& factory_context)
: subscription_(std::move(subscription)), factory_context_(factory_context),
tls_(factory_context.threadLocal().allocateSlot()) {
ConfigConstSharedPtr initial_config;
if (subscription_->config_info_.has_value()) {
initial_config =
std::make_shared<ConfigImpl>(subscription_->route_config_proto_, factory_context_, false);
} else {
initial_config = std::make_shared<NullConfigImpl>();
}
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ThreadLocalConfig>(initial_config);
});
subscription_->route_config_providers_.insert(this);
}

RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() {
subscription_->route_config_providers_.erase(this);
}

Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() {
return tls_->getTyped<ThreadLocalConfig>().config_;
}

absl::optional<RouteConfigProvider::ConfigInfo> RdsRouteConfigProviderImpl::configInfo() const {
if (!subscription_->config_info_) {
return {};
} else {
return ConfigInfo{subscription_->route_config_proto_,
subscription_->config_info_.value().last_config_version_};
}
}

void RdsRouteConfigProviderImpl::onConfigUpdate() {
ConfigConstSharedPtr new_config(
new ConfigImpl(subscription_->route_config_proto_, factory_context_, false));
tls_->runOnAllThreads(
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; });
}

RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) {
Expand All @@ -157,14 +192,15 @@ RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& ad
std::vector<RouteConfigProviderSharedPtr>
RouteConfigProviderManagerImpl::getRdsRouteConfigProviders() {
std::vector<RouteConfigProviderSharedPtr> ret;
ret.reserve(route_config_providers_.size());
for (const auto& element : route_config_providers_) {
ret.reserve(route_config_subscriptions_.size());
for (const auto& element : route_config_subscriptions_) {
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up
// in the RdsRouteConfigProviderImpl destructor, and the single threaded nature
// in the RdsRouteConfigSubscription destructor, and the single threaded nature
// of this code, locking the weak_ptr will not fail.
RouteConfigProviderSharedPtr provider = element.second.lock();
ASSERT(provider);
ret.push_back(provider);
auto subscription = element.second.lock();
ASSERT(subscription);
ASSERT(subscription->route_config_providers_.size() > 0);
ret.push_back((*subscription->route_config_providers_.begin())->shared_from_this());
}
return ret;
};
Expand All @@ -190,31 +226,34 @@ Router::RouteConfigProviderSharedPtr RouteConfigProviderManagerImpl::getRdsRoute
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix) {

// RdsRouteConfigProviders are unique based on their serialized RDS config.
// RdsRouteConfigSubscriptions are unique based on their serialized RDS config.
// TODO(htuch): Full serialization here gives large IDs, could get away with a
// strong hash instead.
const std::string manager_identifier = rds.SerializeAsString();

auto it = route_config_providers_.find(manager_identifier);
if (it == route_config_providers_.end()) {
RdsRouteConfigSubscriptionSharedPtr subscription;

auto it = route_config_subscriptions_.find(manager_identifier);
if (it == route_config_subscriptions_.end()) {
// std::make_shared does not work for classes with private constructors. There are ways
// around it. However, since this is not a performance critical path we err on the side
// of simplicity.
std::shared_ptr<RdsRouteConfigProviderImpl> new_provider{new RdsRouteConfigProviderImpl(
rds, manager_identifier, factory_context, stat_prefix, *this)};

new_provider->registerInitTarget(factory_context.initManager());
subscription.reset(new RdsRouteConfigSubscription(rds, manager_identifier, factory_context,
stat_prefix, *this));

route_config_providers_.insert({manager_identifier, new_provider});
subscription->registerInitTarget(factory_context.initManager());

return new_provider;
route_config_subscriptions_.insert({manager_identifier, subscription});
} else {
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up
// in the RdsRouteConfigSubscription destructor, and the single threaded nature
// of this code, locking the weak_ptr will not fail.
subscription = it->second.lock();
}
ASSERT(subscription);

// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up
// in the RdsRouteConfigProviderImpl destructor, and the single threaded nature
// of this code, locking the weak_ptr will not fail.
Router::RouteConfigProviderSharedPtr new_provider = it->second.lock();
ASSERT(new_provider);
Router::RouteConfigProviderSharedPtr new_provider{
new RdsRouteConfigProviderImpl(std::move(subscription), factory_context)};
return new_provider;
};

Expand Down
88 changes: 55 additions & 33 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "envoy/api/v2/rds.pb.h"
#include "envoy/api/v2/route/route.pb.h"
Expand Down Expand Up @@ -80,36 +81,25 @@ struct RdsStats {
};

class RouteConfigProviderManagerImpl;
class RdsRouteConfigProviderImpl;

/**
* Implementation of RouteConfigProvider that fetches the route configuration dynamically using
* the RDS API.
* A class that fetches the route configuration dynamically using the RDS API and updates them to
* RDS config providers.
*/
class RdsRouteConfigProviderImpl
: public RouteConfigProvider,
public Init::Target,
class RdsRouteConfigSubscription
: public Init::Target,
Envoy::Config::SubscriptionCallbacks<envoy::api::v2::RouteConfiguration>,
Logger::Loggable<Logger::Id::router> {
public:
~RdsRouteConfigProviderImpl();
~RdsRouteConfigSubscription();

// Init::Target
void initialize(std::function<void()> callback) override {
initialize_callback_ = callback;
subscription_->start({route_config_name_}, *this);
}

// Router::RouteConfigProvider
Router::ConfigConstSharedPtr config() override;
absl::optional<ConfigInfo> configInfo() const override {
if (!config_info_) {
return {};
} else {
return ConfigInfo{route_config_proto_, config_info_.value().last_config_version_};
}
}
SystemTime lastUpdated() const override { return last_updated_; }

// Config::SubscriptionCallbacks
void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
Expand All @@ -118,18 +108,12 @@ class RdsRouteConfigProviderImpl
}

private:
struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject {
ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {}

ConfigConstSharedPtr config_;
};

struct LastConfigInfo {
uint64_t last_config_hash_;
std::string last_config_version_;
};

RdsRouteConfigProviderImpl(
RdsRouteConfigSubscription(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context,
const std::string& stat_prefix,
Expand All @@ -138,19 +122,57 @@ class RdsRouteConfigProviderImpl
void registerInitTarget(Init::Manager& init_manager);
void runInitializeCallbackIfAny();

Server::Configuration::FactoryContext& factory_context_;
std::unique_ptr<Envoy::Config::Subscription<envoy::api::v2::RouteConfiguration>> subscription_;
ThreadLocal::SlotPtr tls_;
std::string config_source_;
std::function<void()> initialize_callback_;
const std::string route_config_name_;
absl::optional<LastConfigInfo> config_info_;
Stats::ScopePtr scope_;
RdsStats stats_;
std::function<void()> initialize_callback_;
RouteConfigProviderManagerImpl& route_config_provider_manager_;
const std::string manager_identifier_;
envoy::api::v2::RouteConfiguration route_config_proto_;
SystemTimeSource& time_source_;
SystemTime last_updated_;
absl::optional<LastConfigInfo> config_info_;
envoy::api::v2::RouteConfiguration route_config_proto_;
std::unordered_set<RdsRouteConfigProviderImpl*> route_config_providers_;

friend class RouteConfigProviderManagerImpl;
friend class RdsRouteConfigProviderImpl;
};

typedef std::shared_ptr<RdsRouteConfigSubscription> RdsRouteConfigSubscriptionSharedPtr;

/**
* Implementation of RouteConfigProvider that fetches the route configuration dynamically using
* the subscription.
*/
class RdsRouteConfigProviderImpl : public RouteConfigProvider,
public std::enable_shared_from_this<RdsRouteConfigProviderImpl>,
Logger::Loggable<Logger::Id::router> {
public:
~RdsRouteConfigProviderImpl();

RdsRouteConfigSubscription& subscription() { return *subscription_; }
void onConfigUpdate();

// Router::RouteConfigProvider
Router::ConfigConstSharedPtr config() override;
absl::optional<ConfigInfo> configInfo() const override;
SystemTime lastUpdated() const override { return subscription_->last_updated_; }

private:
struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject {
ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {}

ConfigConstSharedPtr config_;
};

RdsRouteConfigProviderImpl(RdsRouteConfigSubscriptionSharedPtr&& subscription,
Server::Configuration::FactoryContext& factory_context);

RdsRouteConfigSubscriptionSharedPtr subscription_;
Server::Configuration::FactoryContext& factory_context_;
ThreadLocal::SlotPtr tls_;
const std::string route_config_name_;

friend class RouteConfigProviderManagerImpl;
};
Expand Down Expand Up @@ -180,12 +202,12 @@ class RouteConfigProviderManagerImpl : public RouteConfigProviderManager,
// as in ConfigTracker. I.e. the ProviderImpls would have an EntryOwner for these lists
// Then the lifetime management stuff is centralized and opaque. Plus the copypasta
// in getRdsRouteConfigProviders()/getStaticRouteConfigProviders() goes away.
std::unordered_map<std::string, std::weak_ptr<RdsRouteConfigProviderImpl>>
route_config_providers_;
std::unordered_map<std::string, std::weak_ptr<RdsRouteConfigSubscription>>
route_config_subscriptions_;
std::vector<std::weak_ptr<RouteConfigProvider>> static_route_config_providers_;
Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;

friend class RdsRouteConfigProviderImpl;
friend class RdsRouteConfigSubscription;
};

} // namespace Router
Expand Down
Loading

0 comments on commit 280baee

Please sign in to comment.