Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener: respect the connection balancer of the redirected listener #15842

Merged
merged 23 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ ActiveTcpListener::~ActiveTcpListener() {
// for now. If it becomes a problem (developers hitting this assert when using debug builds) we
// can revisit. This case, if it happens, should be benign on production builds. This case is
// covered in ConnectionHandlerTest::RemoveListenerDuringRebalance.
ASSERT(num_listener_connections_ == 0);
ASSERT(num_listener_connections_ == 0, fmt::format("destroyed listener {} has {} connections",
config_->name(), numConnections()));
}

void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
Expand Down Expand Up @@ -188,14 +189,12 @@ void ActiveTcpSocket::newConnection() {
if (new_listener.has_value()) {
// Hands off connections redirected by iptables to the listener associated with the
// original destination address. Pass 'hand_off_restored_destination_connections' as false to
// prevent further redirection as well as 'rebalanced' as true since the connection has
// already been balanced if applicable inside onAcceptWorker() when the connection was
// initially accepted. Note also that we must account for the number of connections properly
// across both listeners.
// prevent further redirection.
// Leave the new listener to decide whether to execute re-balance.
// Note also that we must account for the number of connections properly across both listeners.
// TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better.
listener_.decNumConnections();
new_listener.value().get().incNumConnections();
new_listener.value().get().onAcceptWorker(std::move(socket_), false, true);
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
Expand Down
9 changes: 5 additions & 4 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ using RebalancedSocketSharedPtr = std::shared_ptr<RebalancedSocket>;
/**
* Wrapper for an active tcp listener owned by this handler.
*/
class ActiveTcpListener : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
Expand Down Expand Up @@ -249,6 +249,7 @@ struct ActiveTcpSocket : public Network::ListenerFilterManager,
ActiveTcpListener& listener_;
Network::ConnectionSocketPtr socket_;
const bool hand_off_restored_destination_connections_;

std::list<ListenerFilterWrapperPtr> accept_filters_;
std::list<ListenerFilterWrapperPtr>::iterator iter_;
Event::TimerPtr timer_;
Expand Down
1 change: 1 addition & 0 deletions test/common/quic/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ envoy_cc_test(
"//source/common/quic:envoy_quic_connection_helper_lib",
"//source/common/quic:envoy_quic_server_connection_lib",
"//source/common/quic:envoy_quic_server_session_lib",
"//source/server:active_listener_base",
"//test/mocks/http:http_mocks",
"//test/mocks/http:stream_decoder_mock",
"//test/mocks/network:network_mocks",
Expand Down
12 changes: 11 additions & 1 deletion test/mocks/network/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ MockListener::MockListener() = default;

MockListener::~MockListener() { onDestroy(); }

MockConnectionHandler::MockConnectionHandler() = default;
MockConnectionHandler::MockConnectionHandler() {
ON_CALL(*this, incNumConnections()).WillByDefault(Invoke([this]() {
++num_handler_connections_;
}));
ON_CALL(*this, decNumConnections()).WillByDefault(Invoke([this]() {
--num_handler_connections_;
}));
ON_CALL(*this, numConnections()).WillByDefault(Invoke([this]() {
return num_handler_connections_.load();
}));
}
MockConnectionHandler::~MockConnectionHandler() = default;

MockIp::MockIp() = default;
Expand Down
4 changes: 3 additions & 1 deletion test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class MockListener : public Listener {
MOCK_METHOD(void, setRejectFraction, (UnitFloat));
};

class MockConnectionHandler : public ConnectionHandler {
class MockConnectionHandler : public virtual ConnectionHandler {
lambdai marked this conversation as resolved.
Show resolved Hide resolved
public:
MockConnectionHandler();
~MockConnectionHandler() override;
Expand All @@ -441,6 +441,8 @@ class MockConnectionHandler : public ConnectionHandler {
MOCK_METHOD(void, enableListeners, ());
MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override));
MOCK_METHOD(const std::string&, statPrefix, (), (const));

std::atomic<uint64_t> num_handler_connections_{};
lambdai marked this conversation as resolved.
Show resolved Hide resolved
};

class MockIp : public Address::Ip {
Expand Down
19 changes: 19 additions & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "active_tcp_listener_test",
srcs = ["active_tcp_listener_test.cc"],
deps = [
"//source/common/common:utility_lib",
"//source/common/config:utility_lib",
"//source/common/network:address_lib",
"//source/common/network:connection_balancer_lib",
"//source/common/stats:stats_lib",
"//source/server:active_raw_udp_listener_config",
"//source/server:connection_handler_lib",
"//test/mocks/access_log:access_log_mocks",
"//test/mocks/api:api_mocks",
"//test/mocks/network:network_mocks",
"//test/test_common:network_utility_lib",
"//test/test_common:threadsafe_singleton_injector_lib",
],
)

envoy_cc_test(
name = "drain_manager_impl_test",
srcs = ["drain_manager_impl_test.cc"],
Expand Down
181 changes: 181 additions & 0 deletions test/server/active_tcp_listener_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#include <memory>

#include "envoy/network/filter.h"
#include "envoy/network/listener.h"
#include "envoy/stats/scope.h"

#include "common/network/address_impl.h"
#include "common/network/connection_balancer_impl.h"
#include "common/network/raw_buffer_socket.h"
#include "common/network/utility.h"

#include "server/active_tcp_listener.h"

#include "test/mocks/api/mocks.h"
#include "test/mocks/common.h"
#include "test/mocks/network/mocks.h"
#include "test/test_common/network_utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::_;
using testing::Invoke;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;

namespace Envoy {
namespace Server {
namespace {

class MockTcpConnectionHandler : public Network::TcpConnectionHandler,
lambdai marked this conversation as resolved.
Show resolved Hide resolved
public Network::MockConnectionHandler {
public:
MOCK_METHOD(Event::Dispatcher&, dispatcher, ());
MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByTag,
(uint64_t listener_tag));
MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByAddress,
(const Network::Address::Instance& address));
};
class ActiveTcpListenerTest : public testing::Test, protected Logger::Loggable<Logger::Id::main> {
public:
ActiveTcpListenerTest() {
EXPECT_CALL(conn_handler_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(conn_handler_, numConnections()).Times(testing::AnyNumber());
lambdai marked this conversation as resolved.
Show resolved Hide resolved
EXPECT_CALL(conn_handler_, statPrefix()).WillRepeatedly(ReturnRef(listener_stat_prefix_));
listener_filter_matcher_ = std::make_shared<NiceMock<Network::MockListenerFilterMatcher>>();
}

std::string listener_stat_prefix_{"listener_stat_prefix"};
std::shared_ptr<Network::MockListenSocketFactory> socket_factory_{
std::make_shared<Network::MockListenSocketFactory>()};
NiceMock<Event::MockDispatcher> dispatcher_{"test"};
BasicResourceLimitImpl resource_limit_;
NiceMock<MockTcpConnectionHandler> conn_handler_;
Network::MockListener* generic_listener_;
Network::MockListenerConfig listener_config_;
NiceMock<Network::MockFilterChainManager> manager_;
NiceMock<Network::MockFilterChainFactory> filter_chain_factory_;
std::shared_ptr<Network::MockFilterChain> filter_chain_;
std::shared_ptr<NiceMock<Network::MockListenerFilterMatcher>> listener_filter_matcher_;
};

// Verify that the server connection with recovered address is rebalanced at redirected listener.
TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) {
NiceMock<Network::MockListenerConfig> listener_config1;
NiceMock<Network::MockConnectionBalancer> balancer1;
EXPECT_CALL(balancer1, registerHandler(_));
EXPECT_CALL(balancer1, unregisterHandler(_));

Network::Address::InstanceConstSharedPtr normal_address(
new Network::Address::Ipv4Instance("127.0.0.1", 10001));
EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address));
EXPECT_CALL(listener_config1, connectionBalancer()).WillRepeatedly(ReturnRef(balancer1));
EXPECT_CALL(listener_config1, listenerScope).Times(testing::AnyNumber());
EXPECT_CALL(listener_config1, listenerFiltersTimeout());
EXPECT_CALL(listener_config1, continueOnListenerFiltersTimeout());
EXPECT_CALL(listener_config1, filterChainManager()).WillRepeatedly(ReturnRef(manager_));
EXPECT_CALL(listener_config1, openConnections()).WillRepeatedly(ReturnRef(resource_limit_));
EXPECT_CALL(listener_config1, handOffRestoredDestinationConnections())
.WillRepeatedly(Return(true));

auto mock_listener_will_be_moved1 = std::make_unique<Network::MockListener>();
auto& listener1 = *mock_listener_will_be_moved1;
auto active_listener1 = std::make_unique<ActiveTcpListener>(
conn_handler_, std::move(mock_listener_will_be_moved1), listener_config1);

NiceMock<Network::MockListenerConfig> listener_config2;
Network::MockConnectionBalancer balancer2;
EXPECT_CALL(balancer2, registerHandler(_));
EXPECT_CALL(balancer2, unregisterHandler(_));

Network::Address::InstanceConstSharedPtr alt_address(
new Network::Address::Ipv4Instance("127.0.0.2", 20002));
EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(alt_address));
EXPECT_CALL(listener_config2, listenerFiltersTimeout());
EXPECT_CALL(listener_config2, connectionBalancer()).WillRepeatedly(ReturnRef(balancer2));
EXPECT_CALL(listener_config2, listenerScope).Times(testing::AnyNumber());
EXPECT_CALL(listener_config2, handOffRestoredDestinationConnections())
.WillRepeatedly(Return(false));
EXPECT_CALL(listener_config2, continueOnListenerFiltersTimeout());
EXPECT_CALL(listener_config2, filterChainManager()).WillRepeatedly(ReturnRef(manager_));
EXPECT_CALL(listener_config2, openConnections()).WillRepeatedly(ReturnRef(resource_limit_));
auto mock_listener_will_be_moved2 = std::make_unique<Network::MockListener>();
auto& listener2 = *mock_listener_will_be_moved2;
auto active_listener2 = std::make_shared<ActiveTcpListener>(
conn_handler_, std::move(mock_listener_will_be_moved2), listener_config2);

auto* test_filter = new NiceMock<Network::MockListenerFilter>();
EXPECT_CALL(*test_filter, destroy_());
Network::MockConnectionSocket* accepted_socket = new NiceMock<Network::MockConnectionSocket>();
bool redirected = false;

// 1. Listener1 re-balance. Set the balance target to the the active listener itself.
EXPECT_CALL(balancer1, pickTargetHandler(_))
.WillOnce(testing::DoAll(
testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })),
ReturnRef(*active_listener1)));

EXPECT_CALL(listener_config1, filterChainFactory())
.WillRepeatedly(ReturnRef(filter_chain_factory_));

// Listener1 has a listener filter in the listener filter chain.
EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_))
.WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool {
// Insert the Mock filter.
if (!redirected) {
manager.addAcceptFilter(nullptr, Network::ListenerFilterPtr{test_filter});
redirected = true;
}
return true;
}));
EXPECT_CALL(*test_filter, onAccept(_))
.WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus {
cb.socket().addressProvider().restoreLocalAddress(alt_address);
return Network::FilterStatus::Continue;
}));
// Verify that listener1 hands off the connection by not creating network filter chain.
EXPECT_CALL(manager_, findFilterChain(_)).Times(0);

// 2. Redirect to Listener2.
EXPECT_CALL(conn_handler_, getBalancedHandlerByAddress(_))
.WillOnce(Return(Network::BalancedConnectionHandlerOptRef(*active_listener2)));

// 3. Listener2 re-balance. Set the balance target to the the active listener itself.
EXPECT_CALL(balancer2, pickTargetHandler(_))
.WillOnce(testing::DoAll(
testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })),
ReturnRef(*active_listener2)));

auto filter_factory_callback = std::make_shared<std::vector<Network::FilterFactoryCb>>();
auto transport_socket_factory = Network::Test::createRawBufferSocketFactory();
filter_chain_ = std::make_shared<NiceMock<Network::MockFilterChain>>();

EXPECT_CALL(conn_handler_, incNumConnections());
EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get()));
EXPECT_CALL(*filter_chain_, transportSocketFactory)
.WillOnce(testing::ReturnRef(*transport_socket_factory));
EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback));
EXPECT_CALL(listener_config2, filterChainFactory())
.WillRepeatedly(ReturnRef(filter_chain_factory_));

auto* connection = new NiceMock<Network::MockServerConnection>();
EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection));
EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true));
active_listener1->onAccept(Network::ConnectionSocketPtr{accepted_socket});

// Verify per-listener connection stats.
EXPECT_EQ(1UL, conn_handler_.numConnections());

EXPECT_CALL(conn_handler_, decNumConnections());
connection->close(Network::ConnectionCloseType::NoFlush);

EXPECT_CALL(listener1, onDestroy());
active_listener1.reset();
EXPECT_CALL(listener2, onDestroy());
active_listener2.reset();
}
} // namespace
} // namespace Server
} // namespace Envoy