From 293936cab78d94dffee45d6789528299c1fd0c30 Mon Sep 17 00:00:00 2001 From: yanavlasov Date: Fri, 9 Oct 2020 19:30:38 -0400 Subject: [PATCH] Proactively disconnect connections flooded in read resumption (#13471) Signed-off-by: Yan Avlasov --- source/common/http/http2/codec_impl.cc | 1 + source/common/http/http2/codec_impl.h | 6 ++ source/common/http/http2/codec_impl_legacy.cc | 1 + source/common/http/http2/codec_impl_legacy.h | 6 ++ test/common/http/http2/codec_impl_test.cc | 66 +++++++++++++++++++ test/common/http/http2/http2_frame.cc | 22 +++++++ test/common/http/http2/http2_frame.h | 6 ++ test/integration/BUILD | 1 + .../filters/backpressure_filter.cc | 17 ++++- test/integration/http2_integration_test.cc | 63 ++++++++++++++++++ 10 files changed, 188 insertions(+), 1 deletion(-) diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index c2b78632482a..13980bacdef9 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -280,6 +280,7 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) { auto status = parent_.sendPendingFrames(); // See comment in the `encodeHeadersBase()` method about this RELEASE_ASSERT. RELEASE_ASSERT(status.ok(), "sendPendingFrames() failure in non dispatching context"); + parent_.checkProtocolConstraintViolation(); } } } diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 72bbcc44e764..b4d15024bafc 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -277,6 +277,12 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable void { this->pendingRecvBufferLowWatermark(); }, [this]() -> void { this->pendingRecvBufferHighWatermark(); }, diff --git a/source/common/http/http2/codec_impl_legacy.cc b/source/common/http/http2/codec_impl_legacy.cc index 3654b481fe54..68ab5c9019bb 100644 --- a/source/common/http/http2/codec_impl_legacy.cc +++ b/source/common/http/http2/codec_impl_legacy.cc @@ -267,6 +267,7 @@ void ConnectionImpl::StreamImpl::readDisable(bool disable) { nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_); unconsumed_bytes_ = 0; parent_.sendPendingFrames(); + parent_.checkProtocolConstraintViolation(); } } } diff --git a/source/common/http/http2/codec_impl_legacy.h b/source/common/http/http2/codec_impl_legacy.h index edacf8ae8b04..238762c45df3 100644 --- a/source/common/http/http2/codec_impl_legacy.h +++ b/source/common/http/http2/codec_impl_legacy.h @@ -277,6 +277,12 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable void { this->pendingRecvBufferLowWatermark(); }, [this]() -> void { this->pendingRecvBufferHighWatermark(); }, diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index e4640585533b..5ff4040b87bf 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -1365,6 +1365,72 @@ TEST_P(Http2CodecImplFlowControlTest, LargeServerBodyFlushTimeoutAfterGoaway) { EXPECT_EQ(0, server_stats_store_.counter("http2.tx_flush_timeout").value()); } +// Verify detection of downstream outbound frame queue by the WINDOW_UPDATE frames +// sent when codec resumes reading. +TEST_P(Http2CodecImplFlowControlTest, WindowUpdateOnReadResumingFlood) { + initialize(); + + TestRequestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + TestRequestHeaderMapImpl expected_headers; + HttpTestUtility::addDefaultHeaders(expected_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), false)); + request_encoder_->encodeHeaders(request_headers, false); + + int frame_count = 0; + Buffer::OwnedImpl buffer; + ON_CALL(server_connection_, write(_, _)) + .WillByDefault(Invoke([&buffer, &frame_count](Buffer::Instance& frame, bool) { + ++frame_count; + buffer.move(frame); + })); + + auto* violation_callback = + new NiceMock(&server_connection_.dispatcher_); + + // Force the server stream to be read disabled. This will cause it to stop sending window + // updates to the client. + server_->getStream(1)->readDisable(true); + + uint32_t initial_stream_window = + nghttp2_session_get_stream_effective_local_window_size(client_->session(), 1); + // If this limit is changed, this test will fail due to the initial large writes being divided + // into more than 4 frames. Fast fail here with this explanatory comment. + ASSERT_EQ(65535, initial_stream_window); + // Make sure the limits were configured properly in test set up. + EXPECT_EQ(initial_stream_window, server_->getStream(1)->bufferLimit()); + EXPECT_EQ(initial_stream_window, client_->getStream(1)->bufferLimit()); + + // One large write gets broken into smaller frames. + EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber()); + Buffer::OwnedImpl long_data(std::string(initial_stream_window / 2, 'a')); + request_encoder_->encodeData(long_data, false); + + EXPECT_EQ(initial_stream_window / 2, server_->getStreamUnconsumedBytes(1)); + + // pre-fill downstream outbound frame queue + TestResponseHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, false); + // Account for the single HEADERS frame above and pre-fill outbound queue with 1 byte DATA frames + for (uint32_t i = 0; i < CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES - 2; ++i) { + Buffer::OwnedImpl data("0"); + EXPECT_NO_THROW(response_encoder_->encodeData(data, false)); + } + + EXPECT_FALSE(violation_callback->enabled_); + + // Now unblock the server's stream. This will cause the bytes to be consumed, 2 flow control + // updates to be sent, and overflow outbound frame queue. + server_->getStream(1)->readDisable(false); + + EXPECT_TRUE(violation_callback->enabled_); + EXPECT_CALL(server_connection_, close(Envoy::Network::ConnectionCloseType::NoFlush)); + violation_callback->invokeCallback(); + + EXPECT_EQ(frame_count, CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES + 1); + EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value()); +} + TEST_P(Http2CodecImplTest, WatermarkUnderEndStream) { initialize(); MockStreamCallbacks callbacks; diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc index b29956144b4b..4d8a8c85cc15 100644 --- a/test/common/http/http2/http2_frame.cc +++ b/test/common/http/http2/http2_frame.cc @@ -324,6 +324,18 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view return frame; } +Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view host, + absl::string_view path, + const std::vector
extra_headers) { + + auto frame = makePostRequest(stream_index, host, path); + for (const auto& header : extra_headers) { + frame.appendHeaderWithoutIndexing(header); + } + frame.adjustPayloadSize(); + return frame; +} + Http2Frame Http2Frame::makeGenericFrame(absl::string_view contents) { Http2Frame frame; frame.appendData(contents); @@ -336,6 +348,16 @@ Http2Frame Http2Frame::makeGenericFrameFromHexDump(absl::string_view contents) { return frame; } +Http2Frame Http2Frame::makeDataFrame(uint32_t stream_index, absl::string_view data, + DataFlags flags) { + Http2Frame frame; + frame.buildHeader(Type::Data, 0, static_cast(flags), + makeNetworkOrderStreamId(stream_index)); + frame.appendData(data); + frame.adjustPayloadSize(); + return frame; +} + } // namespace Http2 } // namespace Http } // namespace Envoy diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h index fc585815d850..d9a32fab64f2 100644 --- a/test/common/http/http2/http2_frame.h +++ b/test/common/http/http2/http2_frame.h @@ -138,6 +138,12 @@ class Http2Frame { absl::string_view path, const std::vector
extra_headers); static Http2Frame makePostRequest(uint32_t stream_index, absl::string_view host, absl::string_view path); + static Http2Frame makePostRequest(uint32_t stream_index, absl::string_view host, + absl::string_view path, + const std::vector
extra_headers); + static Http2Frame makeDataFrame(uint32_t stream_index, absl::string_view data, + DataFlags flags = DataFlags::None); + /** * Creates a frame with the given contents. This frame can be * malformed/invalid depending on the given contents. diff --git a/test/integration/BUILD b/test/integration/BUILD index 10d5f2bacdd3..e3dbfb5d8731 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -328,6 +328,7 @@ envoy_cc_test( "//source/extensions/filters/http/buffer:config", "//source/extensions/filters/http/health_check:config", "//test/common/http/http2:http2_frame", + "//test/integration/filters:backpressure_filter_config_lib", "//test/integration/filters:metadata_stop_all_filter_config_lib", "//test/integration/filters:request_metadata_filter_config_lib", "//test/integration/filters:response_metadata_filter_config_lib", diff --git a/test/integration/filters/backpressure_filter.cc b/test/integration/filters/backpressure_filter.cc index 1d6f8ce92be5..e5eb9ec6ea9a 100644 --- a/test/integration/filters/backpressure_filter.cc +++ b/test/integration/filters/backpressure_filter.cc @@ -14,12 +14,27 @@ namespace Envoy { // the content of the filter buffer. class BackpressureFilter : public Http::PassThroughFilter { public: - void onDestroy() override { decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); } + void onDestroy() override { + if (!below_write_buffer_low_watermark_called_) { + decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); + } + } Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override { decoder_callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); return Http::FilterHeadersStatus::Continue; } + + Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) override { + if (end_stream) { + below_write_buffer_low_watermark_called_ = true; + decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); + } + return Http::FilterDataStatus::Continue; + } + +private: + bool below_write_buffer_low_watermark_called_{false}; }; class BackpressureConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index ac52fd13d5a3..1337d45f7463 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -1976,6 +1976,69 @@ TEST_P(Http2FloodMitigationTest, Trailers) { EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value()); } +// Verify flood detection by the WINDOW_UPDATE frame when a decoder filter is resuming reading from +// the downstream via DecoderFilterBelowWriteBufferLowWatermark. +TEST_P(Http2FloodMitigationTest, WindowUpdateOnLowWatermarkFlood) { + config_helper_.addFilter(R"EOF( + name: backpressure-filter + )EOF"); + config_helper_.setBufferLimits(1024 * 1024 * 1024, 1024 * 1024 * 1024); + // Set low window sizes in the server codec as nghttp2 sends WINDOW_UPDATE only after it consumes + // more than 25% of the window. + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { + auto* h2_options = hcm.mutable_http2_protocol_options(); + h2_options->mutable_initial_stream_window_size()->set_value(70000); + h2_options->mutable_initial_connection_window_size()->set_value(70000); + }); + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + beginSession(); + + writev_matcher_->setWritevReturnsEgain(); + + // pre-fill two away from overflow + const auto request = Http2Frame::makePostRequest( + Http2Frame::makeClientStreamId(0), "host", "/test/long/url", + {Http2Frame::Header("response_data_blocks", "998"), Http2Frame::Header("no_trailers", "0")}); + sendFrame(request); + + // The backpressure-filter disables reading when it sees request headers, and it should prevent + // WINDOW_UPDATE to be sent on the following DATA frames. Send enough DATA to consume more than + // 25% of the 70K window so that nghttp2 will send WINDOW_UPDATE on read resumption. + auto data_frame = + Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(0), std::string(16384, '0')); + sendFrame(data_frame); + sendFrame(data_frame); + data_frame = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(0), std::string(16384, '1'), + Http2Frame::DataFlags::EndStream); + sendFrame(data_frame); + + // Upstream will respond with 998 DATA frames and the backpressure-filter filter will re-enable + // reading on the last DATA frame. This will cause nghttp2 to send two WINDOW_UPDATE frames for + // stream and connection windows. Together with response DATA frames it should overflow outbound + // frame queue. Wait for connection to be flooded with outbound WINDOW_UPDATE frame and + // disconnected. + tcp_client_->waitForDisconnect(); + + EXPECT_EQ(1, + test_server_->counter("http.config_test.downstream_flow_control_paused_reading_total") + ->value()); + + // If the server codec had incorrectly thrown an exception on flood detection it would cause + // the entire upstream to be disconnected. Verify it is still active, and there are no destroyed + // connections. + ASSERT_EQ(1, test_server_->gauge("cluster.cluster_0.upstream_cx_active")->value()); + ASSERT_EQ(0, test_server_->counter("cluster.cluster_0.upstream_cx_destroy")->value()); + // Verify that the flood check was triggered + EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value()); +} + +// TODO(yanavlasov): add tests for WINDOW_UPDATE overflow from the router filter. These tests need +// missing support for write resumption from test sockets that were forced to return EAGAIN by the +// test. + // Verify that the server can detect flood of RST_STREAM frames. TEST_P(Http2FloodMitigationTest, RST_STREAM) { // Use invalid HTTP headers to trigger sending RST_STREAM frames.