diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 649e8057d6d2..2abbea352b76 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -645,15 +645,18 @@ void ConnectionImpl::onWriteReady() { } else if ((inDelayedClose() && new_buffer_size == 0) || bothSidesHalfClosed()) { ENVOY_CONN_LOG(debug, "write flush complete", *this); if (delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) { - ASSERT(delayed_close_timer_ != nullptr); - delayed_close_timer_->enableTimer(delayed_close_timeout_); + ASSERT(delayed_close_timer_ != nullptr && delayed_close_timer_->enabled()); + if (result.bytes_processed_ > 0) { + delayed_close_timer_->enableTimer(delayed_close_timeout_); + } } else { ASSERT(bothSidesHalfClosed() || delayed_close_state_ == DelayedCloseState::CloseAfterFlush); closeConnectionImmediately(); } } else { ASSERT(result.action_ == PostIoAction::KeepOpen); - if (delayed_close_timer_ != nullptr) { + ASSERT(!delayed_close_timer_ || delayed_close_timer_->enabled()); + if (delayed_close_timer_ != nullptr && result.bytes_processed_ > 0) { delayed_close_timer_->enableTimer(delayed_close_timeout_); } if (result.bytes_processed_ > 0) { diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 72168e2a666d..61c4bb2e6348 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1455,6 +1455,11 @@ TEST_P(ConnectionImplTest, DelayedCloseTimerResetWithPendingWriteBufferFlushes) std::make_unique(std::move(io_handle), nullptr, nullptr), std::move(mocks.transport_socket_), stream_info_, true); +#ifndef NDEBUG + // Ignore timer enabled() calls used to check timer state in ASSERTs. + EXPECT_CALL(*mocks.timer_, enabled()).Times(AnyNumber()); +#endif + InSequence s1; // The actual timeout is insignificant, we just need to enable delayed close processing by // setting it to > 0. @@ -1477,18 +1482,114 @@ TEST_P(ConnectionImplTest, DelayedCloseTimerResetWithPendingWriteBufferFlushes) // The write ready event cb (ConnectionImpl::onWriteReady()) will reset the timer to its // original timeout value to avoid triggering while the write buffer is being actively flushed. EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _)) - .WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult { + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult { // Partial flush. - return IoResult{PostIoAction::KeepOpen, 1, false}; + uint64_t bytes_drained = 1; + buffer.drain(bytes_drained); + return IoResult{PostIoAction::KeepOpen, bytes_drained, false}; + })); + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); + (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult { + // Flush the entire buffer. + uint64_t bytes_drained = buffer.length(); + buffer.drain(buffer.length()); + return IoResult{PostIoAction::KeepOpen, bytes_drained, false}; })); EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + // Force the delayed close timeout to trigger so the connection is cleaned up. + mocks.timer_->invokeCallback(); +} + +// Test that the delayed close timer is not reset by spurious fd Write events that either consume 0 +// bytes from the output buffer or are delivered after close(FlushWriteAndDelay). +TEST_P(ConnectionImplTest, IgnoreSpuriousFdWriteEventsDuringFlushWriteAndDelay) { + ConnectionMocks mocks = createConnectionMocks(); + MockTransportSocket* transport_socket = mocks.transport_socket_.get(); + IoHandlePtr io_handle = std::make_unique(0); + auto server_connection = std::make_unique( + *mocks.dispatcher_, + std::make_unique(std::move(io_handle), nullptr, nullptr), + std::move(mocks.transport_socket_), stream_info_, true); + +#ifndef NDEBUG + // Ignore timer enabled() calls used to check timer state in ASSERTs. + EXPECT_CALL(*mocks.timer_, enabled()).Times(AnyNumber()); +#endif + + InSequence s1; + // The actual timeout is insignificant, we just need to enable delayed close processing by + // setting it to > 0. + auto timeout = std::chrono::milliseconds(100); + server_connection->setDelayedCloseTimeout(timeout); + + EXPECT_CALL(*mocks.file_event_, activate(Event::FileReadyType::Write)) + .WillOnce(Invoke(*mocks.file_ready_cb_)); + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult { + // Do not drain the buffer and return 0 bytes processed to simulate backpressure. + return IoResult{PostIoAction::KeepOpen, 0, false}; + })); + Buffer::OwnedImpl data("data"); + server_connection->write(data, false); + + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); + server_connection->close(ConnectionCloseType::FlushWriteAndDelay); + + // The write ready event cb (ConnectionImpl::onWriteReady()) will reset the timer to its + // original timeout value to avoid triggering while the write buffer is being actively flushed. EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("data"), _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult { + // Partial flush. + uint64_t bytes_drained = 1; + buffer.drain(bytes_drained); + return IoResult{PostIoAction::KeepOpen, bytes_drained, false}; + })); + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); + (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + + // Handle a write event and drain 0 bytes from the buffer. Verify that the timer is not reset. + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult { + // Don't consume any bytes. + return IoResult{PostIoAction::KeepOpen, 0, false}; + })); + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(0); + (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + + // Handle a write event and drain the remainder of the buffer. Verify that the timer is reset. + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual("ata"), _)) .WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> IoResult { // Flush the entire buffer. + ASSERT(buffer.length() > 0); + uint64_t bytes_drained = buffer.length(); buffer.drain(buffer.length()); - return IoResult{PostIoAction::KeepOpen, buffer.length(), false}; + EXPECT_EQ(server_connection->state(), Connection::State::Closing); + return IoResult{PostIoAction::KeepOpen, bytes_drained, false}; + })); + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); + (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + + // Handle a write event after entering the half-closed state. Verify that the timer is not reset + // because write consumed 0 bytes from the empty buffer. + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual(""), _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult { + EXPECT_EQ(server_connection->state(), Connection::State::Closing); + return IoResult{PostIoAction::KeepOpen, 0, false}; + })); + EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(0); + (*mocks.file_ready_cb_)(Event::FileReadyType::Write); + + // Handle a write event that somehow drains bytes from an empty output buffer. Since + // some bytes were consumed, the timer is reset. + EXPECT_CALL(*transport_socket, doWrite(BufferStringEqual(""), _)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> IoResult { + EXPECT_EQ(server_connection->state(), Connection::State::Closing); + return IoResult{PostIoAction::KeepOpen, 1, false}; })); EXPECT_CALL(*mocks.timer_, enableTimer(timeout, _)).Times(1); (*mocks.file_ready_cb_)(Event::FileReadyType::Write);