Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener: respect the connection balancer of the redirected listener #15842

Merged
merged 23 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ message Listener {
// The listener's connection balancer configuration, currently only applicable to TCP listeners.
// If no configuration is specified, Envoy will not attempt to balance active connections between
// worker threads.
//
// Consider the scenario that the listener X set :ref: `original_dst <config_listener_filters_original_dst>`,
// and the listeners Y1,Y2 set `bind_to_port <envoy_api_field_config.listener.v3.Listener.bind_to_port>` to
// false. The balance config should be cleared in listener X and be enabled in Y1 and Y2.
lambdai marked this conversation as resolved.
Show resolved Hide resolved
ConnectionBalanceConfig connection_balance_config = 20;

// When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and
Expand Down
4 changes: 4 additions & 0 deletions api/envoy/config/listener/v4alpha/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ Minor Behavior Changes
(require upstream 1xx or 204 responses to not have Transfer-Encoding or non-zero Content-Length headers) and
`envoy.reloadable_features.send_strict_1xx_and_204_response_headers`
(do not send 1xx or 204 responses with these headers). Both are true by default.
* listener: respect the :ref:`connection balance config <envoy_v3_api_field_config.listener.v3.Listener.connection_balance_config>`
defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior.


Bug Fixes
---------
Expand Down
4 changes: 4 additions & 0 deletions generated_api_shadow/envoy/config/listener/v3/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ ActiveTcpListener::~ActiveTcpListener() {
// for now. If it becomes a problem (developers hitting this assert when using debug builds) we
// can revisit. This case, if it happens, should be benign on production builds. This case is
// covered in ConnectionHandlerTest::RemoveListenerDuringRebalance.
ASSERT(num_listener_connections_ == 0);
ASSERT(num_listener_connections_ == 0, fmt::format("destroyed listener {} has {} connections",
config_->name(), numConnections()));
}

void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
Expand Down Expand Up @@ -188,14 +189,12 @@ void ActiveTcpSocket::newConnection() {
if (new_listener.has_value()) {
// Hands off connections redirected by iptables to the listener associated with the
// original destination address. Pass 'hand_off_restored_destination_connections' as false to
// prevent further redirection as well as 'rebalanced' as true since the connection has
// already been balanced if applicable inside onAcceptWorker() when the connection was
// initially accepted. Note also that we must account for the number of connections properly
// across both listeners.
// prevent further redirection.
// Leave the new listener to decide whether to execute re-balance.
// Note also that we must account for the number of connections properly across both listeners.
// TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better.
listener_.decNumConnections();
new_listener.value().get().incNumConnections();
new_listener.value().get().onAcceptWorker(std::move(socket_), false, true);
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
Expand Down Expand Up @@ -250,7 +249,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);

// Create and run the filters
// Create and run the filters.
config_->filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);

Expand Down
8 changes: 4 additions & 4 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ using RebalancedSocketSharedPtr = std::shared_ptr<RebalancedSocket>;
/**
* Wrapper for an active tcp listener owned by this handler.
*/
class ActiveTcpListener : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
Expand Down
1 change: 1 addition & 0 deletions test/common/quic/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ envoy_cc_test(
"//source/common/quic:envoy_quic_connection_helper_lib",
"//source/common/quic:envoy_quic_server_connection_lib",
"//source/common/quic:envoy_quic_server_session_lib",
"//source/server:active_listener_base",
"//test/mocks/http:http_mocks",
"//test/mocks/http:stream_decoder_mock",
"//test/mocks/network:network_mocks",
Expand Down
3 changes: 3 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1547,13 +1547,16 @@ envoy_cc_test(
"//source/common/network:connection_lib",
"//source/common/network:utility_lib",
"//source/extensions/filters/http/health_check:config",
"//source/extensions/filters/network/tcp_proxy:config",
"//test/common/grpc:grpc_client_integration_lib",
"//test/integration/filters:address_restore_listener_filter_lib",
"//test/test_common:resources_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/route/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand Down
19 changes: 19 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,25 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "address_restore_listener_filter_lib",
srcs = [
"address_restore_listener_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/network:filter_interface",
"//include/envoy/network:listen_socket_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/network:address_lib",
"//source/common/network:upstream_socket_options_filter_state_lib",
"//source/common/network:utility_lib",
],
)

envoy_cc_test_library(
name = "set_route_filter_lib",
srcs = [
Expand Down
55 changes: 55 additions & 0 deletions test/integration/filters/address_restore_listener_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@


#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/server/filter_config.h"

#include "common/network/address_impl.h"
#include "common/network/utility.h"

namespace Envoy {

// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS.
class FakeOriginalDstListenerFilter : public Network::ListenerFilter {
public:
// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override {
FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept");
Network::ConnectionSocket& socket = cb.socket();
socket.addressProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.2", 80));
FANCY_LOG(debug, "current local socket address is {} restored = {}",
socket.addressProvider().localAddress()->asString(),
socket.addressProvider().localAddressRestored());
return Network::FilterStatus::Continue;
}
};

class FakeOriginalDstListenerFilterConfigFactory
: public Server::Configuration::NamedListenerFilterConfigFactory {
public:
// NamedListenerFilterConfigFactory
Network::ListenerFilterFactoryCb createListenerFilterFactoryFromProto(
const Protobuf::Message&,
const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
Server::Configuration::ListenerFactoryContext&) override {
return [listener_filter_matcher](Network::ListenerFilterManager& filter_manager) -> void {
filter_manager.addAcceptFilter(listener_filter_matcher,
std::make_unique<FakeOriginalDstListenerFilter>());
};
}

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()};
}

std::string name() const override {
// This fake original_dest should be used only in integration test!
return "envoy.filters.listener.original_dst";
}
};

static Registry::RegisterFactory<FakeOriginalDstListenerFilterConfigFactory,
Server::Configuration::NamedListenerFilterConfigFactory>
register_;
} // namespace Envoy
100 changes: 100 additions & 0 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "envoy/config/route/v3/route.pb.h"
#include "envoy/config/route/v3/scoped_route.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
#include "envoy/network/connection.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "common/config/api_version.h"
Expand Down Expand Up @@ -409,5 +411,103 @@ TEST_P(ListenerIntegrationTest, MultipleLdsUpdatesSharingListenSocketFactory) {
}
}

class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
public:
RebalancerTest()
: BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF(
filter_chains:
- filters:
- name: envoy.filters.network.tcp_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
stat_prefix: tcp_stats
cluster: cluster_0
)EOF") {}

void initialize() override {
config_helper_.renameListener("tcp");
config_helper_.addConfigModifier(
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
auto& src_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(0);
src_listener_config.mutable_use_original_dst()->set_value(true);
// Note that the below original_dst is replaced by FakeOriginalDstListenerFilter at the
// link time.
src_listener_config.add_listener_filters()->set_name(
"envoy.filters.listener.original_dst");
auto& dst_listener_config = *bootstrap.mutable_static_resources()->add_listeners();
dst_listener_config = src_listener_config;
dst_listener_config.mutable_use_original_dst()->set_value(false);
dst_listener_config.clear_listener_filters();
dst_listener_config.mutable_bind_to_port()->set_value(false);
dst_listener_config.set_name("balanced_target_listener");
dst_listener_config.mutable_connection_balance_config()->mutable_exact_balance();

// 127.0.0.2 is defined in FakeOriginalDstListenerFilter.
*dst_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
lambdai marked this conversation as resolved.
Show resolved Hide resolved
"127.0.0.2";
dst_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80);
lambdai marked this conversation as resolved.
Show resolved Hide resolved
});
BaseIntegrationTest::initialize();
}

std::unique_ptr<RawConnectionDriver> createConnectionAndWrite(const std::string& request,
std::string& response) {
Buffer::OwnedImpl buffer(request);
return std::make_unique<RawConnectionDriver>(
lookupPort("tcp"), buffer,
[&response](Network::ClientConnection&, const Buffer::Instance& data) -> void {
response.append(data.toString());
},
version_, *dispatcher_);
}
};

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
};

// Verify the connections are distributed evenly on the 2 worker threads of the redirected
// listener.
TEST_P(RebalancerTest, RedirectConnectionIsBalancedOnDestinationListener) {
concurrency_ = 2;
int repeats = 10;
initialize();

// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}

ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
"listener.127.0.0.2_80.worker_0.downstream_cx_total")

->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
"listener.127.0.0.2_80.worker_1.downstream_cx_total")

->value(),
repeats);
}

INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

} // namespace
} // namespace Envoy
12 changes: 11 additions & 1 deletion test/mocks/network/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ MockListener::MockListener() = default;

MockListener::~MockListener() { onDestroy(); }

MockConnectionHandler::MockConnectionHandler() = default;
MockConnectionHandler::MockConnectionHandler() {
ON_CALL(*this, incNumConnections()).WillByDefault(Invoke([this]() {
++num_handler_connections_;
}));
ON_CALL(*this, decNumConnections()).WillByDefault(Invoke([this]() {
--num_handler_connections_;
}));
ON_CALL(*this, numConnections()).WillByDefault(Invoke([this]() {
return num_handler_connections_;
}));
}
MockConnectionHandler::~MockConnectionHandler() = default;

MockIp::MockIp() = default;
Expand Down
4 changes: 3 additions & 1 deletion test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class MockListener : public Listener {
MOCK_METHOD(void, setRejectFraction, (UnitFloat));
};

class MockConnectionHandler : public ConnectionHandler {
class MockConnectionHandler : public virtual ConnectionHandler {
lambdai marked this conversation as resolved.
Show resolved Hide resolved
public:
MockConnectionHandler();
~MockConnectionHandler() override;
Expand All @@ -441,6 +441,8 @@ class MockConnectionHandler : public ConnectionHandler {
MOCK_METHOD(void, enableListeners, ());
MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override));
MOCK_METHOD(const std::string&, statPrefix, (), (const));

uint64_t num_handler_connections_{};
};

class MockIp : public Address::Ip {
Expand Down
19 changes: 19 additions & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "active_tcp_listener_test",
srcs = ["active_tcp_listener_test.cc"],
deps = [
"//source/common/common:utility_lib",
"//source/common/config:utility_lib",
"//source/common/network:address_lib",
"//source/common/network:connection_balancer_lib",
"//source/common/stats:stats_lib",
"//source/server:active_raw_udp_listener_config",
"//source/server:connection_handler_lib",
"//test/mocks/access_log:access_log_mocks",
"//test/mocks/api:api_mocks",
"//test/mocks/network:network_mocks",
"//test/test_common:network_utility_lib",
"//test/test_common:threadsafe_singleton_injector_lib",
],
)

envoy_cc_test(
name = "drain_manager_impl_test",
srcs = ["drain_manager_impl_test.cc"],
Expand Down
Loading