Skip to content

Commit

Permalink
Limit the number of HTTP requests processed from a connection in I/O …
Browse files Browse the repository at this point in the history
…cycle

Signed-off-by: Yan Avlasov <yavlasov@google.com>

Signed-off-by: Ryan Northey <ryan@synca.io>
  • Loading branch information
yanavlasov authored and phlax committed Oct 10, 2023
1 parent 230331f commit f737277
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 14 deletions.
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ behavior_changes:
(UHV) on and off.
The default value is off. This option is currently functional only when the ``ENVOY_ENABLE_UHV`` build flag is enabled.
See https://github.com/envoyproxy/envoy/issues/10646 for more information about UHV.
- area: http
change: |
Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed
from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This
mitigates CPU starvation by connections that simultaneously send high number of requests by allowing requests from other
connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections.
By default this limit is disabled.
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
89 changes: 85 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey
"overload.premature_reset_total_stream_count";
const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
"overload.premature_reset_min_stream_lifetime_seconds";
// Runtime key for maximum number of requests that can be processed from a single connection per
// I/O cycle. Requests over this limit are deferred until the next I/O cycle.
const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
"http.max_requests_per_io_cycle";

bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
if (!headers) {
Expand Down Expand Up @@ -116,6 +120,8 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
/*node_id=*/local_info_.node().id(),
/*server_name=*/config_.serverName(),
/*proxy_status_config=*/config_.proxyStatusConfig())),
max_requests_during_dispatch_(
runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
refresh_rtt_after_request_(
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {
ENVOY_LOG_ONCE_IF(
Expand All @@ -133,6 +139,10 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_callbacks_ = &callbacks;
dispatcher_ = &callbacks.connection().dispatcher();
if (max_requests_during_dispatch_ != UINT32_MAX) {
deferred_request_processing_callback_ =
dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
}

stats_.named_.downstream_cx_total_.inc();
stats_.named_.downstream_cx_active_.inc();
Expand Down Expand Up @@ -466,6 +476,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
}

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
requests_during_dispatch_count_ = 0;
if (!codec_) {
// Http3 codec should have been instantiated by now.
createCodec(data);
Expand Down Expand Up @@ -1406,7 +1417,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
traceRequest();
}

filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
filter_manager_.decodeHeaders(*request_headers_, end_stream);
} else {
state_.deferred_to_next_io_iteration_ = true;
state_.deferred_end_stream_ = end_stream;
}

// Reset it here for both global and overridden cases.
resetIdleTimer();
Expand Down Expand Up @@ -1473,8 +1489,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
connection_manager_.read_callbacks_->connection().dispatcher());
maybeEndDecode(end_stream);
filter_manager_.streamInfo().addBytesReceived(data.length());

filter_manager_.decodeData(data, end_stream);
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeData(data, end_stream);
} else {
if (!deferred_data_) {
deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
}
deferred_data_->move(data);
state_.deferred_end_stream_ = end_stream;
}
}

void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
Expand All @@ -1490,7 +1513,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
return;
}
maybeEndDecode(true);
filter_manager_.decodeTrailers(*request_trailers_);
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeTrailers(*request_trailers_);
}
}

void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
Expand Down Expand Up @@ -2203,5 +2228,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a
connection_manager_.doEndStream(*this);
}

bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
// TODO(yanavlasov): Merge this with the filter manager continueIteration() method
if (!state_.deferred_to_next_io_iteration_) {
return false;
}
state_.deferred_to_next_io_iteration_ = false;
bool end_stream =
state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr;
filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (end_stream) {
return true;
}
if (deferred_data_ != nullptr) {
end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
filter_manager_.decodeData(*deferred_data_, end_stream);
}
if (request_trailers_ != nullptr) {
filter_manager_.decodeTrailers(*request_trailers_);
}
return true;
}

bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
// Do not defer this stream if stream deferral is disabled
if (deferred_request_processing_callback_ == nullptr) {
return false;
}
// Defer this stream if there are already deferred streams, so they are not
// processed out of order
if (deferred_request_processing_callback_->enabled()) {
return true;
}
++requests_during_dispatch_count_;
bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
if (defer) {
deferred_request_processing_callback_->scheduleCallbackNextIteration();
}
return defer;
}

void ConnectionManagerImpl::onDeferredRequestProcessing() {
requests_during_dispatch_count_ = 1; // 1 stream is always let through
// Streams are inserted at the head of the list. As such process deferred
// streams at the back of the list first.
for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
auto& stream_ptr = *reverse_iter;
// Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the
// stream from the list.
++reverse_iter;
bool was_deferred = stream_ptr->onDeferredRequestProcessing();
if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
break;
}
}
}

} // namespace Http
} // namespace Envoy
23 changes: 22 additions & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// The minimum lifetime of a stream, in seconds, in order not to be considered
// prematurely closed.
static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
static const absl::string_view MaxRequestsPerIoCycle;

private:
struct ActiveStream;
Expand Down Expand Up @@ -348,7 +349,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
: codec_saw_local_complete_(false), codec_encode_complete_(false),
on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false),
is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false),
decorated_propagate_(true) {}
decorated_propagate_(true), deferred_to_next_io_iteration_(false) {}

// It's possibly for the codec to see the completed response but not fully
// encode it.
Expand All @@ -373,6 +374,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
bool is_tunneling_ : 1;

bool decorated_propagate_ : 1;

// Indicates that sending headers to the filter manager is deferred to the
// next I/O cycle. If data or trailers are received when this flag is set
// they are deferred too.
// TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
// structure, so it can be atomically created and cleared.
bool deferred_to_next_io_iteration_ : 1;
bool deferred_end_stream_ : 1;
};

bool canDestroyStream() const {
Expand Down Expand Up @@ -422,6 +431,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

std::weak_ptr<bool> stillAlive() { return {still_alive_}; }

// Dispatch deferred headers, body and trailers to the filter manager.
// Return true if this stream was deferred and dispatched pending headers, body and trailers (if
// present). Return false if this stream was not deferred.
bool onDeferredRequestProcessing();

ConnectionManagerImpl& connection_manager_;
OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
// TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
Expand Down Expand Up @@ -511,6 +525,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
bool spawnUpstreamSpan() const override;

std::shared_ptr<bool> still_alive_ = std::make_shared<bool>(true);
std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
};

using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
Expand Down Expand Up @@ -588,6 +603,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// and at least half have been prematurely reset?
void maybeDrainDueToPrematureResets();

bool shouldDeferRequestProxyingToNextIoCycle();
void onDeferredRequestProcessing();

enum class DrainState { NotDraining, Draining, Closing };

ConnectionManagerConfig& config_;
Expand Down Expand Up @@ -634,6 +652,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// the definition given in `isPrematureRstStream()`.
uint64_t number_premature_stream_resets_{0};
const std::string proxy_name_; // for Proxy-Status.
uint32_t requests_during_dispatch_count_{0};
const uint32_t max_requests_during_dispatch_{UINT32_MAX};
Event::SchedulableCallbackPtr deferred_request_processing_callback_;

const bool refresh_rtt_after_request_{};
};
Expand Down
Loading

0 comments on commit f737277

Please sign in to comment.