From 0e4b7165f1b61364acff5f5212ce915408f765d0 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 28 Apr 2021 17:55:04 -0700 Subject: [PATCH] Listener: respect the connection balancer of the redirected listener (#15842) If listener1 redirects the connection to listener2, the balancer field in listener2 decides whether to rebalance. Previously we rely on the rebalancing at listener1, however, the rebalance is weak because listener1 is likely to not own any connection and the rebalance is no-op. Risk Level: MID. Rebalance may introduce latency. User needs to clear rebalancer field of listener2 to recover the original behavior. Fix #15146 #16113 Signed-off-by: Yuchen Dai Signed-off-by: Gokul Nair --- api/envoy/config/listener/v3/listener.proto | 6 + .../config/listener/v4alpha/listener.proto | 6 + docs/root/version_history/current.rst | 3 + .../envoy/config/listener/v3/listener.proto | 6 + .../config/listener/v4alpha/listener.proto | 6 + source/server/active_tcp_listener.cc | 15 +- source/server/active_tcp_listener.h | 8 +- test/common/quic/BUILD | 1 + test/integration/BUILD | 3 + test/integration/filters/BUILD | 19 ++ .../address_restore_listener_filter.cc | 55 ++++++ .../listener_lds_integration_test.cc | 101 ++++++++++ test/mocks/network/mocks.cc | 12 +- test/mocks/network/mocks.h | 4 +- test/server/BUILD | 19 ++ test/server/active_tcp_listener_test.cc | 181 ++++++++++++++++++ 16 files changed, 431 insertions(+), 14 deletions(-) create mode 100644 test/integration/filters/address_restore_listener_filter.cc create mode 100644 test/server/active_tcp_listener_test.cc diff --git a/api/envoy/config/listener/v3/listener.proto b/api/envoy/config/listener/v3/listener.proto index 4e0a857ce256..5461318ada01 100644 --- a/api/envoy/config/listener/v3/listener.proto +++ b/api/envoy/config/listener/v3/listener.proto @@ -247,6 +247,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/api/envoy/config/listener/v4alpha/listener.proto b/api/envoy/config/listener/v4alpha/listener.proto index e2eb3a1e6065..e40dbf9058af 100644 --- a/api/envoy/config/listener/v4alpha/listener.proto +++ b/api/envoy/config/listener/v4alpha/listener.proto @@ -249,6 +249,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 2a465da67027..19db50652b02 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -16,6 +16,9 @@ Minor Behavior Changes (require upstream 1xx or 204 responses to not have Transfer-Encoding or non-zero Content-Length headers) and ``envoy.reloadable_features.send_strict_1xx_and_204_response_headers`` (do not send 1xx or 204 responses with these headers). Both are true by default. +* listener: respect the :ref:`connection balance config ` + defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior. + Bug Fixes --------- diff --git a/generated_api_shadow/envoy/config/listener/v3/listener.proto b/generated_api_shadow/envoy/config/listener/v3/listener.proto index 4e0a857ce256..5461318ada01 100644 --- a/generated_api_shadow/envoy/config/listener/v3/listener.proto +++ b/generated_api_shadow/envoy/config/listener/v3/listener.proto @@ -247,6 +247,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto b/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto index cad42c77a1ff..47611a615efb 100644 --- a/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto +++ b/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto @@ -252,6 +252,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index 7a5f0a64edcc..4146ed86b39a 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -69,7 +69,8 @@ ActiveTcpListener::~ActiveTcpListener() { // for now. If it becomes a problem (developers hitting this assert when using debug builds) we // can revisit. This case, if it happens, should be benign on production builds. This case is // covered in ConnectionHandlerTest::RemoveListenerDuringRebalance. - ASSERT(num_listener_connections_ == 0); + ASSERT(num_listener_connections_ == 0, fmt::format("destroyed listener {} has {} connections", + config_->name(), numConnections())); } void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) { @@ -188,14 +189,12 @@ void ActiveTcpSocket::newConnection() { if (new_listener.has_value()) { // Hands off connections redirected by iptables to the listener associated with the // original destination address. Pass 'hand_off_restored_destination_connections' as false to - // prevent further redirection as well as 'rebalanced' as true since the connection has - // already been balanced if applicable inside onAcceptWorker() when the connection was - // initially accepted. Note also that we must account for the number of connections properly - // across both listeners. + // prevent further redirection. + // Leave the new listener to decide whether to execute re-balance. + // Note also that we must account for the number of connections properly across both listeners. // TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better. listener_.decNumConnections(); - new_listener.value().get().incNumConnections(); - new_listener.value().get().onAcceptWorker(std::move(socket_), false, true); + new_listener.value().get().onAcceptWorker(std::move(socket_), false, false); } else { // Set default transport protocol if none of the listener filters did it. if (socket_->detectedTransportProtocol().empty()) { @@ -250,7 +249,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket, auto active_socket = std::make_unique(*this, std::move(socket), hand_off_restored_destination_connections); - // Create and run the filters + // Create and run the filters. config_->filterChainFactory().createListenerFilterChain(*active_socket); active_socket->continueFilterChain(true); diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 01cf11b72408..c698faaa0605 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -30,10 +30,10 @@ using RebalancedSocketSharedPtr = std::shared_ptr; /** * Wrapper for an active tcp listener owned by this handler. */ -class ActiveTcpListener : public Network::TcpListenerCallbacks, - public ActiveListenerImplBase, - public Network::BalancedConnectionHandler, - Logger::Loggable { +class ActiveTcpListener final : public Network::TcpListenerCallbacks, + public ActiveListenerImplBase, + public Network::BalancedConnectionHandler, + Logger::Loggable { public: ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config); ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, diff --git a/test/common/quic/BUILD b/test/common/quic/BUILD index 38d7fa11a252..b5ed243f8bbb 100644 --- a/test/common/quic/BUILD +++ b/test/common/quic/BUILD @@ -79,6 +79,7 @@ envoy_cc_test( "//source/common/quic:envoy_quic_connection_helper_lib", "//source/common/quic:envoy_quic_server_connection_lib", "//source/common/quic:envoy_quic_server_session_lib", + "//source/server:active_listener_base", "//test/mocks/http:http_mocks", "//test/mocks/http:stream_decoder_mock", "//test/mocks/network:network_mocks", diff --git a/test/integration/BUILD b/test/integration/BUILD index 8ab9096c0c19..7d0ed082a021 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1547,13 +1547,16 @@ envoy_cc_test( "//source/common/network:connection_lib", "//source/common/network:utility_lib", "//source/extensions/filters/http/health_check:config", + "//source/extensions/filters/network/tcp_proxy:config", "//test/common/grpc:grpc_client_integration_lib", + "//test/integration/filters:address_restore_listener_filter_lib", "//test/test_common:resources_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/config/route/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 5d8c073a5bb7..4722b15a3730 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -467,6 +467,25 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "address_restore_listener_filter_lib", + srcs = [ + "address_restore_listener_filter.cc", + ], + deps = [ + ":common_lib", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listen_socket_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/common/common:assert_lib", + "//source/common/common:minimal_logger_lib", + "//source/common/network:address_lib", + "//source/common/network:upstream_socket_options_filter_state_lib", + "//source/common/network:utility_lib", + ], +) + envoy_cc_test_library( name = "set_route_filter_lib", srcs = [ diff --git a/test/integration/filters/address_restore_listener_filter.cc b/test/integration/filters/address_restore_listener_filter.cc new file mode 100644 index 000000000000..c769994b9644 --- /dev/null +++ b/test/integration/filters/address_restore_listener_filter.cc @@ -0,0 +1,55 @@ + + +#include "envoy/network/filter.h" +#include "envoy/network/listen_socket.h" +#include "envoy/server/filter_config.h" + +#include "common/network/address_impl.h" +#include "common/network/utility.h" + +namespace Envoy { + +// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS. +class FakeOriginalDstListenerFilter : public Network::ListenerFilter { +public: + // Network::ListenerFilter + Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override { + FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept"); + Network::ConnectionSocket& socket = cb.socket(); + socket.addressProvider().restoreLocalAddress( + std::make_shared("127.0.0.2", 80)); + FANCY_LOG(debug, "current local socket address is {} restored = {}", + socket.addressProvider().localAddress()->asString(), + socket.addressProvider().localAddressRestored()); + return Network::FilterStatus::Continue; + } +}; + +class FakeOriginalDstListenerFilterConfigFactory + : public Server::Configuration::NamedListenerFilterConfigFactory { +public: + // NamedListenerFilterConfigFactory + Network::ListenerFilterFactoryCb createListenerFilterFactoryFromProto( + const Protobuf::Message&, + const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher, + Server::Configuration::ListenerFactoryContext&) override { + return [listener_filter_matcher](Network::ListenerFilterManager& filter_manager) -> void { + filter_manager.addAcceptFilter(listener_filter_matcher, + std::make_unique()); + }; + } + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()}; + } + + std::string name() const override { + // This fake original_dest should be used only in integration test! + return "envoy.filters.listener.original_dst"; + } +}; + +static Registry::RegisterFactory + register_; +} // namespace Envoy diff --git a/test/integration/listener_lds_integration_test.cc b/test/integration/listener_lds_integration_test.cc index e256e8a09275..2179d9b3c3e5 100644 --- a/test/integration/listener_lds_integration_test.cc +++ b/test/integration/listener_lds_integration_test.cc @@ -4,6 +4,8 @@ #include "envoy/config/route/v3/route.pb.h" #include "envoy/config/route/v3/scoped_route.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h" +#include "envoy/network/connection.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "common/config/api_version.h" @@ -409,5 +411,104 @@ TEST_P(ListenerIntegrationTest, MultipleLdsUpdatesSharingListenSocketFactory) { } } +class RebalancerTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + RebalancerTest() + : BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF( + filter_chains: + - filters: + - name: envoy.filters.network.tcp_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy + stat_prefix: tcp_stats + cluster: cluster_0 +)EOF") {} + + void initialize() override { + config_helper_.renameListener("tcp"); + config_helper_.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto& src_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(0); + src_listener_config.mutable_use_original_dst()->set_value(true); + // Note that the below original_dst is replaced by FakeOriginalDstListenerFilter at the + // link time. + src_listener_config.add_listener_filters()->set_name( + "envoy.filters.listener.original_dst"); + auto& virtual_listener_config = *bootstrap.mutable_static_resources()->add_listeners(); + virtual_listener_config = src_listener_config; + virtual_listener_config.mutable_use_original_dst()->set_value(false); + virtual_listener_config.clear_listener_filters(); + virtual_listener_config.mutable_bind_to_port()->set_value(false); + virtual_listener_config.set_name("balanced_target_listener"); + virtual_listener_config.mutable_connection_balance_config()->mutable_exact_balance(); + + // 127.0.0.2 is defined in FakeOriginalDstListenerFilter. This virtual listener does not + // listen on a passive socket so it's safe to use any ip address. + *virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() = + "127.0.0.2"; + virtual_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80); + }); + BaseIntegrationTest::initialize(); + } + + std::unique_ptr createConnectionAndWrite(const std::string& request, + std::string& response) { + Buffer::OwnedImpl buffer(request); + return std::make_unique( + lookupPort("tcp"), buffer, + [&response](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + version_, *dispatcher_); + } +}; + +struct PerConnection { + std::string response_; + std::unique_ptr client_conn_; + FakeRawConnectionPtr upstream_conn_; +}; + +// Verify the connections are distributed evenly on the 2 worker threads of the redirected +// listener. +TEST_P(RebalancerTest, RedirectConnectionIsBalancedOnDestinationListener) { + concurrency_ = 2; + int repeats = 10; + initialize(); + + // The balancer is balanced as per active connection instead of total connection. + // The below vector maintains all the connections alive. + std::vector connections; + for (uint32_t i = 0; i < repeats * concurrency_; ++i) { + connections.emplace_back(); + connections.back().client_conn_ = + createConnectionAndWrite("dummy", connections.back().response_); + connections.back().client_conn_->waitForConnection(); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_)); + } + for (auto& conn : connections) { + conn.client_conn_->close(); + while (!conn.client_conn_->closed()) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + } + + ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(), + "listener.127.0.0.2_80.worker_0.downstream_cx_total") + + ->value(), + repeats); + ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(), + "listener.127.0.0.2_80.worker_1.downstream_cx_total") + + ->value(), + repeats); +} + +INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + } // namespace } // namespace Envoy diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 7a2ff7458a5c..5575eeb722bb 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -171,7 +171,17 @@ MockListener::MockListener() = default; MockListener::~MockListener() { onDestroy(); } -MockConnectionHandler::MockConnectionHandler() = default; +MockConnectionHandler::MockConnectionHandler() { + ON_CALL(*this, incNumConnections()).WillByDefault(Invoke([this]() { + ++num_handler_connections_; + })); + ON_CALL(*this, decNumConnections()).WillByDefault(Invoke([this]() { + --num_handler_connections_; + })); + ON_CALL(*this, numConnections()).WillByDefault(Invoke([this]() { + return num_handler_connections_; + })); +} MockConnectionHandler::~MockConnectionHandler() = default; MockIp::MockIp() = default; diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 63c4c5e7c2ff..c102194d4016 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -421,7 +421,7 @@ class MockListener : public Listener { MOCK_METHOD(void, setRejectFraction, (UnitFloat)); }; -class MockConnectionHandler : public ConnectionHandler { +class MockConnectionHandler : public virtual ConnectionHandler { public: MockConnectionHandler(); ~MockConnectionHandler() override; @@ -441,6 +441,8 @@ class MockConnectionHandler : public ConnectionHandler { MOCK_METHOD(void, enableListeners, ()); MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override)); MOCK_METHOD(const std::string&, statPrefix, (), (const)); + + uint64_t num_handler_connections_{}; }; class MockIp : public Address::Ip { diff --git a/test/server/BUILD b/test/server/BUILD index 7a591987be6b..498bc7ecef0b 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -92,6 +92,25 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "active_tcp_listener_test", + srcs = ["active_tcp_listener_test.cc"], + deps = [ + "//source/common/common:utility_lib", + "//source/common/config:utility_lib", + "//source/common/network:address_lib", + "//source/common/network:connection_balancer_lib", + "//source/common/stats:stats_lib", + "//source/server:active_raw_udp_listener_config", + "//source/server:connection_handler_lib", + "//test/mocks/access_log:access_log_mocks", + "//test/mocks/api:api_mocks", + "//test/mocks/network:network_mocks", + "//test/test_common:network_utility_lib", + "//test/test_common:threadsafe_singleton_injector_lib", + ], +) + envoy_cc_test( name = "drain_manager_impl_test", srcs = ["drain_manager_impl_test.cc"], diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc new file mode 100644 index 000000000000..cc1e2710f9f2 --- /dev/null +++ b/test/server/active_tcp_listener_test.cc @@ -0,0 +1,181 @@ +#include + +#include "envoy/network/filter.h" +#include "envoy/network/listener.h" +#include "envoy/stats/scope.h" + +#include "common/network/address_impl.h" +#include "common/network/connection_balancer_impl.h" +#include "common/network/raw_buffer_socket.h" +#include "common/network/utility.h" + +#include "server/active_tcp_listener.h" + +#include "test/mocks/api/mocks.h" +#include "test/mocks/common.h" +#include "test/mocks/network/mocks.h" +#include "test/test_common/network_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Server { +namespace { + +class MockTcpConnectionHandler : public Network::TcpConnectionHandler, + public Network::MockConnectionHandler { +public: + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); + MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByTag, + (uint64_t listener_tag)); + MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByAddress, + (const Network::Address::Instance& address)); +}; +class ActiveTcpListenerTest : public testing::Test, protected Logger::Loggable { +public: + ActiveTcpListenerTest() { + EXPECT_CALL(conn_handler_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); + EXPECT_CALL(conn_handler_, numConnections()).Times(testing::AnyNumber()); + EXPECT_CALL(conn_handler_, statPrefix()).WillRepeatedly(ReturnRef(listener_stat_prefix_)); + listener_filter_matcher_ = std::make_shared>(); + } + + std::string listener_stat_prefix_{"listener_stat_prefix"}; + std::shared_ptr socket_factory_{ + std::make_shared()}; + NiceMock dispatcher_{"test"}; + BasicResourceLimitImpl resource_limit_; + NiceMock conn_handler_; + Network::MockListener* generic_listener_; + Network::MockListenerConfig listener_config_; + NiceMock manager_; + NiceMock filter_chain_factory_; + std::shared_ptr filter_chain_; + std::shared_ptr> listener_filter_matcher_; +}; + +// Verify that the server connection with recovered address is rebalanced at redirected listener. +TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { + NiceMock listener_config1; + NiceMock balancer1; + EXPECT_CALL(balancer1, registerHandler(_)); + EXPECT_CALL(balancer1, unregisterHandler(_)); + + Network::Address::InstanceConstSharedPtr normal_address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); + EXPECT_CALL(listener_config1, connectionBalancer()).WillRepeatedly(ReturnRef(balancer1)); + EXPECT_CALL(listener_config1, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config1, listenerFiltersTimeout()); + EXPECT_CALL(listener_config1, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config1, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config1, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + EXPECT_CALL(listener_config1, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(true)); + + auto mock_listener_will_be_moved1 = std::make_unique(); + auto& listener1 = *mock_listener_will_be_moved1; + auto active_listener1 = std::make_unique( + conn_handler_, std::move(mock_listener_will_be_moved1), listener_config1); + + NiceMock listener_config2; + Network::MockConnectionBalancer balancer2; + EXPECT_CALL(balancer2, registerHandler(_)); + EXPECT_CALL(balancer2, unregisterHandler(_)); + + Network::Address::InstanceConstSharedPtr alt_address( + new Network::Address::Ipv4Instance("127.0.0.2", 20002)); + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(alt_address)); + EXPECT_CALL(listener_config2, listenerFiltersTimeout()); + EXPECT_CALL(listener_config2, connectionBalancer()).WillRepeatedly(ReturnRef(balancer2)); + EXPECT_CALL(listener_config2, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config2, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(listener_config2, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config2, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config2, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + auto mock_listener_will_be_moved2 = std::make_unique(); + auto& listener2 = *mock_listener_will_be_moved2; + auto active_listener2 = std::make_shared( + conn_handler_, std::move(mock_listener_will_be_moved2), listener_config2); + + auto* test_filter = new NiceMock(); + EXPECT_CALL(*test_filter, destroy_()); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + bool redirected = false; + + // 1. Listener1 re-balance. Set the balance target to the the active listener itself. + EXPECT_CALL(balancer1, pickTargetHandler(_)) + .WillOnce(testing::DoAll( + testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })), + ReturnRef(*active_listener1))); + + EXPECT_CALL(listener_config1, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + + // Listener1 has a listener filter in the listener filter chain. + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + // Insert the Mock filter. + if (!redirected) { + manager.addAcceptFilter(nullptr, Network::ListenerFilterPtr{test_filter}); + redirected = true; + } + return true; + })); + EXPECT_CALL(*test_filter, onAccept(_)) + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { + cb.socket().addressProvider().restoreLocalAddress(alt_address); + return Network::FilterStatus::Continue; + })); + // Verify that listener1 hands off the connection by not creating network filter chain. + EXPECT_CALL(manager_, findFilterChain(_)).Times(0); + + // 2. Redirect to Listener2. + EXPECT_CALL(conn_handler_, getBalancedHandlerByAddress(_)) + .WillOnce(Return(Network::BalancedConnectionHandlerOptRef(*active_listener2))); + + // 3. Listener2 re-balance. Set the balance target to the the active listener itself. + EXPECT_CALL(balancer2, pickTargetHandler(_)) + .WillOnce(testing::DoAll( + testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })), + ReturnRef(*active_listener2))); + + auto filter_factory_callback = std::make_shared>(); + auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + filter_chain_ = std::make_shared>(); + + EXPECT_CALL(conn_handler_, incNumConnections()); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); + EXPECT_CALL(*filter_chain_, transportSocketFactory) + .WillOnce(testing::ReturnRef(*transport_socket_factory)); + EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback)); + EXPECT_CALL(listener_config2, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + + auto* connection = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection)); + EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true)); + active_listener1->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + + // Verify per-listener connection stats. + EXPECT_EQ(1UL, conn_handler_.numConnections()); + + EXPECT_CALL(conn_handler_, decNumConnections()); + connection->close(Network::ConnectionCloseType::NoFlush); + + EXPECT_CALL(listener1, onDestroy()); + active_listener1.reset(); + EXPECT_CALL(listener2, onDestroy()); + active_listener2.reset(); +} +} // namespace +} // namespace Server +} // namespace Envoy