connection: skip read activate call when reading from transport socket if the connection is read disabled #14043

Merged
6 changes: 3 additions & 3 deletions include/envoy/network/transport_socket.h
@@ -65,10 +65,10 @@ class TransportSocketCallbacks {
virtual bool shouldDrainReadBuffer() PURE;

/**
* Mark read buffer ready to read in the event loop. This is used when yielding following
* shouldDrainReadBuffer().
* Mark the transport socket as readable in order to force a read in a future iteration of the
* event loop. This is used when yielding following shouldDrainReadBuffer().
*/
virtual void setReadBufferReady() PURE;
virtual void setTransportSocketIsReadable() PURE;

/**
* Raise a connection event to the connection. This can be used by a secure socket (e.g. TLS)
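For illustration, the yield pattern that the doc comment above describes can be sketched roughly as follows. This is a minimal sketch, not Envoy code: `ToyCallbacks`, `toyDoRead`, and the `buffer_limit`, `buffered`, and `available` names are hypothetical stand-ins, and only the `shouldDrainReadBuffer()` / `setTransportSocketIsReadable()` handshake mirrors the interface in this diff.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for Network::TransportSocketCallbacks. Only the two
// methods discussed in this diff are modeled.
struct ToyCallbacks {
  std::size_t buffer_limit = 0; // Assumption: 0 means "no limit".
  std::size_t buffered = 0;     // Bytes currently held in the read buffer.
  bool transport_is_readable = false;

  bool shouldDrainReadBuffer() const {
    return buffer_limit > 0 && buffered >= buffer_limit;
  }
  void setTransportSocketIsReadable() { transport_is_readable = true; }
};

// Moves up to `chunk` bytes at a time from `available` (bytes pending in the
// transport) into the read buffer. Stops when the transport is drained or when
// the callbacks ask the socket to yield. Returns the number of bytes read.
inline std::size_t toyDoRead(ToyCallbacks& callbacks, std::size_t& available,
                             std::size_t chunk) {
  assert(chunk > 0);
  std::size_t bytes_read = 0;
  while (available > 0) {
    const std::size_t n = available < chunk ? available : chunk;
    available -= n;
    callbacks.buffered += n;
    bytes_read += n;
    if (callbacks.shouldDrainReadBuffer()) {
      // Yield, but record that the transport may still have data to deliver.
      callbacks.setTransportSocketIsReadable();
      break;
    }
  }
  return bytes_read;
}
```

With a 32-byte limit, 100 pending bytes, and 16-byte chunks, the loop stops after two chunks and leaves the remaining 68 bytes for a later event loop iteration.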
35 changes: 25 additions & 10 deletions source/common/network/connection_impl.cc
@@ -194,6 +194,19 @@ Connection::State ConnectionImpl::state() const {

void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

void ConnectionImpl::setTransportSocketIsReadable() {
// Remember that the transport requested read resumption, in case the resumption event is not
// scheduled immediately or is "lost" because read was disabled.
transport_wants_read_ = true;
// Only schedule a read activation if the connection is not read disabled to avoid spurious
// wakeups. When read disabled, the connection will not read from the transport, and limit
// dispatch to the current contents of the read buffer if its high-watermark is triggered and
// dispatch_buffered_data_ is set.
if (read_disable_count_ == 0) {
Contributor

So I see a test change to ensure that when read disabled we don't activate here.
There's one other change in this PR, which is that in filterChainWantsData we no longer set transport_wants_read_ to true. Is that behavior regression tested as well?

Member

It struck me as odd as well, so I dug a bit deeper into that and I think the current and the previous code are equivalent:

In the previous code we set transport_wants_read_ by calling setReadBufferReady in readDisable if:

(*) transport_wants_read_ is already true or read_buffer_.length() > 0. This comes from the if-statement above: if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_))

Looking at the two cases individually:

  1. In the case that transport_wants_read_ is already true, it doesn't matter that we no longer set transport_wants_read_ to true.
  2. In the case that read_buffer_.length() > 0, the next call to readDisable(false) will trigger the read activation regardless of the value of transport_wants_read_ because of (*).

This means that even when we have a cycle of readDisable(true)/readDisable(false) (which used to leave connections in a zombie state because the read activation was canceled), we will re-activate reads correctly.

+1 on testing though, the more the merrier.

Contributor Author

I think you're referring to the case where we check filterChainWantsData() in readDisable(false) below. Setting transport_wants_read_ when resuming from readDisable(false) does not matter because it is either already set when the method is called or onReadReady clears it before it is possible to drain the read buffer and call readDisable(false) from within that method.

Also, setting transport_wants_read_ in readDisable(false) was known to be redundant as seen in this comment thread: #13772 (comment)

Longer version:
After a read from the transport socket the connection read buffer can be in one of the following states:

  1. over high-watermark, which implies transport_wants_read_ == true and high watermark triggered
  2. at exactly the high-watermark, which implies transport_wants_read_ == true but high watermark not triggered
  3. not empty but below high-watermark, implies short read and transport_wants_read_ == false
  4. empty, implies transport_wants_read_ == false

transport_wants_read_ == true in cases 1 and 2 above because shouldDrainReadBuffer() == true triggers a call to setTransportSocketIsReadable() from the transport.
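The four states can be sketched as a small classifier. This is a hedged illustration only: `ToyReadBuffer`, `ReadBufferState`, `classify`, and `transportWantsReadAfterRead` are hypothetical names, and the `>=` versus `>` comparisons are assumptions inferred from the edge case described in this comment (a buffer exactly at the limit drains but does not trigger the high watermark). It models the moment right after a transport read and ignores the low watermark.

```cpp
#include <cassert>
#include <cstddef>

// The four post-read states listed above.
enum class ReadBufferState {
  OverHighWatermark,  // case 1
  AtHighWatermark,    // case 2
  BelowHighWatermark, // case 3
  Empty               // case 4
};

// Hypothetical, simplified view of the connection read buffer.
struct ToyReadBuffer {
  std::size_t length;
  std::size_t limit; // Assumed to be > 0 in this sketch.

  // Assumption: draining is requested once the limit is reached.
  bool shouldDrainReadBuffer() const { return length >= limit; }
  // Assumption: the high watermark triggers only when the limit is exceeded.
  bool highWatermarkTriggered() const { return length > limit; }
};

inline ReadBufferState classify(const ToyReadBuffer& buffer) {
  assert(buffer.limit > 0);
  if (buffer.highWatermarkTriggered()) {
    return ReadBufferState::OverHighWatermark;
  }
  if (buffer.shouldDrainReadBuffer()) {
    return ReadBufferState::AtHighWatermark;
  }
  return buffer.length > 0 ? ReadBufferState::BelowHighWatermark
                           : ReadBufferState::Empty;
}

// In this model the transport asks for read resumption exactly when it yielded
// because of shouldDrainReadBuffer(), i.e. in cases 1 and 2.
inline bool transportWantsReadAfterRead(const ToyReadBuffer& buffer) {
  return buffer.shouldDrainReadBuffer();
}
```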

Resumption on readDisable(false) can happen if:
a. read_buffer_.highWatermarkTriggered() is true and read_disable_count_ == 1
b. read_buffer_ is not empty and read_disable_count_ == 0
c. read_buffer_ is empty and transport_wants_read_ == true

Additional considerations:
Bytes are only ever added to or removed from the read buffer under the call stack of onReadReady.

Resumption case (a) can only happen after transport socket read case (1), so transport_wants_read_ == true in this case. The next call to onReadReady will return early because read_disable_count_ >= 1, so transport_wants_read_ remains == true.

The next call to onReadReady after resumption case (b) will perform a read from the transport because read_disable_count_ == 0. onReadReady will set transport_wants_read_ to false before doing reads from the transport or processing the contents of the read buffer. Nothing reads the value of transport_wants_read_ between the time the read resumption is scheduled until onReadReady sets it to false.

transport_wants_read_ is already set to true in case (c).

While explaining this I noticed an edge case that I hadn't considered before: if the read buffer is exactly at the buffer_limit_, shouldDrainReadBuffer() will return true but highWatermarkTriggered() returns false. In fact, this edge case is an optimization for the common case where the read limit is configured to a multiple of 16KB, so transport reads end up exactly hitting the configured buffer limit and narrowly avoid the expensive calls to readDisable(true)/readDisable(false). I'd love to get rid of this edge case, but in order to do that we need to reduce the cost of going through a readDisable(true)/readDisable(false) cycle.
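To make the resumption argument concrete, here is a minimal model of the two methods touched by this change. It is a sketch under stated assumptions, not the real ConnectionImpl: `ToyConnection` and its public fields are hypothetical, `read_activations` stands in for calls to `ioHandle().activateFileEvents(Event::FileReadyType::Read)`, and event mask updates and connection state checks are left out.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, stripped-down model of the read resumption bookkeeping.
struct ToyConnection {
  int read_disable_count = 0;
  bool transport_wants_read = false;
  bool high_watermark_triggered = false;
  std::size_t read_buffer_length = 0;
  int read_activations = 0; // Counts scheduled fake read events.

  void setTransportSocketIsReadable() {
    // Always remember the request so that it is not lost while read disabled.
    transport_wants_read = true;
    // Only schedule the read event when the connection is not read disabled.
    if (read_disable_count == 0) {
      ++read_activations;
    }
  }

  bool filterChainWantsData() const {
    return read_disable_count == 0 ||
           (read_disable_count == 1 && high_watermark_triggered);
  }

  void readDisable(bool disable) {
    if (disable) {
      ++read_disable_count;
      return;
    }
    assert(read_disable_count > 0);
    --read_disable_count;
    // Resume if there is buffered data or the transport asked for a read.
    if (filterChainWantsData() &&
        (read_buffer_length > 0 || transport_wants_read)) {
      ++read_activations;
    }
  }
};
```

In this model, a request made while read disabled produces no activation at the time of the call, and the activation happens on the matching readDisable(false) instead.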

ioHandle().activateFileEvents(Event::FileReadyType::Read);
}
}

bool ConnectionImpl::filterChainWantsData() {
return read_disable_count_ == 0 ||
(read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered());
@@ -358,18 +371,19 @@ void ConnectionImpl::readDisable(bool disable) {
}

if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.

// Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
// buffer's high watermark has triggered.
ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0);

// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately the connection may need read resumption while read disabled and
// not registered for read events because the read buffer's high-watermark has triggered. To
// handle these cases, directly schedule a fake read event to make sure the buffered data in
// the read buffer or in transport socket internal buffers gets processed regardless and
// ensure that we dispatch it via onRead.
dispatch_buffered_data_ = true;
setReadBufferReady();
ioHandle().activateFileEvents(Event::FileReadyType::Read);
Contributor

Think it's worth a comment on why this case is different?

Contributor Author

Rearranged and expanded comments.

}
}
}
@@ -560,8 +574,9 @@ void ConnectionImpl::onReadReady() {
ASSERT(!connecting_);

// We get here while read disabled in two ways.
// 1) There was a call to setReadBufferReady(), for example if a raw buffer socket ceded due to
// shouldDrainReadBuffer(). In this case we defer the event until the socket is read enabled.
// 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
// due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
// enabled.
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
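The hunk is truncated right after the read-disabled check, so the following is only a rough sketch of the two paths described in the comment above, pieced together from the comments in this PR. `ToyReadPath` and its counters are hypothetical, latching `dispatch_buffered_data` at the top of the method is an assumption of this sketch, and `transport_reads` / `buffered_dispatches` stand in for the transport `doRead` call and the dispatch of already buffered data.

```cpp
#include <cassert>

// Hypothetical model of the read-disabled early return in onReadReady().
struct ToyReadPath {
  int read_disable_count = 0;
  bool dispatch_buffered_data = false;
  bool transport_wants_read = false;
  bool high_watermark_triggered = false;
  int transport_reads = 0;     // Stand-in for reading from the transport.
  int buffered_dispatches = 0; // Stand-in for dispatching buffered data.

  bool filterChainWantsData() const {
    return read_disable_count == 0 ||
           (read_disable_count == 1 && high_watermark_triggered);
  }

  void onReadReady() {
    assert(read_disable_count >= 0);
    // Assumption: the dispatch request is consumed by this invocation.
    const bool dispatch_requested = dispatch_buffered_data;
    dispatch_buffered_data = false;

    if (read_disable_count != 0) {
      // Read disabled: do not read from the transport and keep
      // transport_wants_read untouched so the request survives until reads
      // are re-enabled. Only already buffered data may be dispatched.
      if (dispatch_requested && filterChainWantsData()) {
        ++buffered_dispatches;
      }
      return;
    }

    // Read enabled: clear the flag before reading from the transport.
    transport_wants_read = false;
    ++transport_reads;
  }
};
```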
14 changes: 6 additions & 8 deletions source/common/network/connection_impl.h
@@ -119,10 +119,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override {
transport_wants_read_ = true;
ioHandle().activateFileEvents(Event::FileReadyType::Read);
}
void setTransportSocketIsReadable() override;
void flushWriteBuffer() override;

// Obtain global next connection ID. This should only be used in tests.
@@ -200,10 +197,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
// True if the most recent call to the transport socket's doRead method invoked
// setTransportSocketIsReadable to schedule read resumption after yielding due to
// shouldDrainReadBuffer(). When true, readDisable must schedule read resumption when
// read_disable_count_ == 0 to ensure that read resumption happens when remaining bytes are held
// in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

2 changes: 1 addition & 1 deletion source/common/network/raw_buffer_socket.cc
@@ -30,7 +30,7 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
}
bytes_read += result.rc_;
if (callbacks_->shouldDrainReadBuffer()) {
callbacks_->setReadBufferReady();
callbacks_->setTransportSocketIsReadable();
break;
}
} else {
@@ -25,7 +25,7 @@ class NoOpTransportSocketCallbacks : public Network::TransportSocketCallbacks {
/*
* No-op for these two methods to hold back the callbacks.
*/
void setReadBufferReady() override {}
void setTransportSocketIsReadable() override {}
void raiseEvent(Network::ConnectionEvent) override {}
void flushWriteBuffer() override {}

2 changes: 1 addition & 1 deletion source/extensions/transport_sockets/alts/tsi_socket.cc
@@ -149,7 +149,7 @@ Network::PostIoAction TsiSocket::doHandshakeNextDone(NextResultPtr&& next_result
}

if (raw_read_buffer_.length() > 0) {
callbacks_->setReadBufferReady();
callbacks_->setTransportSocketIsReadable();
}

// Try to write raw buffer when next call is done, even this is not in do[Read|Write] stack.
4 changes: 2 additions & 2 deletions source/extensions/transport_sockets/tls/ssl_socket.cc
@@ -165,7 +165,7 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) {
if (slices_to_commit > 0) {
read_buffer.commit(slices, slices_to_commit);
if (callbacks_->shouldDrainReadBuffer()) {
callbacks_->setReadBufferReady();
callbacks_->setTransportSocketIsReadable();
keep_reading = false;
}
}
@@ -301,7 +301,7 @@ void SslSocket::shutdownSsl() {
if (rc == 0) {
// See https://www.openssl.org/docs/manmaster/man3/SSL_shutdown.html
// if return value is 0, Call SSL_read() to do a bidirectional shutdown.
callbacks_->setReadBufferReady();
callbacks_->setTransportSocketIsReadable();
}
}
ENVOY_CONN_LOG(debug, "SSL shutdown: rc={}", callbacks_->connection(), rc);
24 changes: 12 additions & 12 deletions test/common/network/connection_impl_test.cc
@@ -1889,8 +1889,8 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
// Verify that read resumptions requested via setTransportSocketIsReadable() are scheduled once read
// is re-enabled.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
InSequence s;

@@ -1905,9 +1905,9 @@ TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// setReadBufferReady triggers an immediate call to activate.
// setTransportSocketIsReadable triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();
connection_->setTransportSocketIsReadable();

// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
@@ -1919,7 +1919,7 @@ TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
// setTransportSocketIsReadable.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

@@ -2023,19 +2023,19 @@ TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer and also call setReadBufferReady to mimic what transport
// sockets are expected to do when the read buffer high watermark is hit.
// Add some data to the read buffer and also call setTransportSocketIsReadable to mimic what
// transport sockets are expected to do when the read buffer high watermark is hit.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
connection_->setReadBufferReady();
connection_->setTransportSocketIsReadable();
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
// The setReadBufferReady call adds a spurious read activation.
// TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
// The setTransportSocketIsReadable does not call activate because read_disable_count_ > 0 due to
// high-watermark.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)).Times(0);
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);
@@ -2061,7 +2061,7 @@ TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
data.drain(data.length());
return FilterStatus::Continue;
}));
// The buffer is fully drained. Expect a read activation because setReadBufferReady set
// The buffer is fully drained. Expect a read activation because setTransportSocketIsReadable set
// transport_wants_read_ and no transport doRead calls have happened.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
@@ -24,17 +24,17 @@ class TestTransportSocketCallbacks : public Network::TransportSocketCallbacks {
const Network::IoHandle& ioHandle() const override { return *io_handle_; }
Network::Connection& connection() override { return connection_; }
bool shouldDrainReadBuffer() override { return false; }
void setReadBufferReady() override { set_read_buffer_ready_ = true; }
void setTransportSocketIsReadable() override { transport_socket_is_readable_ = true; }
void raiseEvent(Network::ConnectionEvent) override { event_raised_ = true; }
void flushWriteBuffer() override { write_buffer_flushed_ = true; }

bool event_raised() const { return event_raised_; }
bool set_read_buffer_ready() const { return set_read_buffer_ready_; }
bool write_buffer_flushed() const { return write_buffer_flushed_; }
bool eventRaised() const { return event_raised_; }
bool transportSocketIsReadable() const { return transport_socket_is_readable_; }
bool writeBufferFlushed() const { return write_buffer_flushed_; }

private:
bool event_raised_{false};
bool set_read_buffer_ready_{false};
bool transport_socket_is_readable_{false};
bool write_buffer_flushed_{false};
Network::IoHandlePtr io_handle_;
Network::Connection& connection_;
@@ -55,12 +55,12 @@ TEST_F(NoOpTransportSocketCallbacksTest, TestAllCallbacks) {
EXPECT_EQ(&connection_, &wrapped_callbacks_.connection());
EXPECT_FALSE(wrapped_callbacks_.shouldDrainReadBuffer());

wrapped_callbacks_.setReadBufferReady();
EXPECT_FALSE(wrapper_callbacks_.set_read_buffer_ready());
wrapped_callbacks_.setTransportSocketIsReadable();
EXPECT_FALSE(wrapper_callbacks_.transportSocketIsReadable());
wrapped_callbacks_.raiseEvent(Network::ConnectionEvent::Connected);
EXPECT_FALSE(wrapper_callbacks_.event_raised());
EXPECT_FALSE(wrapper_callbacks_.eventRaised());
wrapped_callbacks_.flushWriteBuffer();
EXPECT_FALSE(wrapper_callbacks_.write_buffer_flushed());
EXPECT_FALSE(wrapper_callbacks_.writeBufferFlushed());
}

} // namespace
2 changes: 1 addition & 1 deletion test/mocks/network/mocks.h
@@ -474,7 +474,7 @@ class MockTransportSocketCallbacks : public TransportSocketCallbacks {
MOCK_METHOD(const IoHandle&, ioHandle, (), (const));
MOCK_METHOD(Connection&, connection, ());
MOCK_METHOD(bool, shouldDrainReadBuffer, ());
MOCK_METHOD(void, setReadBufferReady, ());
MOCK_METHOD(void, setTransportSocketIsReadable, ());
MOCK_METHOD(void, raiseEvent, (ConnectionEvent));
MOCK_METHOD(void, flushWriteBuffer, ());
