Skip to content

Commit

Permalink
backport to v1.13: connection: Remember transport socket read resumpt…
Browse files Browse the repository at this point in the history
…ion requests and replay them when re-enabling read. (envoyproxy#13772) (envoyproxy#14173)

Fixes SslSocket read resumption after readDisable when processing the SSL record that contains the last bytes of the HTTP message

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente committed Dec 3, 2020
1 parent 605560f commit b01e47a
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Version history
================
Changes
-------
* listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates.
* tls: fix read resumption after the buffer high-watermark is triggered while all remaining request/response bytes are held in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.

1.13.6 (September 29, 2020)
Expand Down
17 changes: 15 additions & 2 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[this]() -> void { this->onHighWatermark(); })),
read_enabled_(true), above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {
// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
RELEASE_ASSERT(ioHandle().fd() != -1, "");
Expand Down Expand Up @@ -339,7 +340,13 @@ void ConnectionImpl::readDisable(bool disable) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless and ensure that we dispatch it via onRead.
if (read_buffer_.length() > 0) {
if (read_buffer_.length() > 0 || transport_wants_read_) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.
dispatch_buffered_data_ = true;
file_event_->activate(Event::FileReadyType::Read);
}
Expand Down Expand Up @@ -509,9 +516,15 @@ void ConnectionImpl::onFileEvent(uint32_t events) {

void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready", *this);
ASSERT(read_enabled_);

ASSERT(!connecting_);

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand Down
10 changes: 9 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); }
// Requests another read pass: records the request in transport_wants_read_ and then activates a
// Read event on the connection's file event.
void setReadBufferReady() override {
// Record the request before activating the event so that it is still visible if the activated
// read cannot run right away.
transport_wants_read_ = true;
file_event_->activate(Event::FileReadyType::Read);
}

// Obtain global next connection ID. This should only be used in tests.
static uint64_t nextGlobalIdForTest() { return next_global_id_; }
Expand Down Expand Up @@ -178,6 +181,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

/**
Expand Down
55 changes: 55 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,61 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
// Scripted walk through readDisable()/setReadBufferReady() interactions. Each step below sets the
// expectations on the mock file event first and then performs the connection call that should
// satisfy them.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
// All expectations in this test must be satisfied in the order in which they are declared.
InSequence s;

// The filter is a StrictMock with no expectations set in this test, so any call into it fails
// the test.
std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Step 1: disable then re-enable read with nothing pending.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// No calls to activate when re-enabling if there are no pending read requests.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// Step 2: the transport asks for a read resumption.
// setReadBufferReady triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();

// Step 3: nested disable/enable calls.
// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Step 4: the pending request survives another disable/enable cycle.
// Disable read.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);

// Expect a read activate when re-enabling since the file ready cb has not done a read.
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Step 5: an actual read consumes the pending request.
// Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// No read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "extensions/transport_sockets/tls/context_config_impl.h"
#include "extensions/transport_sockets/tls/context_manager_impl.h"

#include "test/integration/autonomous_upstream.h"
#include "test/integration/integration.h"
#include "test/integration/utility.h"
#include "test/test_common/network_utility.h"
Expand Down Expand Up @@ -177,6 +178,103 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) {
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
}

// Fixture for tests that send a request over an SSL client connection as a sequence of separately
// written chunks and then inspect the request headers recorded by the autonomous upstream.
class RawWriteSslIntegrationTest : public SslIntegrationTest {
protected:
  /**
   * Sends the given chunks to the "http" listener and waits for a response.
   *
   * @param request_chunks the pieces of the request; one piece is written per invocation of the
   *        write callback.
   * @param buffer_limit value passed for both arguments of config_helper_.setBufferLimits().
   * @return the result of AutonomousUpstream::lastRequestHeaders() on the first fake upstream.
   */
  std::unique_ptr<Http::TestHeaderMapImpl>
  testFragmentedRequestWithBufferLimit(std::list<std::string> request_chunks,
                                       uint32_t buffer_limit) {
    autonomous_upstream_ = true;
    config_helper_.setBufferLimits(buffer_limit, buffer_limit);
    initialize();

    // write_request_cb will write each of the items in request_chunks as a separate SSL_write.
    // Capturing request_chunks by reference is safe: it is a parameter of this function and
    // therefore outlives `connection`, the local object the callback is handed to.
    auto write_request_cb = [&request_chunks](Network::ClientConnection& client) {
      if (!request_chunks.empty()) {
        Buffer::OwnedImpl buffer(request_chunks.front());
        client.write(buffer, false);
        request_chunks.pop_front();
      }
    };

    auto client_transport_socket_factory_ptr =
        createClientSslTransportSocketFactory({}, *context_manager_, *api_);
    std::string response;
    auto connection = createConnectionDriver(
        lookupPort("http"), write_request_cb,
        [&](Network::ClientConnection&, const Buffer::Instance& data) -> void {
          response.append(data.toString());
        },
        client_transport_socket_factory_ptr->createTransportSocket({}));

    // Drive the connection until we get a response.
    while (response.empty()) {
      connection->run(Event::Dispatcher::RunType::NonBlock);
    }
    EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n"));

    connection->close();
    // autonomous_upstream_ was set above, so the fake upstream is expected to be an
    // AutonomousUpstream. A downcast within a class hierarchy is expressed with static_cast;
    // reinterpret_cast performs no base-to-derived pointer adjustment and is not the right tool.
    return static_cast<AutonomousUpstream*>(fake_upstreams_.front().get())->lastRequestHeaders();
  }
};

// Instantiate every RawWriteSslIntegrationTest case once per IP version returned by
// TestEnvironment::getIpVersionsForTest(); TestUtility::ipTestParamsToString names the instances.
INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) {
  // Every chunk is handed to the raw writer separately, so each one goes out in its own
  // SSL_write. The sizes are chosen so that the receive buffer high watermark is hit while the
  // final SSL record carrying request headers is being processed; the request must still be
  // delivered upstream in full once reading resumes.
  const std::string key1_value(14000, 'a');
  const std::string key2_value(16000, 'b');
  const uint32_t buffer_limit = 15 * 1024;

  std::list<std::string> request_chunks;
  request_chunks.push_back("GET / HTTP/1.1\r\nHost: host\r\n");
  request_chunks.push_back("key1:" + key1_value + "\r\n");
  request_chunks.push_back("key2:" + key2_value + "\r\n\r\n");

  std::unique_ptr<Http::TestHeaderMapImpl> upstream_headers =
      testFragmentedRequestWithBufferLimit(request_chunks, buffer_limit);
  ASSERT_TRUE(upstream_headers != nullptr);

  // The upstream must have seen the host and both oversized headers unmodified.
  EXPECT_EQ(upstream_headers->Host()->value(), "host");
  EXPECT_EQ(key1_value,
            upstream_headers->get(Envoy::Http::LowerCaseString("key1"))->value().getStringView());
  EXPECT_EQ(key2_value,
            upstream_headers->get(Envoy::Http::LowerCaseString("key2"))->value().getStringView());
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) {
  // Every chunk is handed to the raw writer separately, so each one goes out in its own
  // SSL_write. The two body chunks (14000 + 16000 bytes) add up to the advertised content-length
  // and are sized so that the receive buffer high watermark is hit while the final SSL record of
  // the POST body is being processed. Getting headers back from the upstream shows that reading
  // resumed afterwards.
  const uint32_t buffer_limit = 15 * 1024;

  std::list<std::string> request_chunks;
  request_chunks.push_back("POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n");
  request_chunks.push_back(std::string(14000, 'a'));
  request_chunks.push_back(std::string(16000, 'a'));

  std::unique_ptr<Http::TestHeaderMapImpl> upstream_headers =
      testFragmentedRequestWithBufferLimit(request_chunks, buffer_limit);
  ASSERT_TRUE(upstream_headers != nullptr);
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) {
  // Request line and headers first, followed by ten body chunks of 15000 bytes each, which
  // together make up the advertised content-length of 150000.
  const uint32_t buffer_limit = 16 * 1024;

  std::list<std::string> request_chunks;
  request_chunks.push_back("POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n");
  request_chunks.insert(request_chunks.end(), 10, std::string(15000, 'a'));

  std::unique_ptr<Http::TestHeaderMapImpl> upstream_headers =
      testFragmentedRequestWithBufferLimit(request_chunks, buffer_limit);
  ASSERT_TRUE(upstream_headers != nullptr);
}

// Validate certificate selection across different certificate types and client TLS versions.
class SslCertficateIntegrationTest
: public testing::TestWithParam<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest {
// Set this true to debug SSL handshake issues with openssl s_client. The
// verbose trace will be in the logs, openssl must be installed separately.
bool debug_with_s_client_{false};

private:
std::unique_ptr<ContextManager> context_manager_;
};

Expand Down
15 changes: 15 additions & 0 deletions test/integration/integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,21 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
void sendRawHttpAndWaitForResponse(int port, const char* raw_http, std::string* response,
bool disconnect_after_headers_complete = false);

/**
 * Helper to create ConnectionDriver.
 *
 * @param port the port to connect to.
 * @param write_request_cb callback used to send data.
 * @param data_callback the callback on the received data.
 * @param transport_socket transport socket to use for the client connection.
 * @return a RawConnectionDriver constructed from the arguments above and this test's version_.
 **/
std::unique_ptr<RawConnectionDriver> createConnectionDriver(
    uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb,
    std::function<void(Network::ClientConnection&, const Buffer::Instance&)>&& data_callback,
    Network::TransportSocketPtr transport_socket) {
  // Both callbacks are sink arguments (one taken by value, one by rvalue reference), so hand
  // them on with std::move instead of copying the std::function state a second time.
  return std::make_unique<RawConnectionDriver>(port, std::move(write_request_cb),
                                               std::move(data_callback), version_,
                                               std::move(transport_socket));
}

protected:
// Create the envoy server in another thread and start it.
// Will not return until that server is listening.
Expand Down
25 changes: 24 additions & 1 deletion test/integration/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
api_ = Api::createApiForTest(stats_store_);
Event::GlobalTimeSystem time_system;
dispatcher_ = api_->allocateDispatcher();
callbacks_ = std::make_unique<ConnectionCallbacks>();
callbacks_ = std::make_unique<ConnectionCallbacks>([](){});
client_ = dispatcher_->createClientConnection(
Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)),
Expand All @@ -129,6 +129,29 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
client_->connect();
}

// Builds a client connection to the loopback address on `port` using the supplied transport
// socket. Outgoing data is produced by write_request_callback; incoming data is forwarded to
// response_data_callback through a ForwardingFilter.
RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
                                         ReadCallback response_data_callback,
                                         Network::Address::IpVersion version,
                                         Network::TransportSocketPtr transport_socket) {
  api_ = Api::createApiForTest(stats_store_);
  Event::GlobalTimeSystem time_system;
  dispatcher_ = api_->allocateDispatcher();

  // The connection callbacks get a zero-argument hook that feeds the client connection to the
  // caller's write callback. client_ is assigned below, before connect() is called.
  auto invoke_write = [this, write_request_callback]() { write_request_callback(*client_); };
  callbacks_ = std::make_unique<ConnectionCallbacks>(invoke_write);

  const auto loopback_url = fmt::format(
      "tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port);
  client_ = dispatcher_->createClientConnection(
      Network::Utility::resolveUrl(loopback_url), Network::Address::InstanceConstSharedPtr(),
      std::move(transport_socket), nullptr);

  // ConnectionCallbacks will call write_request_callback from the connect and low-watermark
  // callbacks. With a buffer limit of 1, every write pushes the buffer over the high watermark
  // and every drain brings it back under the low watermark, so the callback runs once per drain.
  client_->setBufferLimits(1);
  client_->addConnectionCallbacks(*callbacks_);

  Network::ReadFilterSharedPtr forwarding_filter =
      std::make_shared<ForwardingFilter>(*this, response_data_callback);
  client_->addReadFilter(forwarding_filter);
  client_->connect();
}

RawConnectionDriver::~RawConnectionDriver() = default;

void RawConnectionDriver::run(Event::Dispatcher::RunType run_type) { dispatcher_->run(run_type); }
Expand Down
33 changes: 27 additions & 6 deletions test/integration/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ using BufferingStreamDecoderPtr = std::unique_ptr<BufferingStreamDecoder>;
*/
class RawConnectionDriver {
public:
using DoWriteCallback = std::function<void(Network::ClientConnection&)>;
using ReadCallback = std::function<void(Network::ClientConnection&, const Buffer::Instance&)>;

RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback,
Network::Address::IpVersion version);
RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
ReadCallback response_data_callback, Network::Address::IpVersion version,
Network::TransportSocketPtr transport_socket);
// Similar to the constructor above but accepts the request as a constructor argument.
RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data,
ReadCallback data_callback, Network::Address::IpVersion version);
~RawConnectionDriver();
const Network::Connection& connection() { return *client_; }
bool connecting() { return callbacks_->connecting_; }
Expand All @@ -75,29 +80,45 @@ class RawConnectionDriver {
private:
struct ForwardingFilter : public Network::ReadFilterBaseImpl {
ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb)
: parent_(parent), data_callback_(cb) {}
: parent_(parent), response_data_callback_(cb) {}

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool) override {
data_callback_(*parent_.client_, data);
response_data_callback_(*parent_.client_, data);
data.drain(data.length());
return Network::FilterStatus::StopIteration;
}

RawConnectionDriver& parent_;
ReadCallback data_callback_;
ReadCallback response_data_callback_;
};

struct ConnectionCallbacks : public Network::ConnectionCallbacks {
using WriteCb = std::function<void()>;

ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {}
bool connected() const { return connected_; }
bool closed() const { return closed_; }

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
  // Trigger the first write when the connection is established. The connected_ guard ensures
  // this happens at most once.
  if (!connected_ && event == Network::ConnectionEvent::Connected) {
    write_cb_();
  }

  last_connection_event_ = event;
  connecting_ = false;
  // Maintain the state behind connected() and closed(). Without these updates both accessors
  // would always report false and the guard above would never take effect.
  connected_ |= (event == Network::ConnectionEvent::Connected);
  closed_ |= (event == Network::ConnectionEvent::RemoteClose ||
              event == Network::ConnectionEvent::LocalClose);
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
void onBelowWriteBufferLowWatermark() override { write_cb_(); }

bool connecting_{true};
Network::ConnectionEvent last_connection_event_;

private:
WriteCb write_cb_;
bool connected_{false};
bool closed_{false};
};

Stats::IsolatedStoreImpl stats_store_;
Expand Down

0 comments on commit b01e47a

Please sign in to comment.