connection: Remember transport socket read resumption requests and replay them when re-enabling read. #13772

Merged
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
@@ -21,6 +21,7 @@ Bug Fixes
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* http: sending CONNECT_ERROR for HTTP/2 where appropriate during CONNECT requests.
* tls: fix read resumption after the buffer high-watermark is triggered while all remaining request/response bytes are stored in the SSL connection's internal buffers.

Removed Config or Runtime
-------------------------
32 changes: 23 additions & 9 deletions source/common/network/connection_impl.cc
@@ -63,7 +63,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
write_buffer_above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {

if (!connected) {
connecting_ = true;
@@ -189,7 +190,7 @@ Connection::State ConnectionImpl::state() const {

void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

bool ConnectionImpl::consumerWantsToRead() {
bool ConnectionImpl::filterChainWantsData() {
return read_disable_count_ == 0 ||
(read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered());
}
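
The renamed predicate can be read as a small truth table. The following is a minimal standalone sketch, not the Envoy implementation: the free function and its two parameters are hypothetical stand-ins for the `read_disable_count_` member and `read_buffer_.highWatermarkTriggered()`, and `checkFilterChainWantsData` is an illustrative helper.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical free-function version of ConnectionImpl::filterChainWantsData().
// The parameters stand in for read_disable_count_ and
// read_buffer_.highWatermarkTriggered().
constexpr bool filterChainWantsData(std::uint32_t read_disable_count,
                                    bool high_watermark_triggered) {
  return read_disable_count == 0 ||
         (read_disable_count == 1 && high_watermark_triggered);
}

// Illustrative helper that walks the four interesting cases.
inline void checkFilterChainWantsData() {
  assert(filterChainWantsData(0, false));  // Reads are enabled.
  assert(filterChainWantsData(1, true));   // Only the high watermark disabled reads.
  assert(!filterChainWantsData(1, false)); // A filter called readDisable(true).
  assert(!filterChainWantsData(2, true));  // High watermark plus a filter.
}
```

The second case is the one discussed in the review thread: the filter chain may still drain already buffered bytes even though no further transport reads are wanted.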
@@ -267,7 +268,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (inDelayedClose() || !consumerWantsToRead()) {
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
ASSERT(ioHandle().isOpen());
@@ -352,11 +353,17 @@ void ConnectionImpl::readDisable(bool disable) {
ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
}

if (consumerWantsToRead() && read_buffer_.length() > 0) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Alternately if the read buffer has data the fd could
// be read disabled. To handle these cases, fake an event to make sure the buffered data gets
// processed regardless and ensure that we dispatch it via onRead.
if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.

// Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
// buffer's high watermark has triggered.
Comment on lines +364 to +365
Member

Stupid question: why do we bother with this extra logic to read when the disable count is still 1 and high watermark triggered? Is the idea that we already have the data so we might as well flush it? If the high watermark triggered it's probably going to be stuck on the other side, so mostly just wondering if this extra logic is actually worth it.

Contributor Author

This is part of the implementation of high-watermarks on the read buffer introduced in PR #11170.

The parser is able to consume input from the read buffer; we just don't want to read additional bytes from the transport into the read buffer.

Member

So the idea here is that we allow reading data out in the hope that it will get us below the low watermark? OK, thanks, that makes sense.

Contributor Author

Yes.

The common scenario is when the H1 parser stops consuming bytes from the read buffer and calls readDisable(true) once it detects the end of the current request message. When request processing is complete, the H1 parser re-enables read. When H1 pipelining is in use, a full second request message could be in the read buffer and the read buffer may have triggered the high-watermark.

Member

Makes sense, though note that we don't do pipelining.

ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0);
dispatch_buffered_data_ = true;
setReadBufferReady();
}
@@ -549,12 +556,19 @@ void ConnectionImpl::onReadReady() {
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
if (latched_dispatch_buffered_data && consumerWantsToRead()) {
// Do not clear transport_wants_read_ when returning early; the early return skips the transport
// socket doRead call.
if (latched_dispatch_buffered_data && filterChainWantsData()) {
onRead(read_buffer_.length());
}
return;
}

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
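
The interaction between `setReadBufferReady()`, `readDisable()` and `onReadReady()` in this file can be sketched with a toy model. This is only a rough illustration under simplifying assumptions: `ToyConnection`, `activations()` and `transportWantsRead()` are hypothetical names, the read buffer and its watermarks are left out entirely, and the activation counter stands in for `ioHandle().activateFileEvents(Event::FileReadyType::Read)`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical toy model of the resumption latch. Only the member names
// read_disable_count_ and transport_wants_read_ mirror the diff.
class ToyConnection {
public:
  // Mirrors setReadBufferReady(): remember the request, then schedule a read event.
  void setReadBufferReady() {
    transport_wants_read_ = true;
    ++activations_;
  }

  // Mirrors the resumption branch of readDisable(). The read buffer term is
  // omitted, so resumption depends only on the latch.
  void readDisable(bool disable) {
    if (disable) {
      ++read_disable_count_;
      return;
    }
    assert(read_disable_count_ > 0);
    --read_disable_count_;
    if (read_disable_count_ == 0 && transport_wants_read_) {
      setReadBufferReady();
    }
  }

  // Mirrors onReadReady(): the early return keeps the latch set; the latch is
  // cleared only right before the (omitted) transport doRead call.
  void onReadReady() {
    if (read_disable_count_ != 0) {
      return;
    }
    transport_wants_read_ = false;
  }

  int activations() const { return activations_; }
  bool transportWantsRead() const { return transport_wants_read_; }

private:
  std::uint32_t read_disable_count_{0};
  bool transport_wants_read_{false};
  int activations_{0};
};
```

In this model a read event that fires while reads are disabled does not consume the pending request, so the next `readDisable(false)` that brings the count back to zero schedules another activation. Once `onReadReady()` gets as far as the transport read, the latch is cleared and later disable/enable cycles schedule nothing.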
19 changes: 13 additions & 6 deletions source/common/network/connection_impl.h
@@ -116,7 +116,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { ioHandle().activateFileEvents(Event::FileReadyType::Read); }
void setReadBufferReady() override {
transport_wants_read_ = true;
ioHandle().activateFileEvents(Event::FileReadyType::Read);
}
void flushWriteBuffer() override;

// Obtain global next connection ID. This should only be used in tests.
@@ -126,11 +129,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// A convenience function which returns true if
// 1) The read disable count is zero or
// 2) The read disable count is one due to the read buffer being overrun.
// In either case the consumer of the data would like to read from the buffer.
// If the read count is greater than one, or equal to one when the buffer is
// not overrun, then the consumer of the data has called readDisable, and does
// not want to read.
bool consumerWantsToRead();
// In either case the filter chain would like to process data from the read buffer or transport
// socket. If the read count is greater than one, or equal to one when the buffer is not overrun,
// then the filter chain has called readDisable, and does not want additional data.
bool filterChainWantsData();

// Network::ConnectionImplBase
void closeConnectionImmediately() final;
@@ -195,6 +197,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
191 changes: 191 additions & 0 deletions test/common/network/connection_impl_test.cc
@@ -1857,6 +1857,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// No calls to activate when re-enabling if there are no pending read requests.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// setReadBufferReady triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();

// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// No calls to doRead when file_ready_cb is invoked while read disabled.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);

// Expect a read activate when re-enabling since the file ready cb has not done a read.
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Do a read to clear the want_read_ flag, verify that no read activation is scheduled.
Contributor

want_read_ -> transport_wants_read_ here and other places below.

Contributor Author

Done. Nice catch.

EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// No read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that read resumption is scheduled when read is re-enabled while the read buffer is
// non-empty.
TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer to trigger read activate calls when re-enabling read.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count_ should be == 1. Expect a read
// activate but no call to setEnabled to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer.
// Expect no transport reads.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length() - 1);
return FilterStatus::Continue;
}));
// Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask
// and a read activate since the read buffer is not empty.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

// Drain the rest of the buffer and verify there are no spurious read activate calls.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(1, data.length());
data.drain(1);
return FilterStatus::Continue;
}));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// read buffer is empty, no read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that transport_wants_read_ read resumption is not lost when processing read buffer
// high-watermark resumptions.
TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer and also call setReadBufferReady to mimic what transport
// sockets are expected to do when the read buffer high watermark is hit.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
connection_->setReadBufferReady();
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
// The setReadBufferReady call adds a spurious read activation.
// TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count_ should be == 1. Expect a read
// activate but no call to setEnabled to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer.
// Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true
// when read is re-enabled due to going under the low watermark.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length());
return FilterStatus::Continue;
}));
// The buffer is fully drained. Expect a read activation because setReadBufferReady set
// transport_wants_read_ and no transport doRead calls have happened.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);

// Verify there are no read activate calls after the event callback does a transport read and
// clears the transport_wants_read_ state.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;