diff --git a/api/envoy/config/cluster/v3/cluster.proto b/api/envoy/config/cluster/v3/cluster.proto index 3571ccf9abbd..7747c75672c0 100644 --- a/api/envoy/config/cluster/v3/cluster.proto +++ b/api/envoy/config/cluster/v3/cluster.proto @@ -612,7 +612,32 @@ message Cluster { // // This is limited somewhat arbitrarily to 3 because prefetching connections too aggressively can // harm latency more than the prefetching helps. - google.protobuf.DoubleValue prefetch_ratio = 1 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + google.protobuf.DoubleValue per_upstream_prefetch_ratio = 1 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + + // Indicates how many streams (rounded up) can be anticipated across a cluster for each + // stream, useful for low QPS services. This is currently supported for a subset of + // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). + // Unlike per_upstream_prefetch_ratio this prefetches across the upstream instances in a + // cluster, doing best effort predictions of what upstream would be picked next and + // pre-establishing a connection. + // + // For example if prefetching is set to 2 for a round robin HTTP/2 cluster, on the first + // incoming stream, 2 connections will be prefetched - one to the first upstream for this + // cluster, one to the second on the assumption there will be a follow-up stream. + // + // Prefetching will be limited to one prefetch per configured upstream in the cluster. + // + // If this value is not set, or set explicitly to one, Envoy will fetch as many connections + // as needed to serve streams in flight, so during warm up and in steady state if a connection + // is closed (and per_upstream_prefetch_ratio is not set), there will be a latency hit for + // connection establishment. 
+ // + // If both this and per_upstream_prefetch_ratio are set, Envoy will make sure both predicted needs are met, + // basically prefetching max(predictive-prefetch, per-upstream-prefetch), for each upstream. + // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + google.protobuf.DoubleValue predictive_prefetch_ratio = 2 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } reserved 12, 15, 7, 11, 35; diff --git a/api/envoy/config/cluster/v4alpha/cluster.proto b/api/envoy/config/cluster/v4alpha/cluster.proto index 9b7536836365..391eabb1db3e 100644 --- a/api/envoy/config/cluster/v4alpha/cluster.proto +++ b/api/envoy/config/cluster/v4alpha/cluster.proto @@ -622,7 +622,32 @@ message Cluster { // // This is limited somewhat arbitrarily to 3 because prefetching connections too aggressively can // harm latency more than the prefetching helps. - google.protobuf.DoubleValue prefetch_ratio = 1 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + google.protobuf.DoubleValue per_upstream_prefetch_ratio = 1 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + + // Indicates how many streams (rounded up) can be anticipated across a cluster for each + // stream, useful for low QPS services. This is currently supported for a subset of + // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). + // Unlike per_upstream_prefetch_ratio this prefetches across the upstream instances in a + // cluster, doing best effort predictions of what upstream would be picked next and + // pre-establishing a connection. + // + // For example if prefetching is set to 2 for a round robin HTTP/2 cluster, on the first + // incoming stream, 2 connections will be prefetched - one to the first upstream for this + // cluster, one to the second on the assumption there will be a follow-up stream. + // + // Prefetching will be limited to one prefetch per configured upstream in the cluster. 
+ // + // If this value is not set, or set explicitly to one, Envoy will fetch as many connections + // as needed to serve streams in flight, so during warm up and in steady state if a connection + // is closed (and per_upstream_prefetch_ratio is not set), there will be a latency hit for + // connection establishment. + // + // If both this and prefetch_ratio are set, Envoy will make sure both predicted needs are met, + // basically prefetching max(predictive-prefetch, per-upstream-prefetch), for each upstream. + // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + google.protobuf.DoubleValue predictive_prefetch_ratio = 2 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } reserved 12, 15, 7, 11, 35, 47; diff --git a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto index 7560baed3434..215c7d9a26af 100644 --- a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto +++ b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto @@ -612,7 +612,32 @@ message Cluster { // // This is limited somewhat arbitrarily to 3 because prefetching connections too aggressively can // harm latency more than the prefetching helps. - google.protobuf.DoubleValue prefetch_ratio = 1 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + google.protobuf.DoubleValue per_upstream_prefetch_ratio = 1 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + + // Indicates how many many streams (rounded up) can be anticipated across a cluster for each + // stream, useful for low QPS services. This is currently supported for a subset of + // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). + // Unlike per_upstream_prefetch_ratio this prefetches across the upstream instances in a + // cluster, doing best effort predictions of what upstream would be picked next and + // pre-establishing a connection. 
+ // + // For example if prefetching is set to 2 for a round robin HTTP/2 cluster, on the first + // incoming stream, 2 connections will be prefetched - one to the first upstream for this + // cluster, one to the second on the assumption there will be a follow-up stream. + // + // Prefetching will be limited to one prefetch per configured upstream in the cluster. + // + // If this value is not set, or set explicitly to one, Envoy will fetch as many connections + // as needed to serve streams in flight, so during warm up and in steady state if a connection + // is closed (and per_upstream_prefetch_ratio is not set), there will be a latency hit for + // connection establishment. + // + // If both this and prefetch_ratio are set, Envoy will make sure both predicted needs are met, + // basically prefetching max(predictive-prefetch, per-upstream-prefetch), for each upstream. + // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + google.protobuf.DoubleValue predictive_prefetch_ratio = 2 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } reserved 12, 15; diff --git a/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto b/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto index 55915350dced..648b00f31cf4 100644 --- a/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto +++ b/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto @@ -622,7 +622,32 @@ message Cluster { // // This is limited somewhat arbitrarily to 3 because prefetching connections too aggressively can // harm latency more than the prefetching helps. - google.protobuf.DoubleValue prefetch_ratio = 1 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + google.protobuf.DoubleValue per_upstream_prefetch_ratio = 1 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; + + // Indicates how many many streams (rounded up) can be anticipated across a cluster for each + // stream, useful for low QPS services. 
This is currently supported for a subset of + // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). + // Unlike per_upstream_prefetch_ratio this prefetches across the upstream instances in a + // cluster, doing best effort predictions of what upstream would be picked next and + // pre-establishing a connection. + // + // For example if prefetching is set to 2 for a round robin HTTP/2 cluster, on the first + // incoming stream, 2 connections will be prefetched - one to the first upstream for this + // cluster, one to the second on the assumption there will be a follow-up stream. + // + // Prefetching will be limited to one prefetch per configured upstream in the cluster. + // + // If this value is not set, or set explicitly to one, Envoy will fetch as many connections + // as needed to serve streams in flight, so during warm up and in steady state if a connection + // is closed (and per_upstream_prefetch_ratio is not set), there will be a latency hit for + // connection establishment. + // + // If both this and prefetch_ratio are set, Envoy will make sure both predicted needs are met, + // basically prefetching max(predictive-prefetch, per-upstream-prefetch), for each upstream. + // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + google.protobuf.DoubleValue predictive_prefetch_ratio = 2 + [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } reserved 12, 15, 7, 11, 35; diff --git a/include/envoy/common/conn_pool.h b/include/envoy/common/conn_pool.h index 0619e7e32479..619ec9b0c9e7 100644 --- a/include/envoy/common/conn_pool.h +++ b/include/envoy/common/conn_pool.h @@ -68,6 +68,14 @@ class Instance { * @return Upstream::HostDescriptionConstSharedPtr the host for which connections are pooled. */ virtual Upstream::HostDescriptionConstSharedPtr host() const PURE; + + /** + * Prefetches an upstream connection, if existing connections do not meet both current and + * anticipated load. 
+ * + * @return true if a connection was prefetched, false otherwise. + */ + virtual bool maybePrefetch(float prefetch_ratio) PURE; }; enum class PoolFailureReason { diff --git a/include/envoy/upstream/load_balancer.h b/include/envoy/upstream/load_balancer.h index 031daffc8ad2..fe8bb7e73b92 100644 --- a/include/envoy/upstream/load_balancer.h +++ b/include/envoy/upstream/load_balancer.h @@ -99,6 +99,14 @@ class LoadBalancer { * is missing and use sensible defaults. */ virtual HostConstSharedPtr chooseHost(LoadBalancerContext* context) PURE; + + /** + * Returns a best effort prediction of the next host to be picked, or nullptr if not predictable. + * Advances with subsequent calls, so while the first call will return the next host to be picked, + * a subsequent call will return the second host to be picked. + * @param context supplies the context which is used in host selection. + */ + virtual HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) PURE; }; using LoadBalancerPtr = std::unique_ptr; diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 3f727e318d43..127df14c923a 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -735,7 +735,12 @@ class ClusterInfo { /** * @return how many streams should be anticipated per each current stream. */ - virtual float prefetchRatio() const PURE; + virtual float perUpstreamPrefetchRatio() const PURE; + + /** + * @return how many streams should be anticipated per each current stream. + */ + virtual float peekaheadRatio() const PURE; /** * @return soft limit on size of the cluster's connections read and write buffers. 
diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index aff522c9f669..eb8f1f0e3859 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -33,7 +33,7 @@ void ConnPoolImplBase::destructAllConnections() { dispatcher_.clearDeferredDeleteList(); } -bool ConnPoolImplBase::shouldCreateNewConnection() const { +bool ConnPoolImplBase::shouldCreateNewConnection(float global_prefetch_ratio) const { // If the host is not healthy, don't make it do extra work, especially as // upstream selection logic may result in bypassing this upstream entirely. // If an Envoy user wants prefetching for degraded upstreams this could be @@ -41,6 +41,17 @@ bool ConnPoolImplBase::shouldCreateNewConnection() const { if (host_->health() != Upstream::Host::Health::Healthy) { return pending_streams_.size() > connecting_stream_capacity_; } + + // If global prefetching is on, and this connection is within the global + // prefetch limit, prefetch. + // We may eventually want to track prefetch_attempts to allow more prefetching for + // heavily weighted upstreams or sticky picks. + if (global_prefetch_ratio > 1.0 && + ((pending_streams_.size() + 1 + num_active_streams_) * global_prefetch_ratio > + (connecting_stream_capacity_ + num_active_streams_))) { + return true; + } + // The number of streams we want to be provisioned for is the number of // pending and active streams times the prefetch ratio. 
// The number of streams we are (theoretically) provisioned for is the @@ -48,13 +59,13 @@ bool ConnPoolImplBase::shouldCreateNewConnection() const { // // If prefetch ratio is not set, it defaults to 1, and this simplifies to the // legacy value of pending_streams_.size() > connecting_stream_capacity_ - return (pending_streams_.size() + num_active_streams_) * prefetchRatio() > + return (pending_streams_.size() + num_active_streams_) * perUpstreamPrefetchRatio() > (connecting_stream_capacity_ + num_active_streams_); } -float ConnPoolImplBase::prefetchRatio() const { +float ConnPoolImplBase::perUpstreamPrefetchRatio() const { if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_prefetch")) { - return host_->cluster().prefetchRatio(); + return host_->cluster().perUpstreamPrefetchRatio(); } else { return 1.0; } @@ -74,9 +85,9 @@ void ConnPoolImplBase::tryCreateNewConnections() { } } -bool ConnPoolImplBase::tryCreateNewConnection() { +bool ConnPoolImplBase::tryCreateNewConnection(float global_prefetch_ratio) { // There are already enough CONNECTING connections for the number of queued streams. 
- if (!shouldCreateNewConnection()) { + if (!shouldCreateNewConnection(global_prefetch_ratio)) { return false; } @@ -184,6 +195,10 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) } } +bool ConnPoolImplBase::maybePrefetch(float global_prefetch_ratio) { + return tryCreateNewConnection(global_prefetch_ratio); +} + void ConnPoolImplBase::onUpstreamReady() { while (!pending_streams_.empty() && !ready_clients_.empty()) { ActiveClientPtr& client = ready_clients_.front(); @@ -394,14 +409,14 @@ void ConnPoolImplBase::purgePendingStreams( bool ConnPoolImplBase::connectingConnectionIsExcess() const { ASSERT(connecting_stream_capacity_ >= connecting_clients_.front()->effectiveConcurrentStreamLimit()); - // If prefetchRatio is one, this simplifies to checking if there would still be sufficient - // connecting stream capacity to serve all pending streams if the most recent client were - // removed from the picture. + // If perUpstreamPrefetchRatio is one, this simplifies to checking if there would still be + // sufficient connecting stream capacity to serve all pending streams if the most recent client + // were removed from the picture. // // If prefetch ratio is set, it also factors in the anticipated load based on both queued streams // and active streams, and makes sure the connecting capacity would still be sufficient to serve // that even with the most recent client removed. 
- return (pending_streams_.size() + num_active_streams_) * prefetchRatio() <= + return (pending_streams_.size() + num_active_streams_) * perUpstreamPrefetchRatio() <= (connecting_stream_capacity_ - connecting_clients_.front()->effectiveConcurrentStreamLimit() + num_active_streams_); } diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 83369795a5ad..c258fafa7bf5 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -149,6 +149,9 @@ class ConnPoolImplBase : protected Logger::Loggable { void checkForDrained(); void onUpstreamReady(); ConnectionPool::Cancellable* newStream(AttachContext& context); + // Called if this pool is likely to be picked soon, to determine if it's worth + // prefetching a connection. + bool maybePrefetch(float global_prefetch_ratio); virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE; @@ -176,7 +179,9 @@ class ConnPoolImplBase : protected Logger::Loggable { // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or // to avoid starving this pool. - bool tryCreateNewConnection(); + // Demand is determined either by perUpstreamPrefetchRatio() or global_prefetch_ratio + // if this is called by maybePrefetch() + bool tryCreateNewConnection(float global_prefetch_ratio = 0); // A helper function which determines if a canceled pending connection should // be closed as excess or not. @@ -184,9 +189,9 @@ class ConnPoolImplBase : protected Logger::Loggable { // A helper function which determines if a new incoming stream should trigger // connection prefetch. 
- bool shouldCreateNewConnection() const; + bool shouldCreateNewConnection(float global_prefetch_ratio) const; - float prefetchRatio() const; + float perUpstreamPrefetchRatio() const; const Upstream::HostConstSharedPtr host_; const Upstream::ResourcePriority priority_; diff --git a/source/common/http/conn_pool_base.h b/source/common/http/conn_pool_base.h index 939eed295b01..f24eea263f06 100644 --- a/source/common/http/conn_pool_base.h +++ b/source/common/http/conn_pool_base.h @@ -51,6 +51,9 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase, Upstream::HostDescriptionConstSharedPtr host() const override { return host_; } ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder, Http::ConnectionPool::Callbacks& callbacks) override; + bool maybePrefetch(float ratio) override { + return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(ratio); + } bool hasActiveConnections() const override; // Creates a new PendingStream and enqueues it into the queue. 
diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index cf5ab3cea651..c75b28f59156 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -158,6 +158,9 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase, TcpAttachContext context(&callbacks); return Envoy::ConnectionPool::ConnPoolImplBase::newStream(context); } + bool maybePrefetch(float prefetch_ratio) override { + return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(prefetch_ratio); + } ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context) override { diff --git a/source/common/tcp/original_conn_pool.h b/source/common/tcp/original_conn_pool.h index 2c0af2d50680..e17a4bb2ac38 100644 --- a/source/common/tcp/original_conn_pool.h +++ b/source/common/tcp/original_conn_pool.h @@ -33,6 +33,8 @@ class OriginalConnPoolImpl : Logger::Loggable, public Connecti void drainConnections() override; void closeConnections() override; ConnectionPool::Cancellable* newConnection(ConnectionPool::Callbacks& callbacks) override; + // The old pool does not implement prefetching. + bool maybePrefetch(float) override { return false; } Upstream::HostDescriptionConstSharedPtr host() const override { return host_; } protected: diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 56dda590fd41..e3f0a3dec1b6 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -822,6 +822,31 @@ ThreadLocalCluster* ClusterManagerImpl::get(absl::string_view cluster) { } } +void ClusterManagerImpl::maybePrefetch( + ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry, + std::function pick_prefetch_pool) { + // TODO(alyssawilk) As currently implemented, this will always just prefetch + // one connection ahead of actually needed connections. 
+ // + // Instead we want to track the following metrics across the entire connection + // pool and use the same algorithm we do for per-upstream prefetch: + // ((pending_streams_ + num_active_streams_) * global_prefetch_ratio > + // (connecting_stream_capacity_ + num_active_streams_))) + // and allow multiple prefetches per pick. + // Also cap prefetches such that + // num_unused_prefetch < num hosts + // since if we have more prefetches than hosts, we should consider kicking into + // per-upstream prefetch. + // + // Once we do this, this should loop capped number of times while shouldPrefetch is true. + if (cluster_entry->cluster_info_->peekaheadRatio() > 1.0) { + ConnectionPool::Instance* prefetch_pool = pick_prefetch_pool(); + if (prefetch_pool) { + prefetch_pool->maybePrefetch(cluster_entry->cluster_info_->peekaheadRatio()); + } + } +} + Http::ConnectionPool::Instance* ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority, absl::optional protocol, @@ -834,7 +859,19 @@ ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourceP } // Select a host and create a connection pool for it if it does not already exist. - return entry->second->connPool(priority, protocol, context); + auto ret = entry->second->connPool(priority, protocol, context, false); + + // Now see if another host should be prefetched. + // httpConnPoolForCluster is called immediately before a call for newStream. newStream doesn't + // have the load balancer context needed to make selection decisions so prefetching must be + // performed here in anticipation of the new stream. + // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is + // code-enforced. 
+ maybePrefetch(entry->second, [&entry, &priority, &protocol, &context]() { + return entry->second->connPool(priority, protocol, context, true); + }); + + return ret; } Tcp::ConnectionPool::Instance* @@ -848,7 +885,19 @@ ClusterManagerImpl::tcpConnPoolForCluster(const std::string& cluster, ResourcePr } // Select a host and create a connection pool for it if it does not already exist. - return entry->second->tcpConnPool(priority, context); + auto ret = entry->second->tcpConnPool(priority, context, false); + + // tcpConnPoolForCluster is called immediately before a call for newConnection. newConnection + // doesn't have the load balancer context needed to make selection decisions so prefetching must + // be performed here in anticipation of the new connection. + // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is + // code-enforced. + // Now see if another host should be prefetched. + maybePrefetch(entry->second, [&entry, &priority, &context]() { + return entry->second->tcpConnPool(priority, context, true); + }); + + return ret; } void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster, @@ -1292,8 +1341,8 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() Http::ConnectionPool::Instance* ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( ResourcePriority priority, absl::optional downstream_protocol, - LoadBalancerContext* context) { - HostConstSharedPtr host = lb_->chooseHost(context); + LoadBalancerContext* context, bool peek) { + HostConstSharedPtr host = (peek ? 
lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { ENVOY_LOG(debug, "no healthy host for HTTP connection pool"); cluster_info_->stats().upstream_cx_none_healthy_.inc(); @@ -1352,8 +1401,8 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( Tcp::ConnectionPool::Instance* ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( - ResourcePriority priority, LoadBalancerContext* context) { - HostConstSharedPtr host = lb_->chooseHost(context); + ResourcePriority priority, LoadBalancerContext* context, bool peek) { + HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { ENVOY_LOG(debug, "no healthy host for TCP connection pool"); cluster_info_->stats().upstream_cx_none_healthy_.inc(); diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index c229395c1353..1196bd13db72 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -336,10 +336,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable downstream_protocol, - LoadBalancerContext* context); + LoadBalancerContext* context, bool peek); Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, - LoadBalancerContext* context); + LoadBalancerContext* context, bool peek); // Upstream::ThreadLocalCluster const PrioritySet& prioritySet() override { return priority_set_; } @@ -483,6 +483,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable prefetch_pool); ClusterManagerFactory& factory_; Runtime::Loader& runtime_; diff --git a/source/common/upstream/edf_scheduler.h b/source/common/upstream/edf_scheduler.h index fc135dfc1490..2f91d0e5cbed 100644 --- a/source/common/upstream/edf_scheduler.h +++ b/source/common/upstream/edf_scheduler.h @@ -1,6 +1,6 @@ #pragma once - #include +#include #include #include "common/common/assert.h" @@ -25,32 +25,47 @@ 
namespace Upstream { // weights and an O(log n) pick time. template class EdfScheduler { public: + // Each time peekAgain is called, it will return the best-effort subsequent + // pick, popping and reinserting the entry as if it had been picked, and + // inserting it into the pre-picked queue. + // The first time peekAgain is called, it will return the + // first item which will be picked, the second time it is called it will + // return the second item which will be picked. As picks occur, that window + // will shrink. + std::shared_ptr peekAgain(std::function calculate_weight) { + if (hasEntry()) { + prepick_list_.push_back(std::move(queue_.top().entry_)); + std::shared_ptr ret{prepick_list_.back()}; + add(calculate_weight(*ret), ret); + queue_.pop(); + return ret; + } + return nullptr; + } + /** - * Pick queue entry with closest deadline. - * @return std::shared_ptr to the queue entry if a valid entry exists in the queue, nullptr - * otherwise. The entry is removed from the queue. + * Pick queue entry with closest deadline and adds it back using the weight + * from calculate_weight. + * @return std::shared_ptr to next valid the queue entry if or nullptr if none exists. */ - std::shared_ptr pick() { - EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); - while (true) { - if (queue_.empty()) { - EDF_TRACE("Queue is empty."); - return nullptr; - } - const EdfEntry& edf_entry = queue_.top(); - // Entry has been removed, let's see if there's another one. - if (edf_entry.entry_.expired()) { - EDF_TRACE("Entry has expired, repick."); - queue_.pop(); + std::shared_ptr pickAndAdd(std::function calculate_weight) { + while (!prepick_list_.empty()) { + // In this case the entry was added back during peekAgain so don't re-add. 
+ if (prepick_list_.front().expired()) { + prepick_list_.pop_front(); continue; } - std::shared_ptr ret{edf_entry.entry_}; - ASSERT(edf_entry.deadline_ >= current_time_); - current_time_ = edf_entry.deadline_; + std::shared_ptr ret{prepick_list_.front()}; + prepick_list_.pop_front(); + return ret; + } + if (hasEntry()) { + std::shared_ptr ret{queue_.top().entry_}; queue_.pop(); - EDF_TRACE("Picked {}, current_time_={}.", static_cast(ret.get()), current_time_); + add(calculate_weight(*ret), ret); return ret; } + return nullptr; } /** @@ -74,6 +89,31 @@ template class EdfScheduler { bool empty() const { return queue_.empty(); } private: + /** + * Clears expired entries, and returns true if there's still entries in the queue. + */ + bool hasEntry() { + EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); + while (true) { + if (queue_.empty()) { + EDF_TRACE("Queue is empty."); + return false; + } + const EdfEntry& edf_entry = queue_.top(); + // Entry has been removed, let's see if there's another one. + if (edf_entry.entry_.expired()) { + EDF_TRACE("Entry has expired, repick."); + queue_.pop(); + continue; + } + std::shared_ptr ret{edf_entry.entry_}; + ASSERT(edf_entry.deadline_ >= current_time_); + current_time_ = edf_entry.deadline_; + EDF_TRACE("Picked {}, current_time_={}.", static_cast(ret.get()), current_time_); + return true; + } + } + struct EdfEntry { double deadline_; // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior. @@ -98,6 +138,7 @@ template class EdfScheduler { uint64_t order_offset_{}; // Min priority queue for EDF. 
std::priority_queue queue_; + std::list> prepick_list_; }; #undef EDF_DEBUG diff --git a/source/common/upstream/load_balancer_impl.cc b/source/common/upstream/load_balancer_impl.cc index b7b2435ec266..f12085eea88d 100644 --- a/source/common/upstream/load_balancer_impl.cc +++ b/source/common/upstream/load_balancer_impl.cc @@ -123,6 +123,7 @@ LoadBalancerBase::LoadBalancerBase( [this](uint32_t priority, const HostVector&, const HostVector&) -> void { UNREFERENCED_PARAMETER(priority); recalculatePerPriorityPanic(); + stashed_random_.clear(); }); } @@ -321,21 +322,18 @@ void LoadBalancerBase::recalculateLoadInTotalPanic() { } std::pair -LoadBalancerBase::chooseHostSet(LoadBalancerContext* context) { +LoadBalancerBase::chooseHostSet(LoadBalancerContext* context, uint64_t hash) const { if (context) { const auto priority_loads = context->determinePriorityLoad( priority_set_, per_priority_load_, Upstream::RetryPriority::defaultPriorityMapping); - - const auto priority_and_source = - choosePriority(random_.random(), priority_loads.healthy_priority_load_, - priority_loads.degraded_priority_load_); + const auto priority_and_source = choosePriority(hash, priority_loads.healthy_priority_load_, + priority_loads.degraded_priority_load_); return {*priority_set_.hostSetsPerPriority()[priority_and_source.first], priority_and_source.second}; } - const auto priority_and_source = - choosePriority(random_.random(), per_priority_load_.healthy_priority_load_, - per_priority_load_.degraded_priority_load_); + const auto priority_and_source = choosePriority(hash, per_priority_load_.healthy_priority_load_, + per_priority_load_.degraded_priority_load_); return {*priority_set_.hostSetsPerPriority()[priority_and_source.first], priority_and_source.second}; } @@ -525,7 +523,7 @@ HostConstSharedPtr LoadBalancerBase::chooseHost(LoadBalancerContext* context) { return host; } -bool LoadBalancerBase::isHostSetInPanic(const HostSet& host_set) { +bool LoadBalancerBase::isHostSetInPanic(const HostSet& 
host_set) const { uint64_t global_panic_threshold = std::min( 100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_)); const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size(); @@ -557,7 +555,7 @@ void ZoneAwareLoadBalancerBase::calculateLocalityPercentage( } } -uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& host_set) { +uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& host_set) const { PerPriorityState& state = *per_priority_state_[host_set.priority()]; ASSERT(state.locality_routing_state_ != LocalityRoutingState::NoLocalityRouting); @@ -608,8 +606,8 @@ uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& h } absl::optional -ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context) { - auto host_set_and_source = chooseHostSet(context); +ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context, uint64_t hash) const { + auto host_set_and_source = chooseHostSet(context, hash); // The second argument tells us which availability we should target from the selected host set. const auto host_availability = host_set_and_source.second; @@ -674,7 +672,7 @@ ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context) { return hosts_source; } -const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) { +const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) const { const HostSet& host_set = *priority_set_.hostSetsPerPriority()[hosts_source.priority_]; switch (hosts_source.source_type_) { case HostsSource::SourceType::AllHosts: @@ -748,8 +746,8 @@ void EdfLoadBalancerBase::refresh(uint32_t priority) { // refreshes for the weighted case. 
if (!hosts.empty()) { for (uint32_t i = 0; i < seed_ % hosts.size(); ++i) { - auto host = scheduler.edf_->pick(); - scheduler.edf_->add(hostWeight(*host), host); + auto host = + scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); }); } } }; @@ -775,8 +773,8 @@ void EdfLoadBalancerBase::refresh(uint32_t priority) { } } -HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* context) { - const absl::optional hosts_source = hostSourceToUse(context); +HostConstSharedPtr EdfLoadBalancerBase::peekAnotherHost(LoadBalancerContext* context) { + const absl::optional hosts_source = hostSourceToUse(context, random(true)); if (!hosts_source) { return nullptr; } @@ -791,10 +789,33 @@ HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* cont // whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original weights // of 2 or more hosts differ. if (scheduler.edf_ != nullptr) { - auto host = scheduler.edf_->pick(); - if (host != nullptr) { - scheduler.edf_->add(hostWeight(*host), host); + return scheduler.edf_->peekAgain([this](const Host& host) { return hostWeight(host); }); + } else { + const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source); + if (hosts_to_use.empty()) { + return nullptr; } + return unweightedHostPeek(hosts_to_use, *hosts_source); + } +} + +HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* context) { + const absl::optional hosts_source = hostSourceToUse(context, random(false)); + if (!hosts_source) { + return nullptr; + } + auto scheduler_it = scheduler_.find(*hosts_source); + // We should always have a scheduler for any return value from + // hostSourceToUse() via the construction in refresh(); + ASSERT(scheduler_it != scheduler_.end()); + auto& scheduler = scheduler_it->second; + + // As has been commented in both EdfLoadBalancerBase::refresh and + // BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to 
determine + // whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original weights + // of 2 or more hosts differ. + if (scheduler.edf_ != nullptr) { + auto host = scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); }); return host; } else { const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source); @@ -805,6 +826,14 @@ HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* cont } } +HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPeek(const HostVector&, + const HostsSource&) { + // LeastRequestLoadBalancer can not do deterministic prefetching, because + // any other thread might select the least-requested-host between prefetch and + // host-pick, and change the rq_active checks. + return nullptr; +} + HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPick(const HostVector& hosts_to_use, const HostsSource&) { HostSharedPtr candidate_host = nullptr; @@ -828,8 +857,17 @@ HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPick(const HostVector return candidate_host; } +HostConstSharedPtr RandomLoadBalancer::peekAnotherHost(LoadBalancerContext* context) { + return peekOrChoose(context, true); +} + HostConstSharedPtr RandomLoadBalancer::chooseHostOnce(LoadBalancerContext* context) { - const absl::optional hosts_source = hostSourceToUse(context); + return peekOrChoose(context, false); +} + +HostConstSharedPtr RandomLoadBalancer::peekOrChoose(LoadBalancerContext* context, bool peek) { + uint64_t random_hash = random(peek); + const absl::optional hosts_source = hostSourceToUse(context, random_hash); if (!hosts_source) { return nullptr; } @@ -839,7 +877,7 @@ HostConstSharedPtr RandomLoadBalancer::chooseHostOnce(LoadBalancerContext* conte return nullptr; } - return hosts_to_use[random_.random() % hosts_to_use.size()]; + return hosts_to_use[random_hash % hosts_to_use.size()]; } SubsetSelectorImpl::SubsetSelectorImpl( diff --git 
a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index 11da8b6282be..fab367067255 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -62,7 +62,7 @@ class LoadBalancerBase : public LoadBalancer { * majority of hosts are unhealthy we'll be likely in a panic mode. In this case we'll route * requests to hosts regardless of whether they are healthy or not. */ - bool isHostSetInPanic(const HostSet& host_set); + bool isHostSetInPanic(const HostSet& host_set) const; /** * Method is called when all host sets are in panic mode. @@ -81,7 +81,8 @@ class LoadBalancerBase : public LoadBalancer { // degraded_per_priority_load_, only degraded hosts should be selected from that host set. // // @return host set to use and which availability to target. - std::pair chooseHostSet(LoadBalancerContext* context); + std::pair chooseHostSet(LoadBalancerContext* context, + uint64_t hash) const; uint32_t percentageLoad(uint32_t priority) const { return per_priority_load_.healthy_priority_load_.get()[priority]; @@ -90,9 +91,24 @@ class LoadBalancerBase : public LoadBalancer { return per_priority_load_.degraded_priority_load_.get()[priority]; } bool isInPanic(uint32_t priority) const { return per_priority_panic_[priority]; } + uint64_t random(bool peeking) { + if (peeking) { + stashed_random_.push_back(random_.random()); + return stashed_random_.back(); + } else { + if (!stashed_random_.empty()) { + auto random = stashed_random_.front(); + stashed_random_.pop_front(); + return random; + } else { + return random_.random(); + } + } + } ClusterStats& stats_; Runtime::Loader& runtime_; + std::deque stashed_random_; Random::RandomGenerator& random_; const uint32_t default_healthy_panic_percent_; // The priority-ordered set of hosts to use for load balancing. 
@@ -240,12 +256,12 @@ class ZoneAwareLoadBalancerBase : public LoadBalancerBase { * Pick the host source to use, doing zone aware routing when the hosts are sufficiently healthy. * If no host is chosen (due to fail_traffic_on_panic being set), return absl::nullopt. */ - absl::optional hostSourceToUse(LoadBalancerContext* context); + absl::optional hostSourceToUse(LoadBalancerContext* context, uint64_t hash) const; /** * Index into priority_set via hosts source descriptor. */ - const HostVector& hostSourceToHosts(HostsSource hosts_source); + const HostVector& hostSourceToHosts(HostsSource hosts_source) const; private: enum class LocalityRoutingState { @@ -273,7 +289,7 @@ class ZoneAwareLoadBalancerBase : public LoadBalancerBase { * Try to select upstream hosts from the same locality. * @param host_set the last host set returned by chooseHostSet() */ - uint32_t tryChooseLocalLocalityHosts(const HostSet& host_set); + uint32_t tryChooseLocalLocalityHosts(const HostSet& host_set) const; /** * @return (number of hosts in a given locality)/(total number of hosts) in `ret` param. @@ -359,6 +375,7 @@ class EdfLoadBalancerBase : public ZoneAwareLoadBalancerBase { const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config); // Upstream::LoadBalancerBase + HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override; HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override; protected: @@ -383,6 +400,8 @@ class EdfLoadBalancerBase : public ZoneAwareLoadBalancerBase { private: virtual void refreshHostSource(const HostsSource& source) PURE; virtual double hostWeight(const Host& host) PURE; + virtual HostConstSharedPtr unweightedHostPeek(const HostVector& hosts_to_use, + const HostsSource& source) PURE; virtual HostConstSharedPtr unweightedHostPick(const HostVector& hosts_to_use, const HostsSource& source) PURE; @@ -411,10 +430,25 @@ class RoundRobinLoadBalancer : public EdfLoadBalancerBase { // already exists. 
Note that host sources will never be removed, but given how uncommon this // is it probably doesn't matter. rr_indexes_.insert({source, seed_}); + // If the list of hosts changes, the order of picks change. Discard the + // index. + peekahead_index_ = 0; } double hostWeight(const Host& host) override { return host.weight(); } + HostConstSharedPtr unweightedHostPeek(const HostVector& hosts_to_use, + const HostsSource& source) override { + auto i = rr_indexes_.find(source); + if (i == rr_indexes_.end()) { + return nullptr; + } + return hosts_to_use[(i->second + (peekahead_index_)++) % hosts_to_use.size()]; + } + HostConstSharedPtr unweightedHostPick(const HostVector& hosts_to_use, const HostsSource& source) override { + if (peekahead_index_ > 0) { + --peekahead_index_; + } // To avoid storing the RR index in the base class, we end up using a second map here with // host source as the key. This means that each LB decision will require two map lookups in // the unweighted case. We might consider trying to optimize this in the future. 
@@ -422,6 +456,7 @@ class RoundRobinLoadBalancer : public EdfLoadBalancerBase { return hosts_to_use[rr_indexes_[source]++ % hosts_to_use.size()]; } + uint64_t peekahead_index_{}; absl::node_hash_map rr_indexes_; }; @@ -510,6 +545,8 @@ class LeastRequestLoadBalancer : public EdfLoadBalancerBase, return static_cast(host.weight()) / std::pow(host.stats().rq_active_.value() + 1, active_request_bias_); } + HostConstSharedPtr unweightedHostPeek(const HostVector& hosts_to_use, + const HostsSource& source) override; HostConstSharedPtr unweightedHostPick(const HostVector& hosts_to_use, const HostsSource& source) override; @@ -536,6 +573,10 @@ class RandomLoadBalancer : public ZoneAwareLoadBalancerBase { // Upstream::LoadBalancerBase HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override; + HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override; + +protected: + HostConstSharedPtr peekOrChoose(LoadBalancerContext* context, bool peek); }; /** diff --git a/source/common/upstream/original_dst_cluster.h b/source/common/upstream/original_dst_cluster.h index 14970a46094a..a5e6b6e96cef 100644 --- a/source/common/upstream/original_dst_cluster.h +++ b/source/common/upstream/original_dst_cluster.h @@ -56,6 +56,8 @@ class OriginalDstCluster : public ClusterImplBase { // Upstream::LoadBalancer HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; + // Prefetching is not implemented for OriginalDstCluster + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } private: Network::Address::InstanceConstSharedPtr requestOverrideHost(LoadBalancerContext* context); diff --git a/source/common/upstream/subset_lb.h b/source/common/upstream/subset_lb.h index 0681d4526d26..d6a300cb318a 100644 --- a/source/common/upstream/subset_lb.h +++ b/source/common/upstream/subset_lb.h @@ -37,6 +37,8 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable; diff --git a/source/common/upstream/thread_aware_lb_impl.h 
b/source/common/upstream/thread_aware_lb_impl.h index e36554fcff86..8c02cd54284f 100644 --- a/source/common/upstream/thread_aware_lb_impl.h +++ b/source/common/upstream/thread_aware_lb_impl.h @@ -73,6 +73,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL HostConstSharedPtr chooseHostOnce(LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + // Prefetch not implemented for hash based load balancing + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } protected: ThreadAwareLoadBalancerBase( @@ -95,6 +97,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL // Upstream::LoadBalancer HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; + // Prefetch not implemented for hash based load balancing + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } ClusterStats& stats_; Random::RandomGenerator& random_; diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 250d6689a64b..e7a5c129a06b 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -479,12 +479,12 @@ HostSetImpl::chooseLocality(EdfScheduler* locality_scheduler) { if (locality_scheduler == nullptr) { return {}; } - const std::shared_ptr locality = locality_scheduler->pick(); + const std::shared_ptr locality = locality_scheduler->pickAndAdd( + [](const LocalityEntry& locality) { return locality.effective_weight_; }); // We don't build a schedule if there are no weighted localities, so we should always succeed. ASSERT(locality != nullptr); // If we picked it before, its weight must have been positive. 
ASSERT(locality->effective_weight_ > 0); - locality_scheduler->add(locality->effective_weight_, locality); return locality->index_; } @@ -688,8 +688,10 @@ ClusterInfoImpl::ClusterInfoImpl( Http::DEFAULT_MAX_HEADERS_COUNT))), connect_timeout_( std::chrono::milliseconds(PROTOBUF_GET_MS_REQUIRED(config, connect_timeout))), - prefetch_ratio_( - PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.prefetch_policy(), prefetch_ratio, 1.0)), + per_upstream_prefetch_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( + config.prefetch_policy(), per_upstream_prefetch_ratio, 1.0)), + peekahead_ratio_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.prefetch_policy(), predictive_prefetch_ratio, 0)), per_connection_buffer_limit_bytes_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)), socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)), diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index e905d8068d80..c74e489384f0 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -535,7 +535,8 @@ class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable idleTimeout() const override { return idle_timeout_; } - float prefetchRatio() const override { return prefetch_ratio_; } + float perUpstreamPrefetchRatio() const override { return per_upstream_prefetch_ratio_; } + float peekaheadRatio() const override { return peekahead_ratio_; } uint32_t perConnectionBufferLimitBytes() const override { return per_connection_buffer_limit_bytes_; } @@ -662,7 +663,8 @@ class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable idle_timeout_; - const float prefetch_ratio_; + const float per_upstream_prefetch_ratio_; + const float peekahead_ratio_; const uint32_t per_connection_buffer_limit_bytes_; TransportSocketMatcherPtr socket_matcher_; Stats::ScopePtr stats_scope_; diff --git a/source/extensions/clusters/aggregate/cluster.h 
b/source/extensions/clusters/aggregate/cluster.h index 417a8e8de156..92adfe68f187 100644 --- a/source/extensions/clusters/aggregate/cluster.h +++ b/source/extensions/clusters/aggregate/cluster.h @@ -77,6 +77,10 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer { // Upstream::LoadBalancer Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; + // Prefetching not yet implemented for extensions. + Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { + return nullptr; + } private: // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the @@ -92,6 +96,10 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer { // Upstream::LoadBalancer Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; + // Prefetching not yet implemented for extensions. + Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { + return nullptr; + } // Upstream::LoadBalancerBase Upstream::HostConstSharedPtr chooseHostOnce(Upstream::LoadBalancerContext*) override { diff --git a/source/extensions/clusters/dynamic_forward_proxy/cluster.h b/source/extensions/clusters/dynamic_forward_proxy/cluster.h index 7354c60de168..a34b8d6b1871 100644 --- a/source/extensions/clusters/dynamic_forward_proxy/cluster.h +++ b/source/extensions/clusters/dynamic_forward_proxy/cluster.h @@ -59,6 +59,10 @@ class Cluster : public Upstream::BaseDynamicClusterImpl, // Upstream::LoadBalancer Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; + // Prefetching not implemented. 
+ Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { + return nullptr; + } const HostInfoMapSharedPtr host_map_; }; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index 0c5142a8290a..561de3b681e5 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -189,6 +189,9 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack, // Upstream::LoadBalancerBase Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override; + Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { + return nullptr; + } private: const SlotArraySharedPtr slot_array_; diff --git a/test/common/conn_pool/conn_pool_base_test.cc b/test/common/conn_pool/conn_pool_base_test.cc index 282e66c18612..bf2b1946967c 100644 --- a/test/common/conn_pool/conn_pool_base_test.cc +++ b/test/common/conn_pool/conn_pool_base_test.cc @@ -11,6 +11,7 @@ namespace Envoy { namespace ConnectionPool { +using testing::AnyNumber; using testing::InvokeWithoutArgs; using testing::Return; @@ -75,7 +76,7 @@ class ConnPoolImplBaseTest : public testing::Test { TEST_F(ConnPoolImplBaseTest, BasicPrefetch) { // Create more than one connection per new stream. - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // On new stream, create 2 connections. EXPECT_CALL(pool_, instantiateActiveClient).Times(2); @@ -89,7 +90,7 @@ TEST_F(ConnPoolImplBaseTest, PrefetchOnDisconnect) { testing::InSequence s; // Create more than one connection per new stream. - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // On new stream, create 2 connections. 
EXPECT_CALL(pool_, instantiateActiveClient).Times(2); @@ -109,7 +110,7 @@ TEST_F(ConnPoolImplBaseTest, PrefetchOnDisconnect) { TEST_F(ConnPoolImplBaseTest, NoPrefetchIfUnhealthy) { // Create more than one connection per new stream. - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); host_->healthFlagSet(Upstream::Host::HealthFlag::FAILED_ACTIVE_HC); EXPECT_EQ(host_->health(), Upstream::Host::Health::Unhealthy); @@ -124,7 +125,7 @@ TEST_F(ConnPoolImplBaseTest, NoPrefetchIfUnhealthy) { TEST_F(ConnPoolImplBaseTest, NoPrefetchIfDegraded) { // Create more than one connection per new stream. - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); EXPECT_EQ(host_->health(), Upstream::Host::Health::Healthy); host_->healthFlagSet(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH); @@ -138,5 +139,33 @@ TEST_F(ConnPoolImplBaseTest, NoPrefetchIfDegraded) { pool_.destructAllConnections(); } +TEST_F(ConnPoolImplBaseTest, ExplicitPrefetch) { + // Create more than one connection per new stream. + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); + EXPECT_CALL(pool_, instantiateActiveClient).Times(AnyNumber()); + + // With global prefetch off, we won't prefetch. + EXPECT_FALSE(pool_.maybePrefetch(0)); + // With prefetch ratio of 1.1, we'll prefetch two connections. + // Currently, no number of subsequent calls to prefetch will increase that. + EXPECT_TRUE(pool_.maybePrefetch(1.1)); + EXPECT_TRUE(pool_.maybePrefetch(1.1)); + EXPECT_FALSE(pool_.maybePrefetch(1.1)); + + // With a higher prefetch ratio, more connections may be prefetched. + EXPECT_TRUE(pool_.maybePrefetch(3)); + + pool_.destructAllConnections(); +} + +TEST_F(ConnPoolImplBaseTest, ExplicitPrefetchNotHealthy) { + // Create more than one connection per new stream. 
+ ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); + + // Prefetch won't occur if the host is not healthy. + host_->healthFlagSet(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH); + EXPECT_FALSE(pool_.maybePrefetch(1)); +} + } // namespace ConnectionPool } // namespace Envoy diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index 6e23074f691a..409c3179a955 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -1318,7 +1318,7 @@ TEST_F(Http2ConnPoolImplTest, DrainedConnectionsNotActive) { TEST_F(Http2ConnPoolImplTest, PrefetchWithoutMultiplexing) { cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(1); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // With one request per connection, and prefetch 1.5, the first request will // kick off 2 connections. @@ -1348,7 +1348,7 @@ TEST_F(Http2ConnPoolImplTest, PrefetchOff) { Runtime::LoaderSingleton::getExisting()->mergeValues( {{"envoy.reloadable_features.allow_prefetch", "false"}}); cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(1); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // Despite the prefetch ratio, no prefetch will happen due to the runtime // disable. @@ -1363,7 +1363,7 @@ TEST_F(Http2ConnPoolImplTest, PrefetchOff) { TEST_F(Http2ConnPoolImplTest, PrefetchWithMultiplexing) { cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(2); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // With two requests per connection, and prefetch 1.5, the first request will // only kick off 1 connection. 
@@ -1384,7 +1384,7 @@ TEST_F(Http2ConnPoolImplTest, PrefetchWithMultiplexing) { TEST_F(Http2ConnPoolImplTest, PrefetchEvenWhenReady) { cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(1); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); // With one request per connection, and prefetch 1.5, the first request will // kick off 2 connections. @@ -1410,7 +1410,7 @@ TEST_F(Http2ConnPoolImplTest, PrefetchEvenWhenReady) { TEST_F(Http2ConnPoolImplTest, PrefetchAfterTimeout) { cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(1); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.5)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); expectClientsCreate(2); ActiveTestRequest r1(*this, 0, false); @@ -1431,7 +1431,7 @@ TEST_F(Http2ConnPoolImplTest, PrefetchAfterTimeout) { TEST_F(Http2ConnPoolImplTest, CloseExcessWithPrefetch) { cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(1); - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(1.00)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.00)); // First request prefetches an additional connection. expectClientsCreate(1); @@ -1442,7 +1442,7 @@ TEST_F(Http2ConnPoolImplTest, CloseExcessWithPrefetch) { ActiveTestRequest r2(*this, 0, false); // Change the prefetch ratio to force the connection to no longer be excess. - ON_CALL(*cluster_, prefetchRatio).WillByDefault(Return(2)); + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(2)); // Closing off the second request should bring us back to 1 request in queue, // desired capacity 2, so will not close the connection. EXPECT_CALL(*this, onClientDestroy()).Times(0); @@ -1454,6 +1454,19 @@ TEST_F(Http2ConnPoolImplTest, CloseExcessWithPrefetch) { closeAllClients(); } +// Test that maybePrefetch is passed up to the base class implementation. 
+TEST_F(Http2ConnPoolImplTest, MaybePrefetch) { + ON_CALL(*cluster_, perUpstreamPrefetchRatio).WillByDefault(Return(1.5)); + + EXPECT_FALSE(pool_->maybePrefetch(0)); + + expectClientsCreate(1); + EXPECT_TRUE(pool_->maybePrefetch(2)); + + pool_->drainConnections(); + closeAllClients(); +} + } // namespace Http2 } // namespace Http } // namespace Envoy diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index ff5a678d93e4..838cc9a64035 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -87,6 +87,13 @@ class ConnPoolBase : public Tcp::ConnectionPool::Instance { MOCK_METHOD(void, onConnReleasedForTest, ()); MOCK_METHOD(void, onConnDestroyedForTest, ()); + bool maybePrefetch(float ratio) override { + if (!test_new_connection_pool_) { + return false; + } + ASSERT(dynamic_cast(conn_pool_.get()) != nullptr); + return dynamic_cast(conn_pool_.get())->maybePrefetch(ratio); + } struct TestConnection { Network::MockClientConnection* connection_; @@ -1041,6 +1048,19 @@ TEST_P(TcpConnPoolImplTest, RequestCapacity) { conn_pool_.test_conns_[2].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); } +// Test that maybePrefetch is passed up to the base class implementation. +TEST_P(TcpConnPoolImplTest, TestPrefetch) { + if (!test_new_connection_pool_) { + return; + } + EXPECT_FALSE(conn_pool_.maybePrefetch(0)); + + conn_pool_.expectConnCreate(); + ASSERT_TRUE(conn_pool_.maybePrefetch(2)); + + conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); +} + /** * Test that pending connections are closed when the connection pool is destroyed. 
*/ @@ -1084,6 +1104,7 @@ TEST_P(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) { EXPECT_CALL(dispatcher_, clearDeferredDeleteList()); conn_pool_.reset(); } + INSTANTIATE_TEST_SUITE_P(ConnectionPools, TcpConnPoolImplTest, testing::Bool()); INSTANTIATE_TEST_SUITE_P(ConnectionPools, TcpConnPoolImplDestructorTest, testing::Bool()); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 442a4cb5edb1..b0a51582b784 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -3959,7 +3959,93 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) { EXPECT_EQ(conn_pool_vector.front(), cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default, Http::Protocol::Http11, &lb_context)); -} // namespace +} + +class PrefetchTest : public ClusterManagerImplTest { +public: + void initialize(float ratio) { + const std::string yaml = R"EOF( + static_resources: + clusters: + - name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + type: STATIC + )EOF"; + + ReadyWatcher initialized; + EXPECT_CALL(initialized, ready()); + envoy::config::bootstrap::v3::Bootstrap config = parseBootstrapFromV3Yaml(yaml); + if (ratio != 0) { + config.mutable_static_resources() + ->mutable_clusters(0) + ->mutable_prefetch_policy() + ->mutable_predictive_prefetch_ratio() + ->set_value(ratio); + } + create(config); + + // Set up for an initialize callback. + cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); + + std::unique_ptr callbacks( + new NiceMock()); + ClusterUpdateCallbacksHandlePtr cb = + cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks); + + cluster_ = &cluster_manager_->activeClusters().begin()->second.get(); + + // Set up the HostSet. 
+ host1_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80"); + host2_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80"); + + HostVector hosts{host1_, host2_}; + auto hosts_ptr = std::make_shared(hosts); + + // Sending non-mergeable updates. + cluster_->prioritySet().updateHosts( + 0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, + {}, 100); + } + + Cluster* cluster_{}; + HostSharedPtr host1_; + HostSharedPtr host2_; +}; + +TEST_F(PrefetchTest, PrefetchOff) { + // With prefetch set to 0, each request for a connection pool will only + // allocate that conn pool. + initialize(0); + EXPECT_CALL(factory_, allocateConnPool_(_, _, _)) + .Times(1) + .WillRepeatedly(ReturnNew()); + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default, + Http::Protocol::Http11, nullptr); + + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) + .Times(1) + .WillRepeatedly(ReturnNew()); + cluster_manager_->tcpConnPoolForCluster("cluster_1", ResourcePriority::Default, nullptr); +} + +TEST_F(PrefetchTest, PrefetchOn) { + // With prefetch set to 1.1, each request for a connection pool will kick off + // prefetching, so create the pool for both the current connection and the + // anticipated one. 
+ initialize(1.1); + EXPECT_CALL(factory_, allocateConnPool_(_, _, _)) + .Times(2) + .WillRepeatedly(ReturnNew>()); + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default, + Http::Protocol::Http11, nullptr); + + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) + .Times(2) + .WillRepeatedly(ReturnNew>()); + cluster_manager_->tcpConnPoolForCluster("cluster_1", ResourcePriority::Default, nullptr); +} + } // namespace } // namespace Upstream } // namespace Envoy diff --git a/test/common/upstream/edf_scheduler_test.cc b/test/common/upstream/edf_scheduler_test.cc index ddb39bf847a0..a0f0b92168c8 100644 --- a/test/common/upstream/edf_scheduler_test.cc +++ b/test/common/upstream/edf_scheduler_test.cc @@ -8,7 +8,8 @@ namespace { TEST(EdfSchedulerTest, Empty) { EdfScheduler sched; - EXPECT_EQ(nullptr, sched.pick()); + EXPECT_EQ(nullptr, sched.peekAgain([](const double&) { return 0; })); + EXPECT_EQ(nullptr, sched.pickAndAdd([](const double&) { return 0; })); } // Validate we get regular RR behavior when all weights are the same. 
@@ -24,9 +25,10 @@ TEST(EdfSchedulerTest, Unweighted) {
 
   for (uint32_t rounds = 0; rounds < 128; ++rounds) {
     for (uint32_t i = 0; i < num_entries; ++i) {
-      auto p = sched.pick();
+      auto peek = sched.peekAgain([](const double&) { return 1; });
+      auto p = sched.pickAndAdd([](const double&) { return 1; });
       EXPECT_EQ(i, *p);
-      sched.add(1, p);
+      EXPECT_EQ(*peek, *p);
     }
   }
 }
@@ -45,9 +47,10 @@ TEST(EdfSchedulerTest, Weighted) {
   }
 
   for (uint32_t i = 0; i < (num_entries * (1 + num_entries)) / 2; ++i) {
-    auto p = sched.pick();
+    auto peek = sched.peekAgain([](const double& orig) { return orig + 1; });
+    auto p = sched.pickAndAdd([](const double& orig) { return orig + 1; });
+    EXPECT_EQ(*p, *peek);
     ++pick_count[*p];
-    sched.add(*p + 1, p);
   }
 
   for (uint32_t i = 0; i < num_entries; ++i) {
@@ -66,9 +69,69 @@ TEST(EdfSchedulerTest, Expired) {
     sched.add(1, second_entry);
   }
 
-  auto p = sched.pick();
+  auto peek = sched.peekAgain([](const double&) { return 1; });
+  auto p = sched.pickAndAdd([](const double&) { return 1; });
+  EXPECT_EQ(*peek, *p);
   EXPECT_EQ(*second_entry, *p);
-  EXPECT_EQ(nullptr, sched.pick());
+  EXPECT_EQ(*second_entry, *p);
+}
+
+// Validate that expired entries are not peeked.
+TEST(EdfSchedulerTest, ExpiredPeek) {
+  EdfScheduler<uint32_t> sched;
+
+  {
+    auto second_entry = std::make_shared<uint32_t>(42);
+    auto first_entry = std::make_shared<uint32_t>(37);
+    sched.add(2, first_entry);
+    sched.add(1, second_entry);
+  }
+  auto third_entry = std::make_shared<uint32_t>(37);
+  sched.add(3, third_entry);
+
+  EXPECT_EQ(37, *sched.peekAgain([](const double&) { return 1; }));
+}
+
+// Validate that expired entries are ignored.
+TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
+  EdfScheduler<uint32_t> sched;
+
+  {
+    auto second_entry = std::make_shared<uint32_t>(42);
+    auto first_entry = std::make_shared<uint32_t>(37);
+    sched.add(2, first_entry);
+    sched.add(1, second_entry);
+    for (int i = 0; i < 3; ++i) {
+      EXPECT_TRUE(sched.peekAgain([](const double&) { return 1; }) != nullptr);
+    }
+  }
+
+  EXPECT_TRUE(sched.peekAgain([](const double&) { return 1; }) == nullptr);
+  EXPECT_TRUE(sched.pickAndAdd([](const double&) { return 1; }) == nullptr);
+}
+
+TEST(EdfSchedulerTest, ManyPeekahead) {
+  EdfScheduler<uint32_t> sched1;
+  EdfScheduler<uint32_t> sched2;
+  constexpr uint32_t num_entries = 128;
+  std::shared_ptr<uint32_t> entries[num_entries];
+
+  for (uint32_t i = 0; i < num_entries; ++i) {
+    entries[i] = std::make_shared<uint32_t>(i);
+    sched1.add(1, entries[i]);
+    sched2.add(1, entries[i]);
+  }
+
+  std::vector<uint32_t> picks;
+  for (uint32_t rounds = 0; rounds < 10; ++rounds) {
+    picks.push_back(*sched1.peekAgain([](const double&) { return 1; }));
+  }
+  for (uint32_t rounds = 0; rounds < 10; ++rounds) {
+    auto p1 = sched1.pickAndAdd([](const double&) { return 1; });
+    auto p2 = sched2.pickAndAdd([](const double&) { return 1; });
+    EXPECT_EQ(picks[rounds], *p1);
+    EXPECT_EQ(*p2, *p1);
+  }
 }
 
 } // namespace
diff --git a/test/common/upstream/load_balancer_impl_test.cc b/test/common/upstream/load_balancer_impl_test.cc
index 422cdd8710a6..81603a608a65 100644
--- a/test/common/upstream/load_balancer_impl_test.cc
+++ b/test/common/upstream/load_balancer_impl_test.cc
@@ -68,6 +68,9 @@ class TestLb : public LoadBalancerBase {
   HostConstSharedPtr chooseHostOnce(LoadBalancerContext*) override {
     NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
   }
+  HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override {
+    NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
+  }
 };
 
 class LoadBalancerBaseTest : public LoadBalancerTestBase {
@@ -139,7 +142,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelection) {
   // on the number of hosts regardless of their health.
EXPECT_EQ(50, lb_.percentageLoad(0)); EXPECT_EQ(50, lb_.percentageLoad(1)); - EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context).first); + EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context, 0).first); // Modify number of hosts in failover, but leave them in the unhealthy state // primary and secondary are in panic mode, so load distribution is @@ -147,7 +150,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelection) { updateHostSet(failover_host_set_, 2, 0); EXPECT_EQ(34, lb_.percentageLoad(0)); EXPECT_EQ(66, lb_.percentageLoad(1)); - EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context).first); + EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context, 0).first); // Update the priority set with a new priority level P=2 and ensure the host // is chosen @@ -157,7 +160,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelection) { EXPECT_EQ(0, lb_.percentageLoad(1)); EXPECT_EQ(100, lb_.percentageLoad(2)); priority_load.healthy_priority_load_ = HealthyLoad({0u, 0u, 100}); - EXPECT_EQ(&tertiary_host_set_, &lb_.chooseHostSet(&context).first); + EXPECT_EQ(&tertiary_host_set_, &lb_.chooseHostSet(&context, 0).first); // Now add a healthy host in P=0 and make sure it is immediately selected. 
updateHostSet(host_set_, 1 /* num_hosts */, 1 /* num_healthy_hosts */); @@ -166,14 +169,14 @@ TEST_P(LoadBalancerBaseTest, PrioritySelection) { EXPECT_EQ(100, lb_.percentageLoad(0)); EXPECT_EQ(0, lb_.percentageLoad(2)); priority_load.healthy_priority_load_ = HealthyLoad({100u, 0u, 0u}); - EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context).first); + EXPECT_EQ(&host_set_, &lb_.chooseHostSet(&context, 0).first); // Remove the healthy host and ensure we fail back over to tertiary_host_set_ updateHostSet(host_set_, 1 /* num_hosts */, 0 /* num_healthy_hosts */); EXPECT_EQ(0, lb_.percentageLoad(0)); EXPECT_EQ(100, lb_.percentageLoad(2)); priority_load.healthy_priority_load_ = HealthyLoad({0u, 0u, 100}); - EXPECT_EQ(&tertiary_host_set_, &lb_.chooseHostSet(&context).first); + EXPECT_EQ(&tertiary_host_set_, &lb_.chooseHostSet(&context, 0).first); } // Tests host selection with a randomized number of healthy, degraded and unhealthy hosts. @@ -217,7 +220,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelectionFuzz) { const auto&) -> const HealthyAndDegradedLoad& { return original_load; })); for (uint64_t i = 0; i < total_hosts; ++i) { - const auto hs = lb_.chooseHostSet(&context); + const auto hs = lb_.chooseHostSet(&context, 0); switch (hs.second) { case LoadBalancerBase::HostAvailability::Healthy: // Either we selected one of the healthy hosts or we failed to select anything and defaulted @@ -245,7 +248,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelectionWithFilter) { updateHostSet(failover_host_set_, 1, 1); // Since we've excluded P0, we should pick the failover host set - EXPECT_EQ(failover_host_set_.priority(), lb_.chooseHostSet(&context).first.priority()); + EXPECT_EQ(failover_host_set_.priority(), lb_.chooseHostSet(&context, 0).first.priority()); updateHostSet(host_set_, 1 /* num_hosts */, 0 /* num_healthy_hosts */, 1 /* num_degraded_hosts */); @@ -256,7 +259,7 @@ TEST_P(LoadBalancerBaseTest, PrioritySelectionWithFilter) { priority_load.degraded_priority_load_ = 
Upstream::DegradedLoad({0, 100});
 
   // Since we've excluded P0, we should pick the failover host set
-  EXPECT_EQ(failover_host_set_.priority(), lb_.chooseHostSet(&context).first.priority());
+  EXPECT_EQ(failover_host_set_.priority(), lb_.chooseHostSet(&context, 0).first.priority());
 }
 
 TEST_P(LoadBalancerBaseTest, OverProvisioningFactor) {
@@ -553,6 +556,15 @@ class RoundRobinLoadBalancerTest : public LoadBalancerTestBase {
         {}, empty_host_vector_, empty_host_vector_, absl::nullopt);
   }
 
+  void peekThenPick(std::vector<int> picks) {
+    for (auto i : picks) {
+      EXPECT_EQ(hostSet().healthy_hosts_[i], lb_->peekAnotherHost(nullptr));
+    }
+    for (auto i : picks) {
+      EXPECT_EQ(hostSet().healthy_hosts_[i], lb_->chooseHost(nullptr));
+    }
+  }
+
   std::shared_ptr<PrioritySetImpl> local_priority_set_;
   std::shared_ptr<LoadBalancer> lb_;
   HostsPerLocalityConstSharedPtr empty_locality_;
@@ -569,6 +581,7 @@ TEST_P(FailoverTest, BasicFailover) {
   failover_host_set_.healthy_hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:82")};
   failover_host_set_.hosts_ = failover_host_set_.healthy_hosts_;
   init(false);
+  EXPECT_EQ(failover_host_set_.healthy_hosts_[0], lb_->peekAnotherHost(nullptr));
   EXPECT_EQ(failover_host_set_.healthy_hosts_[0], lb_->chooseHost(nullptr));
 }
 
@@ -578,6 +591,7 @@ TEST_P(FailoverTest, BasicDegradedHosts) {
   host_set_.degraded_hosts_ = host_set_.hosts_;
   failover_host_set_.hosts_ = failover_host_set_.healthy_hosts_;
   init(false);
+  EXPECT_EQ(host_set_.degraded_hosts_[0], lb_->peekAnotherHost(nullptr));
   EXPECT_EQ(host_set_.degraded_hosts_[0], lb_->chooseHost(nullptr));
 }
 
@@ -763,10 +777,36 @@ TEST_P(RoundRobinLoadBalancerTest, Normal) {
                               makeTestHost(info_, "tcp://127.0.0.1:81")};
   hostSet().hosts_ = hostSet().healthy_hosts_;
   init(false);
+
+  // Make sure the round robin pattern works for peeking.
+ EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->peekAnotherHost(nullptr)); + EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->peekAnotherHost(nullptr)); + EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr)); EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr)); EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr)); + + // Make sure that if picks get ahead of peeks, peeks resume at the next pick. + EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->peekAnotherHost(nullptr)); EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr)); + + EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr)); + + // Change host set with no peeks in progress + hostSet().healthy_hosts_.push_back(makeTestHost(info_, "tcp://127.0.0.1:82")); + hostSet().hosts_.push_back(hostSet().healthy_hosts_.back()); + hostSet().runCallbacks({hostSet().healthy_hosts_.back()}, {}); + peekThenPick({2, 0, 1, 2}); + + // Now peek a few extra to push the index forward, alter the host set, and + // make sure the index is restored to 0. + EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->peekAnotherHost(nullptr)); + EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->peekAnotherHost(nullptr)); + + hostSet().healthy_hosts_.push_back(makeTestHost(info_, "tcp://127.0.0.1:83")); + hostSet().hosts_.push_back(hostSet().healthy_hosts_.back()); + hostSet().runCallbacks({hostSet().healthy_hosts_.back()}, {hostSet().healthy_hosts_.front()}); + peekThenPick({1, 2, 3}); } // Validate that the RNG seed influences pick order. 
@@ -1658,19 +1698,26 @@ class RandomLoadBalancerTest : public LoadBalancerTestBase { TEST_P(RandomLoadBalancerTest, NoHosts) { init(); + + EXPECT_EQ(nullptr, lb_->peekAnotherHost(nullptr)); EXPECT_EQ(nullptr, lb_->chooseHost(nullptr)); } TEST_P(RandomLoadBalancerTest, Normal) { init(); - hostSet().healthy_hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:80"), makeTestHost(info_, "tcp://127.0.0.1:81")}; hostSet().hosts_ = hostSet().healthy_hosts_; hostSet().runCallbacks({}, {}); // Trigger callbacks. The added/removed lists are not relevant. - EXPECT_CALL(random_, random()).WillOnce(Return(0)).WillOnce(Return(2)); + + EXPECT_CALL(random_, random()).WillOnce(Return(2)); + EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->peekAnotherHost(nullptr)); + + EXPECT_CALL(random_, random()).WillOnce(Return(3)); + EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->peekAnotherHost(nullptr)); + + EXPECT_CALL(random_, random()).Times(0); EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr)); - EXPECT_CALL(random_, random()).WillOnce(Return(0)).WillOnce(Return(3)); EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr)); } diff --git a/test/extensions/clusters/aggregate/cluster_test.cc b/test/extensions/clusters/aggregate/cluster_test.cc index 343cbb9f4339..bc1b6f61a948 100644 --- a/test/extensions/clusters/aggregate/cluster_test.cc +++ b/test/extensions/clusters/aggregate/cluster_test.cc @@ -176,6 +176,7 @@ TEST_F(AggregateClusterTest, LoadBalancerTest) { for (int i = 0; i <= 65; ++i) { EXPECT_CALL(random_, random()).WillOnce(Return(i)); + EXPECT_TRUE(lb_->peekAnotherHost(nullptr) == nullptr); Upstream::HostConstSharedPtr target = lb_->chooseHost(nullptr); EXPECT_EQ(host.get(), target.get()); } diff --git a/test/integration/clusters/custom_static_cluster.h b/test/integration/clusters/custom_static_cluster.h index 23b1d573b262..9691f99234ea 100644 --- a/test/integration/clusters/custom_static_cluster.h +++ b/test/integration/clusters/custom_static_cluster.h @@ -39,6 +39,9 @@ 
class CustomStaticCluster : public Upstream::ClusterImplBase { Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override { return host_; } + Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { + return nullptr; + } const Upstream::HostSharedPtr host_; }; diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 719e34954f2d..e6a584c803c2 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -1945,7 +1945,7 @@ TEST_P(ProtocolIntegrationTest, ConnDurationTimeoutNoHttpRequest) { TEST_P(DownstreamProtocolIntegrationTest, TestPrefetch) { config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); - cluster->mutable_prefetch_policy()->mutable_prefetch_ratio()->set_value(1.5); + cluster->mutable_prefetch_policy()->mutable_per_upstream_prefetch_ratio()->set_value(1.5); }); initialize(); codec_client_ = makeHttpConnection(lookupPort("http")); diff --git a/test/integration/stats_integration_test.cc b/test/integration/stats_integration_test.cc index 2ffc1ade58c9..a5600628a402 100644 --- a/test/integration/stats_integration_test.cc +++ b/test/integration/stats_integration_test.cc @@ -289,6 +289,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithFakeSymbolTable) { // 2020/07/31 12035 45002 46000 Init manager store unready targets in hash map. // 2020/08/10 12275 44949 46000 Re-organize tls histogram maps to improve continuity. // 2020/08/11 12202 44949 46500 router: add new retry back-off strategy + // 2020/09/11 12973 47500 upstream: predictive prefetch // Note: when adjusting this value: EXPECT_MEMORY_EQ is active only in CI // 'release' builds, where we control the platform and tool-chain. 
So you @@ -309,7 +310,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithFakeSymbolTable) { // https://github.com/envoyproxy/envoy/issues/12209 // EXPECT_MEMORY_EQ(m_per_cluster, 44949); } - EXPECT_MEMORY_LE(m_per_cluster, 46500); // Round up to allow platform variations. + EXPECT_MEMORY_LE(m_per_cluster, 47500); // Round up to allow platform variations. } TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithRealSymbolTable) { @@ -368,6 +369,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithRealSymbolTable) { // 2020/07/31 12035 37114 38000 Init manager store unready targets in hash map. // 2020/08/10 12275 37061 38000 Re-organize tls histogram maps to improve continuity. // 2020/08/11 12202 37061 38500 router: add new retry back-off strategy + // 2020/09/11 12973 38993 upstream: predictive prefetch // Note: when adjusting this value: EXPECT_MEMORY_EQ is active only in CI // 'release' builds, where we control the platform and tool-chain. So you @@ -388,7 +390,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithRealSymbolTable) { // https://github.com/envoyproxy/envoy/issues/12209 // EXPECT_MEMORY_EQ(m_per_cluster, 37061); } - EXPECT_MEMORY_LE(m_per_cluster, 38500); // Round up to allow platform variations. + EXPECT_MEMORY_LE(m_per_cluster, 39000); // Round up to allow platform variations. 
 }
 
 TEST_P(ClusterMemoryTestRunner, MemoryLargeHostSizeWithStats) {
diff --git a/test/mocks/http/conn_pool.h b/test/mocks/http/conn_pool.h
index 4fd32853cfa9..fcfb5e090c51 100644
--- a/test/mocks/http/conn_pool.h
+++ b/test/mocks/http/conn_pool.h
@@ -22,6 +22,7 @@ class MockInstance : public Instance {
   MOCK_METHOD(void, drainConnections, ());
   MOCK_METHOD(bool, hasActiveConnections, (), (const));
   MOCK_METHOD(Cancellable*, newStream, (ResponseDecoder & response_decoder, Callbacks& callbacks));
+  MOCK_METHOD(bool, maybePrefetch, (float));
   MOCK_METHOD(Upstream::HostDescriptionConstSharedPtr, host, (), (const));
 
   std::shared_ptr<NiceMock<Upstream::MockHostDescription>> host_;
diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h
index 9e4182423ed6..c03cb1368192 100644
--- a/test/mocks/tcp/mocks.h
+++ b/test/mocks/tcp/mocks.h
@@ -55,6 +55,7 @@ class MockInstance : public Instance {
   MOCK_METHOD(void, drainConnections, ());
   MOCK_METHOD(void, closeConnections, ());
   MOCK_METHOD(Cancellable*, newConnection, (Tcp::ConnectionPool::Callbacks & callbacks));
+  MOCK_METHOD(bool, maybePrefetch, (float), ());
   MOCK_METHOD(Upstream::HostDescriptionConstSharedPtr, host, (), (const));
 
   Envoy::ConnectionPool::MockCancellable* newConnectionImpl(Callbacks& cb);
diff --git a/test/mocks/upstream/cluster_info.cc b/test/mocks/upstream/cluster_info.cc
index 63d31ee92665..87afe77c3014 100644
--- a/test/mocks/upstream/cluster_info.cc
+++ b/test/mocks/upstream/cluster_info.cc
@@ -53,7 +53,7 @@ MockClusterInfo::MockClusterInfo()
           circuit_breakers_stats_, absl::nullopt, absl::nullopt)) {
   ON_CALL(*this, connectTimeout()).WillByDefault(Return(std::chrono::milliseconds(1)));
   ON_CALL(*this, idleTimeout()).WillByDefault(Return(absl::optional<std::chrono::milliseconds>()));
-  ON_CALL(*this, prefetchRatio()).WillByDefault(Return(1.0));
+  ON_CALL(*this, perUpstreamPrefetchRatio()).WillByDefault(Return(1.0));
   ON_CALL(*this, name()).WillByDefault(ReturnRef(name_));
   ON_CALL(*this, edsServiceName()).WillByDefault(ReturnPointee(&eds_service_name_));
   ON_CALL(*this,
http1Settings()).WillByDefault(ReturnRef(http1_settings_));
diff --git a/test/mocks/upstream/cluster_info.h b/test/mocks/upstream/cluster_info.h
index 80f17582914d..a0a3b14ac049 100644
--- a/test/mocks/upstream/cluster_info.h
+++ b/test/mocks/upstream/cluster_info.h
@@ -89,7 +89,8 @@ class MockClusterInfo : public ClusterInfo {
   MOCK_METHOD(bool, addedViaApi, (), (const));
   MOCK_METHOD(std::chrono::milliseconds, connectTimeout, (), (const));
   MOCK_METHOD(const absl::optional<std::chrono::milliseconds>, idleTimeout, (), (const));
-  MOCK_METHOD(float, prefetchRatio, (), (const));
+  MOCK_METHOD(float, perUpstreamPrefetchRatio, (), (const));
+  MOCK_METHOD(float, peekaheadRatio, (), (const));
   MOCK_METHOD(uint32_t, perConnectionBufferLimitBytes, (), (const));
   MOCK_METHOD(uint64_t, features, (), (const));
   MOCK_METHOD(const Http::Http1Settings&, http1Settings, (), (const));
diff --git a/test/mocks/upstream/load_balancer.h b/test/mocks/upstream/load_balancer.h
index 364b6a7eb1d3..356782e914bb 100644
--- a/test/mocks/upstream/load_balancer.h
+++ b/test/mocks/upstream/load_balancer.h
@@ -16,6 +16,7 @@ class MockLoadBalancer : public LoadBalancer {
 
   // Upstream::LoadBalancer
   MOCK_METHOD(HostConstSharedPtr, chooseHost, (LoadBalancerContext * context));
+  MOCK_METHOD(HostConstSharedPtr, peekAnotherHost, (LoadBalancerContext * context));
 
   std::shared_ptr<MockHost> host_{new MockHost()};
 };