Skip to content

Commit

Permalink
connection: Do not reset delayed closed timer if doWrite consumes 0 b…
Browse files Browse the repository at this point in the history
…ytes from the output buffer. (envoyproxy#11833)

Commit Message: connection: Do not reset delayed closed timer if doWrite consumes 0 bytes from the output buffer.
Additional Description: Only reset the delayed close timer if the write attempt made progress. This works around spurious fd Write events which are delivered to a connection even after it manages to fully drain the output buffer and should be waiting for client FIN or the delay close timer to expire. This is known to happen when listening for level events instead of edge-trigger fd events.
Risk Level: medium, changes to timeout behavior.
Testing: unit
Docs Changes: n/a
Release Notes: n/a
Fixes envoyproxy#11829

Signed-off-by: Antonio Vicente <avd@google.com>
Signed-off-by: scheler <santosh.cheler@appdynamics.com>
  • Loading branch information
antoniovicente authored and scheler committed Aug 4, 2020
1 parent 3aab220 commit ced9d24
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 6 deletions.
9 changes: 6 additions & 3 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,15 +645,18 @@ void ConnectionImpl::onWriteReady() {
} else if ((inDelayedClose() && new_buffer_size == 0) || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "write flush complete", *this);
if (delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) {
ASSERT(delayed_close_timer_ != nullptr);
delayed_close_timer_->enableTimer(delayed_close_timeout_);
ASSERT(delayed_close_timer_ != nullptr && delayed_close_timer_->enabled());
if (result.bytes_processed_ > 0) {
delayed_close_timer_->enableTimer(delayed_close_timeout_);
}
} else {
ASSERT(bothSidesHalfClosed() || delayed_close_state_ == DelayedCloseState::CloseAfterFlush);
closeConnectionImmediately();
}
} else {
ASSERT(result.action_ == PostIoAction::KeepOpen);
if (delayed_close_timer_ != nullptr) {
ASSERT(!delayed_close_timer_ || delayed_close_timer_->enabled());
if (delayed_close_timer_ != nullptr && result.bytes_processed_ > 0) {
delayed_close_timer_->enableTimer(delayed_close_timeout_);
}
if (result.bytes_processed_ > 0) {
Expand Down
107 changes: 104 additions & 3 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,11 @@ TEST_P(ConnectionImplTest, DelayedCloseTimerResetWithPendingWriteBufferFlushes)
std::make_unique<ConnectionSocketImpl>(std::move(io_handle), nullptr, nullptr),
std::move(mocks.transport_socket_), stream_info_, true);

#ifndef NDEBUG
// Ignore timer enabled() calls used to check timer state in ASSERTs.
EXPECT_CALL(*mocks.timer_, enabled()).Times(AnyNumber());
#endif

InSequence s1;
// The actual timeout is insignificant, we just need to enable delayed close processing by
// setting it to > 0.
Expand All @@ -1477,18 +1482,114 @@ TEST_P(ConnectionImplTest, DelayedCloseTimerResetWithPendingWriteBufferFlushes)
// The write ready event cb (ConnectionImpl::onWriteReady()) will reset the timer to its
// original timeout value to avoid triggering while the write buffer is being actively flushed.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult {
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult {
// Partial flush.
return IoResult{PostIoAction::KeepOpen, 1, false};
uint64_t bytes_drained = 1;
buffer.drain(bytes_drained);
return IoResult{PostIoAction::KeepOpen, bytes_drained, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _))
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult {
// Flush the entire buffer.
uint64_t bytes_drained = buffer.length();
buffer.drain(buffer.length());
return IoResult{PostIoAction::KeepOpen, bytes_drained, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

// Force the delayed close timeout to trigger so the connection is cleaned up.
mocks.timer_->invokeCallback();
}

// Test that the delayed close timer is not reset by spurious fd Write events that either consume 0
// bytes from the output buffer or are delivered after close(FlushWriteAndDelay).
TEST_P(ConnectionImplTest, IgnoreSpuriousFdWriteEventsDuringFlushWriteAndDelay) {
ConnectionMocks mocks = createConnectionMocks();
MockTransportSocket* transport_socket = mocks.transport_socket_.get();
IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(0);
auto server_connection = std::make_unique<Network::ConnectionImpl>(
*mocks.dispatcher_,
std::make_unique<ConnectionSocketImpl>(std::move(io_handle), nullptr, nullptr),
std::move(mocks.transport_socket_), stream_info_, true);

#ifndef NDEBUG
// Ignore timer enabled() calls used to check timer state in ASSERTs.
EXPECT_CALL(*mocks.timer_, enabled()).Times(AnyNumber());
#endif

InSequence s1;
// The actual timeout is insignificant, we just need to enable delayed close processing by
// setting it to > 0.
auto timeout = std::chrono::milliseconds(100);
server_connection->setDelayedCloseTimeout(timeout);

EXPECT_CALL(*mocks.file_event_, activate(Event::FileReadyType::Write))
.WillOnce(Invoke(*mocks.file_ready_cb_));
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult {
// Do not drain the buffer and return 0 bytes processed to simulate backpressure.
return IoResult{PostIoAction::KeepOpen, 0, false};
}));
Buffer::OwnedImpl data("data");
server_connection->write(data, false);

EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
server_connection->close(ConnectionCloseType::FlushWriteAndDelay);

// The write ready event cb (ConnectionImpl::onWriteReady()) will reset the timer to its
// original timeout value to avoid triggering while the write buffer is being actively flushed.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _))
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult {
// Partial flush.
uint64_t bytes_drained = 1;
buffer.drain(bytes_drained);
return IoResult{PostIoAction::KeepOpen, bytes_drained, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

// Handle a write event and drain 0 bytes from the buffer. Verify that the timer is not reset.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult {
// Don't consume any bytes.
return IoResult{PostIoAction::KeepOpen, 0, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(0);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

// Handle a write event and drain the remainder of the buffer. Verify that the timer is reset.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _))
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult {
// Flush the entire buffer.
ASSERT(buffer.length() > 0);
uint64_t bytes_drained = buffer.length();
buffer.drain(buffer.length());
return IoResult{PostIoAction::KeepOpen, buffer.length(), false};
EXPECT_EQ(server_connection->state(), Connection::State::Closing);
return IoResult{PostIoAction::KeepOpen, bytes_drained, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

// Handle a write event after entering the half-closed state. Verify that the timer is not reset
// because write consumed 0 bytes from the empty buffer.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual(""), _))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult {
EXPECT_EQ(server_connection->state(), Connection::State::Closing);
return IoResult{PostIoAction::KeepOpen, 0, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(0);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);

// Handle a write event that somehow drains bytes from an empty output buffer. Since
// some bytes were consumed, the timer is reset.
EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual(""), _))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult {
EXPECT_EQ(server_connection->state(), Connection::State::Closing);
return IoResult{PostIoAction::KeepOpen, 1, false};
}));
EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1);
(*mocks.file_ready_cb_)(Event::FileReadyType::Write);
Expand Down

0 comments on commit ced9d24

Please sign in to comment.