upstream: basic predictive prefetch for deterministic LBs (#12973)
This adds predictive (peekahead) prefetch for the WRR, RR, and Random schedulers and plumbs it up to the connection manager, which currently uses it to prefetch only 1 connection (see TODO for follow-up plans).

Risk Level: Medium (refactors LBs somewhat - should be no-op)
Testing: new unit tests
Docs Changes: n/a
Release Notes: n/a

Part of #2755

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
alyssawilk authored Sep 17, 2020
1 parent b1cc4c0 commit 5875f23
Showing 40 changed files with 730 additions and 114 deletions.
27 changes: 26 additions & 1 deletion api/envoy/config/cluster/v3/cluster.proto
@@ -612,7 +612,32 @@ message Cluster {
//
// This is limited somewhat arbitrarily to 3 because prefetching connections too aggressively can
// harm latency more than the prefetching helps.
google.protobuf.DoubleValue prefetch_ratio = 1 [(validate.rules).double = {lte: 3.0 gte: 1.0}];
google.protobuf.DoubleValue per_upstream_prefetch_ratio = 1
[(validate.rules).double = {lte: 3.0 gte: 1.0}];

// Indicates how many streams (rounded up) can be anticipated across a cluster for each
// stream, useful for low QPS services. This is currently supported for a subset of
// deterministic non-hash-based load-balancing algorithms (weighted round robin, random).
// Unlike per_upstream_prefetch_ratio this prefetches across the upstream instances in a
// cluster, doing best effort predictions of what upstream would be picked next and
// pre-establishing a connection.
//
// For example if prefetching is set to 2 for a round robin HTTP/2 cluster, on the first
// incoming stream, 2 connections will be prefetched - one to the first upstream for this
// cluster, one to the second on the assumption there will be a follow-up stream.
//
// Prefetching will be limited to one prefetch per configured upstream in the cluster.
//
// If this value is not set, or set explicitly to one, Envoy will fetch as many connections
// as needed to serve streams in flight, so during warm up and in steady state if a connection
// is closed (and per_upstream_prefetch_ratio is not set), there will be a latency hit for
// connection establishment.
//
// If both this and per_upstream_prefetch_ratio are set, Envoy will make sure both predicted
// needs are met, basically prefetching max(predictive-prefetch, per-upstream-prefetch), for
// each upstream.
// TODO(alyssawilk) per LB docs and LB overview docs when unhiding.
google.protobuf.DoubleValue predictive_prefetch_ratio = 2
[(validate.rules).double = {lte: 3.0 gte: 1.0}];
}

reserved 12, 15, 7, 11, 35;
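
The round robin example in the `predictive_prefetch_ratio` comment can be read as simple arithmetic: multiply the streams in flight by the ratio, round up, and cap the result at one connection per configured upstream. The sketch below is only that reading of the comment. `anticipatedUpstreams` is a hypothetical helper, not an Envoy function, and the commit itself notes that only 1 connection is currently prefetched.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>

// Hypothetical helper (not part of Envoy): estimates how many upstreams should
// have a connection available, following the wording of the proto comment.
std::size_t anticipatedUpstreams(std::size_t streams, double predictive_prefetch_ratio,
                                 std::size_t num_upstreams) {
  // The proto validation rule restricts the ratio to [1.0, 3.0].
  assert(predictive_prefetch_ratio >= 1.0 && predictive_prefetch_ratio <= 3.0);
  // "How many streams (rounded up) can be anticipated ... for each stream".
  const auto anticipated =
      static_cast<std::size_t>(std::ceil(streams * predictive_prefetch_ratio));
  // "Prefetching will be limited to one prefetch per configured upstream".
  return std::min(anticipated, num_upstreams);
}
```

With one incoming stream, a ratio of 2 and three upstreams, this yields 2, matching the "one to the first upstream, one to the second" case in the comment.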
27 changes: 26 additions & 1 deletion api/envoy/config/cluster/v4alpha/cluster.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion generated_api_shadow/envoy/config/cluster/v3/cluster.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions include/envoy/common/conn_pool.h
@@ -68,6 +68,14 @@ class Instance {
* @return Upstream::HostDescriptionConstSharedPtr the host for which connections are pooled.
*/
virtual Upstream::HostDescriptionConstSharedPtr host() const PURE;

/**
* Prefetches an upstream connection, if existing connections do not meet both current and
* anticipated load.
*
* @return true if a connection was prefetched, false otherwise.
*/
virtual bool maybePrefetch(float prefetch_ratio) PURE;
};

enum class PoolFailureReason {
8 changes: 8 additions & 0 deletions include/envoy/upstream/load_balancer.h
@@ -99,6 +99,14 @@ class LoadBalancer {
* is missing and use sensible defaults.
*/
virtual HostConstSharedPtr chooseHost(LoadBalancerContext* context) PURE;

/**
* Returns a best effort prediction of the next host to be picked, or nullptr if not predictable.
* Advances with subsequent calls, so while the first call will return the next host to be picked,
* a subsequent call will return the second host to be picked.
* @param context supplies the context which is used in host selection.
*/
virtual HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) PURE;
};

using LoadBalancerPtr = std::unique_ptr<LoadBalancer>;
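
The contract described for `peekAnotherHost` (each call looks one pick further ahead, and later real picks agree with earlier peeks) can be illustrated with a toy round robin scheduler. This is a minimal sketch under stated assumptions, not Envoy's implementation: `ToyRoundRobin` is a hypothetical class, hosts are plain strings, there is no `LoadBalancerContext`, and returning `nullptr` once every host has been peeked is a choice made here for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a round robin load balancer (hypothetical, not Envoy code).
class ToyRoundRobin {
public:
  explicit ToyRoundRobin(std::vector<std::string> hosts) : hosts_(std::move(hosts)) {
    assert(!hosts_.empty());
  }

  // Returns the host that lies `peeked_` picks ahead of the next real pick and
  // advances the peek cursor. Returns nullptr once every host has been peeked.
  const std::string* peekAnotherHost() {
    if (peeked_ >= hosts_.size()) {
      return nullptr;
    }
    return &hosts_[(next_ + peeked_++) % hosts_.size()];
  }

  // Performs the real pick and consumes one outstanding peek, if any, so that
  // later peeks keep pointing past the hosts that were already predicted.
  const std::string& chooseHost() {
    const std::string& host = hosts_[next_];
    next_ = (next_ + 1) % hosts_.size();
    if (peeked_ > 0) {
      --peeked_;
    }
    return host;
  }

private:
  std::vector<std::string> hosts_;
  std::size_t next_ = 0;   // index of the next real pick
  std::size_t peeked_ = 0; // number of peeks not yet consumed by a real pick
};
```

Keeping a separate peek counter, rather than advancing the real cursor, is what lets a caller prefetch for predicted hosts without changing which host the next stream actually lands on.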
7 changes: 6 additions & 1 deletion include/envoy/upstream/upstream.h
@@ -735,7 +735,12 @@ class ClusterInfo {
/**
* @return how many streams should be anticipated per each current stream.
*/
virtual float prefetchRatio() const PURE;
virtual float perUpstreamPrefetchRatio() const PURE;

/**
* @return how many streams should be anticipated across the cluster per each current stream.
*/
virtual float peekaheadRatio() const PURE;

/**
* @return soft limit on size of the cluster's connections read and write buffers.
35 changes: 25 additions & 10 deletions source/common/conn_pool/conn_pool_base.cc
@@ -33,28 +33,39 @@ void ConnPoolImplBase::destructAllConnections() {
dispatcher_.clearDeferredDeleteList();
}

bool ConnPoolImplBase::shouldCreateNewConnection() const {
bool ConnPoolImplBase::shouldCreateNewConnection(float global_prefetch_ratio) const {
// If the host is not healthy, don't make it do extra work, especially as
// upstream selection logic may result in bypassing this upstream entirely.
// If an Envoy user wants prefetching for degraded upstreams this could be
// added later via extending the prefetch config.
if (host_->health() != Upstream::Host::Health::Healthy) {
return pending_streams_.size() > connecting_stream_capacity_;
}

// If global prefetching is on, and this connection is within the global
// prefetch limit, prefetch.
// We may eventually want to track prefetch_attempts to allow more prefetching for
// heavily weighted upstreams or sticky picks.
if (global_prefetch_ratio > 1.0 &&
((pending_streams_.size() + 1 + num_active_streams_) * global_prefetch_ratio >
(connecting_stream_capacity_ + num_active_streams_))) {
return true;
}

// The number of streams we want to be provisioned for is the number of
// pending and active streams times the prefetch ratio.
// The number of streams we are (theoretically) provisioned for is the
// connecting stream capacity plus the number of active streams.
//
// If prefetch ratio is not set, it defaults to 1, and this simplifies to the
// legacy value of pending_streams_.size() > connecting_stream_capacity_
return (pending_streams_.size() + num_active_streams_) * prefetchRatio() >
return (pending_streams_.size() + num_active_streams_) * perUpstreamPrefetchRatio() >
(connecting_stream_capacity_ + num_active_streams_);
}

float ConnPoolImplBase::prefetchRatio() const {
float ConnPoolImplBase::perUpstreamPrefetchRatio() const {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_prefetch")) {
return host_->cluster().prefetchRatio();
return host_->cluster().perUpstreamPrefetchRatio();
} else {
return 1.0;
}
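
To make the three branches of the new `shouldCreateNewConnection` easier to check by hand, the predicate can be restated over a plain struct. This is a sketch that mirrors the expressions in the hunk above. `PoolState` and the free function are hypothetical stand-ins for the `ConnPoolImplBase` members, and the runtime feature guard on the per-upstream ratio is left out.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of the counters the diff reads from ConnPoolImplBase.
struct PoolState {
  std::size_t pending_streams;            // pending_streams_.size()
  std::size_t num_active_streams;         // num_active_streams_
  std::size_t connecting_stream_capacity; // connecting_stream_capacity_
  bool healthy;                           // host_->health() == Healthy
};

bool shouldCreateNewConnection(const PoolState& s, float per_upstream_ratio,
                               float global_prefetch_ratio) {
  assert(per_upstream_ratio >= 1.0f);
  // Streams the pool is (theoretically) provisioned for.
  const float provisioned =
      static_cast<float>(s.connecting_stream_capacity + s.num_active_streams);
  // Unhealthy hosts get no prefetching: legacy behavior only.
  if (!s.healthy) {
    return s.pending_streams > s.connecting_stream_capacity;
  }
  // Global (predictive) prefetch counts one extra, anticipated stream.
  if (global_prefetch_ratio > 1.0f &&
      (s.pending_streams + 1 + s.num_active_streams) * global_prefetch_ratio > provisioned) {
    return true;
  }
  // Per-upstream prefetch; with a ratio of 1 this is pending > connecting capacity.
  return (s.pending_streams + s.num_active_streams) * per_upstream_ratio > provisioned;
}
```

For an idle healthy pool, a global ratio of 1.5 gives (0 + 1 + 0) * 1.5 = 1.5 > 0, so a connection is prefetched. With the default argument of 0 used by `tryCreateNewConnection`, the same pool creates nothing.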
@@ -74,9 +85,9 @@ void ConnPoolImplBase::tryCreateNewConnections() {
}
}

bool ConnPoolImplBase::tryCreateNewConnection() {
bool ConnPoolImplBase::tryCreateNewConnection(float global_prefetch_ratio) {
// There are already enough CONNECTING connections for the number of queued streams.
if (!shouldCreateNewConnection()) {
if (!shouldCreateNewConnection(global_prefetch_ratio)) {
return false;
}

@@ -184,6 +195,10 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
}
}

bool ConnPoolImplBase::maybePrefetch(float global_prefetch_ratio) {
return tryCreateNewConnection(global_prefetch_ratio);
}

void ConnPoolImplBase::onUpstreamReady() {
while (!pending_streams_.empty() && !ready_clients_.empty()) {
ActiveClientPtr& client = ready_clients_.front();
@@ -394,14 +409,14 @@ void ConnPoolImplBase::purgePendingStreams(
bool ConnPoolImplBase::connectingConnectionIsExcess() const {
ASSERT(connecting_stream_capacity_ >=
connecting_clients_.front()->effectiveConcurrentStreamLimit());
// If prefetchRatio is one, this simplifies to checking if there would still be sufficient
// connecting stream capacity to serve all pending streams if the most recent client were
// removed from the picture.
// If perUpstreamPrefetchRatio is one, this simplifies to checking if there would still be
// sufficient connecting stream capacity to serve all pending streams if the most recent client
// were removed from the picture.
//
// If prefetch ratio is set, it also factors in the anticipated load based on both queued streams
// and active streams, and makes sure the connecting capacity would still be sufficient to serve
// that even with the most recent client removed.
return (pending_streams_.size() + num_active_streams_) * prefetchRatio() <=
return (pending_streams_.size() + num_active_streams_) * perUpstreamPrefetchRatio() <=
(connecting_stream_capacity_ -
connecting_clients_.front()->effectiveConcurrentStreamLimit() + num_active_streams_);
}
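
The excess check is the mirror image of the creation check: it asks whether the anticipated load would still be covered if one connecting client were removed. A minimal sketch with the member variables passed as plain arguments follows. The function signature and parameter names are assumptions made for illustration, and `newest_client_limit` stands in for `connecting_clients_.front()->effectiveConcurrentStreamLimit()`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical free-function restatement of connectingConnectionIsExcess().
bool connectingConnectionIsExcess(std::size_t pending_streams, std::size_t num_active_streams,
                                  std::size_t connecting_stream_capacity,
                                  std::size_t newest_client_limit,
                                  float per_upstream_prefetch_ratio) {
  // Same precondition as the ASSERT in the diff; it also keeps the unsigned
  // subtraction below from wrapping around.
  assert(connecting_stream_capacity >= newest_client_limit);
  return (pending_streams + num_active_streams) * per_upstream_prefetch_ratio <=
         (connecting_stream_capacity - newest_client_limit + num_active_streams);
}
```

For example, with 2 active streams, no pending streams and a single connecting client of capacity 1, a ratio of 1.5 anticipates 3 streams against 2 provisioned without that client, so the connection is kept. At a ratio of 1.0 the same connection counts as excess.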
11 changes: 8 additions & 3 deletions source/common/conn_pool/conn_pool_base.h
@@ -149,6 +149,9 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
void checkForDrained();
void onUpstreamReady();
ConnectionPool::Cancellable* newStream(AttachContext& context);
// Called if this pool is likely to be picked soon, to determine if it's worth
// prefetching a connection.
bool maybePrefetch(float global_prefetch_ratio);

virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE;

@@ -176,17 +179,19 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {

// Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or
// to avoid starving this pool.
bool tryCreateNewConnection();
// Demand is determined by perUpstreamPrefetchRatio(), or by global_prefetch_ratio
// if this is called by maybePrefetch().
bool tryCreateNewConnection(float global_prefetch_ratio = 0);

// A helper function which determines if a canceled pending connection should
// be closed as excess or not.
bool connectingConnectionIsExcess() const;

// A helper function which determines if a new incoming stream should trigger
// connection prefetch.
bool shouldCreateNewConnection() const;
bool shouldCreateNewConnection(float global_prefetch_ratio) const;

float prefetchRatio() const;
float perUpstreamPrefetchRatio() const;

const Upstream::HostConstSharedPtr host_;
const Upstream::ResourcePriority priority_;
3 changes: 3 additions & 0 deletions source/common/http/conn_pool_base.h
@@ -51,6 +51,9 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }
ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder,
Http::ConnectionPool::Callbacks& callbacks) override;
bool maybePrefetch(float ratio) override {
return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(ratio);
}
bool hasActiveConnections() const override;

// Creates a new PendingStream and enqueues it into the queue.
3 changes: 3 additions & 0 deletions source/common/tcp/conn_pool.h
@@ -158,6 +158,9 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
TcpAttachContext context(&callbacks);
return Envoy::ConnectionPool::ConnPoolImplBase::newStream(context);
}
bool maybePrefetch(float prefetch_ratio) override {
return Envoy::ConnectionPool::ConnPoolImplBase::maybePrefetch(prefetch_ratio);
}

ConnectionPool::Cancellable*
newPendingStream(Envoy::ConnectionPool::AttachContext& context) override {
2 changes: 2 additions & 0 deletions source/common/tcp/original_conn_pool.h
@@ -33,6 +33,8 @@ class OriginalConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public Connecti
void drainConnections() override;
void closeConnections() override;
ConnectionPool::Cancellable* newConnection(ConnectionPool::Callbacks& callbacks) override;
// The old pool does not implement prefetching.
bool maybePrefetch(float) override { return false; }
Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }

protected: