Skip to content

Commit

Permalink
Proactively disconnect connections flooded in read resumption (#13471)
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov authored Oct 9, 2020
1 parent 319a9a6 commit 293936c
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 1 deletion.
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) {
auto status = parent_.sendPendingFrames();
// See comment in the `encodeHeadersBase()` method about this RELEASE_ASSERT.
RELEASE_ASSERT(status.ok(), "sendPendingFrames() failure in non dispatching context");
parent_.checkProtocolConstraintViolation();
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
int32_t stream_id_{-1};
uint32_t unconsumed_bytes_{0};
uint32_t read_disable_count_{0};

// Note that in current implementation the watermark callbacks of the pending_recv_data_ are
// never called. The watermark value is set to the size of the stream window. As a result this
// watermark can never overflow because the peer can never send more bytes than the stream
// window without triggering protocol error and this buffer is drained after each DATA frame was
// dispatched through the filter chain. See source/docs/flow_control.md for more information.
Buffer::WatermarkBuffer pending_recv_data_{
[this]() -> void { this->pendingRecvBufferLowWatermark(); },
[this]() -> void { this->pendingRecvBufferHighWatermark(); },
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl_legacy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) {
nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_);
unconsumed_bytes_ = 0;
parent_.sendPendingFrames();
parent_.checkProtocolConstraintViolation();
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions source/common/http/http2/codec_impl_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
int32_t stream_id_{-1};
uint32_t unconsumed_bytes_{0};
uint32_t read_disable_count_{0};

// Note that in current implementation the watermark callbacks of the pending_recv_data_ are
// never called. The watermark value is set to the size of the stream window. As a result this
// watermark can never overflow because the peer can never send more bytes than the stream
// window without triggering protocol error and this buffer is drained after each DATA frame was
// dispatched through the filter chain. See source/docs/flow_control.md for more information.
Buffer::WatermarkBuffer pending_recv_data_{
[this]() -> void { this->pendingRecvBufferLowWatermark(); },
[this]() -> void { this->pendingRecvBufferHighWatermark(); },
Expand Down
66 changes: 66 additions & 0 deletions test/common/http/http2/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,72 @@ TEST_P(Http2CodecImplFlowControlTest, LargeServerBodyFlushTimeoutAfterGoaway) {
EXPECT_EQ(0, server_stats_store_.counter("http2.tx_flush_timeout").value());
}

// Verify detection of downstream outbound frame queue by the WINDOW_UPDATE frames
// sent when codec resumes reading.
TEST_P(Http2CodecImplFlowControlTest, WindowUpdateOnReadResumingFlood) {
initialize();

TestRequestHeaderMapImpl request_headers;
HttpTestUtility::addDefaultHeaders(request_headers);
TestRequestHeaderMapImpl expected_headers;
HttpTestUtility::addDefaultHeaders(expected_headers);
EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), false));
request_encoder_->encodeHeaders(request_headers, false);

int frame_count = 0;
Buffer::OwnedImpl buffer;
ON_CALL(server_connection_, write(_, _))
.WillByDefault(Invoke([&buffer, &frame_count](Buffer::Instance& frame, bool) {
++frame_count;
buffer.move(frame);
}));

auto* violation_callback =
new NiceMock<Event::MockSchedulableCallback>(&server_connection_.dispatcher_);

// Force the server stream to be read disabled. This will cause it to stop sending window
// updates to the client.
server_->getStream(1)->readDisable(true);

uint32_t initial_stream_window =
nghttp2_session_get_stream_effective_local_window_size(client_->session(), 1);
// If this limit is changed, this test will fail due to the initial large writes being divided
// into more than 4 frames. Fast fail here with this explanatory comment.
ASSERT_EQ(65535, initial_stream_window);
// Make sure the limits were configured properly in test set up.
EXPECT_EQ(initial_stream_window, server_->getStream(1)->bufferLimit());
EXPECT_EQ(initial_stream_window, client_->getStream(1)->bufferLimit());

// One large write gets broken into smaller frames.
EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber());
Buffer::OwnedImpl long_data(std::string(initial_stream_window / 2, 'a'));
request_encoder_->encodeData(long_data, false);

EXPECT_EQ(initial_stream_window / 2, server_->getStreamUnconsumedBytes(1));

// pre-fill downstream outbound frame queue
TestResponseHeaderMapImpl response_headers{{":status", "200"}};
response_encoder_->encodeHeaders(response_headers, false);
// Account for the single HEADERS frame above and pre-fill outbound queue with 1 byte DATA frames
for (uint32_t i = 0; i < CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES - 2; ++i) {
Buffer::OwnedImpl data("0");
EXPECT_NO_THROW(response_encoder_->encodeData(data, false));
}

EXPECT_FALSE(violation_callback->enabled_);

// Now unblock the server's stream. This will cause the bytes to be consumed, 2 flow control
// updates to be sent, and overflow outbound frame queue.
server_->getStream(1)->readDisable(false);

EXPECT_TRUE(violation_callback->enabled_);
EXPECT_CALL(server_connection_, close(Envoy::Network::ConnectionCloseType::NoFlush));
violation_callback->invokeCallback();

EXPECT_EQ(frame_count, CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES + 1);
EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value());
}

TEST_P(Http2CodecImplTest, WatermarkUnderEndStream) {
initialize();
MockStreamCallbacks callbacks;
Expand Down
22 changes: 22 additions & 0 deletions test/common/http/http2/http2_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view
return frame;
}

Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view host,
absl::string_view path,
const std::vector<Header> extra_headers) {

auto frame = makePostRequest(stream_index, host, path);
for (const auto& header : extra_headers) {
frame.appendHeaderWithoutIndexing(header);
}
frame.adjustPayloadSize();
return frame;
}

Http2Frame Http2Frame::makeGenericFrame(absl::string_view contents) {
Http2Frame frame;
frame.appendData(contents);
Expand All @@ -336,6 +348,16 @@ Http2Frame Http2Frame::makeGenericFrameFromHexDump(absl::string_view contents) {
return frame;
}

Http2Frame Http2Frame::makeDataFrame(uint32_t stream_index, absl::string_view data,
DataFlags flags) {
Http2Frame frame;
frame.buildHeader(Type::Data, 0, static_cast<uint8_t>(flags),
makeNetworkOrderStreamId(stream_index));
frame.appendData(data);
frame.adjustPayloadSize();
return frame;
}

} // namespace Http2
} // namespace Http
} // namespace Envoy
6 changes: 6 additions & 0 deletions test/common/http/http2/http2_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ class Http2Frame {
absl::string_view path, const std::vector<Header> extra_headers);
static Http2Frame makePostRequest(uint32_t stream_index, absl::string_view host,
absl::string_view path);
static Http2Frame makePostRequest(uint32_t stream_index, absl::string_view host,
absl::string_view path,
const std::vector<Header> extra_headers);
static Http2Frame makeDataFrame(uint32_t stream_index, absl::string_view data,
DataFlags flags = DataFlags::None);

/**
* Creates a frame with the given contents. This frame can be
* malformed/invalid depending on the given contents.
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ envoy_cc_test(
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/common/http/http2:http2_frame",
"//test/integration/filters:backpressure_filter_config_lib",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
Expand Down
17 changes: 16 additions & 1 deletion test/integration/filters/backpressure_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,27 @@ namespace Envoy {
// the content of the filter buffer.
class BackpressureFilter : public Http::PassThroughFilter {
public:
void onDestroy() override { decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); }
void onDestroy() override {
if (!below_write_buffer_low_watermark_called_) {
decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override {
decoder_callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();
return Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) override {
if (end_stream) {
below_write_buffer_low_watermark_called_ = true;
decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
return Http::FilterDataStatus::Continue;
}

private:
bool below_write_buffer_low_watermark_called_{false};
};

class BackpressureConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
Expand Down
63 changes: 63 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,69 @@ TEST_P(Http2FloodMitigationTest, Trailers) {
EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value());
}

// Verify flood detection by the WINDOW_UPDATE frame when a decoder filter is resuming reading from
// the downstream via DecoderFilterBelowWriteBufferLowWatermark.
TEST_P(Http2FloodMitigationTest, WindowUpdateOnLowWatermarkFlood) {
config_helper_.addFilter(R"EOF(
name: backpressure-filter
)EOF");
config_helper_.setBufferLimits(1024 * 1024 * 1024, 1024 * 1024 * 1024);
// Set low window sizes in the server codec as nghttp2 sends WINDOW_UPDATE only after it consumes
// more than 25% of the window.
config_helper_.addConfigModifier(
[&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) -> void {
auto* h2_options = hcm.mutable_http2_protocol_options();
h2_options->mutable_initial_stream_window_size()->set_value(70000);
h2_options->mutable_initial_connection_window_size()->set_value(70000);
});
autonomous_upstream_ = true;
autonomous_allow_incomplete_streams_ = true;
beginSession();

writev_matcher_->setWritevReturnsEgain();

// pre-fill two away from overflow
const auto request = Http2Frame::makePostRequest(
Http2Frame::makeClientStreamId(0), "host", "/test/long/url",
{Http2Frame::Header("response_data_blocks", "998"), Http2Frame::Header("no_trailers", "0")});
sendFrame(request);

// The backpressure-filter disables reading when it sees request headers, and it should prevent
// WINDOW_UPDATE to be sent on the following DATA frames. Send enough DATA to consume more than
// 25% of the 70K window so that nghttp2 will send WINDOW_UPDATE on read resumption.
auto data_frame =
Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(0), std::string(16384, '0'));
sendFrame(data_frame);
sendFrame(data_frame);
data_frame = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(0), std::string(16384, '1'),
Http2Frame::DataFlags::EndStream);
sendFrame(data_frame);

// Upstream will respond with 998 DATA frames and the backpressure-filter filter will re-enable
// reading on the last DATA frame. This will cause nghttp2 to send two WINDOW_UPDATE frames for
// stream and connection windows. Together with response DATA frames it should overflow outbound
// frame queue. Wait for connection to be flooded with outbound WINDOW_UPDATE frame and
// disconnected.
tcp_client_->waitForDisconnect();

EXPECT_EQ(1,
test_server_->counter("http.config_test.downstream_flow_control_paused_reading_total")
->value());

// If the server codec had incorrectly thrown an exception on flood detection it would cause
// the entire upstream to be disconnected. Verify it is still active, and there are no destroyed
// connections.
ASSERT_EQ(1, test_server_->gauge("cluster.cluster_0.upstream_cx_active")->value());
ASSERT_EQ(0, test_server_->counter("cluster.cluster_0.upstream_cx_destroy")->value());
// Verify that the flood check was triggered
EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value());
}

// TODO(yanavlasov): add tests for WINDOW_UPDATE overflow from the router filter. These tests need
// missing support for write resumption from test sockets that were forced to return EAGAIN by the
// test.

// Verify that the server can detect flood of RST_STREAM frames.
TEST_P(Http2FloodMitigationTest, RST_STREAM) {
// Use invalid HTTP headers to trigger sending RST_STREAM frames.
Expand Down

0 comments on commit 293936c

Please sign in to comment.