Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

listener: enable socket_options for multiple addresses #24210

Merged
merged 13 commits into from
Nov 29, 2022
1 change: 0 additions & 1 deletion api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
message AdditionalAddress {
core.v3.Address address = 1;

// [#not-implemented-hide:]
// Additional socket options that may not be present in Envoy source code or
// precompiled binaries. If specified, this will override the
// :ref:`socket_options <envoy_v3_api_field_config.listener.v3.Listener.socket_options>`
Expand Down
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ new_features:
- area: generic_proxy
change: |
added :ref:`generic rds support <envoy_v3_api_field_extensions.filters.network.generic_proxy.v3.GenericProxy.generic_rds>`.
- area: listener
change: |
added a new field :ref:`socket_options <envoy_v3_api_field_config.listener.v3.AdditionalAddress.socket_options>` to the AdditionalAddress, allowing specifying discrete socket options for each listen address.
- area: listener
change: |
added ``continueFilterChain()`` and ``dispatcher()`` methods to the ``ListenerFilterCallback``. This allows listener filters to continue listener filter iteration after stopping iteration e.g. if the listener filter depends on an async process.
Expand Down
152 changes: 109 additions & 43 deletions source/server/listener_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/server/listener_impl.h"

#include <functional>

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/listener/v3/listener.pb.h"
#include "envoy/config/listener/v3/listener_components.pb.h"
Expand Down Expand Up @@ -359,15 +361,20 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
quic_stat_names_(parent_.quicStatNames()),
missing_listener_config_stats_({ALL_MISSING_LISTENER_CONFIG_STATS(
POOL_COUNTER(listener_factory_context_->listenerScope()))}) {
std::vector<std::reference_wrapper<
const Protobuf::RepeatedPtrField<envoy::config::core::v3::SocketOption>>>
address_opts_list;
if (config.has_internal_listener()) {
addresses_.emplace_back(
std::make_shared<Network::Address::EnvoyInternalInstance>(config.name()));
address_opts_list.emplace_back(std::ref(config_.socket_options()));
} else {
// All the addresses should be same socket type, so get the first address's socket type is
// enough.
auto address = Network::Address::resolveProtoAddress(config.address());
checkIpv4CompatAddress(address, config.address());
addresses_.emplace_back(address);
address_opts_list.emplace_back(std::ref(config_.socket_options()));

for (auto i = 0; i < config.additional_addresses_size(); i++) {
if (socket_type_ !=
Expand All @@ -381,6 +388,12 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
Network::Address::resolveProtoAddress(config.additional_addresses(i).address());
checkIpv4CompatAddress(address, config.additional_addresses(i).address());
addresses_.emplace_back(additional_address);
if (config.additional_addresses(i).has_socket_options()) {
address_opts_list.emplace_back(
std::ref(config.additional_addresses(i).socket_options().socket_options()));
} else {
address_opts_list.emplace_back(std::ref(config_.socket_options()));
}
}
}

Expand All @@ -402,7 +415,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
// buildUdpListenerFactory() must come before buildListenSocketOptions() because the UDP
// listener factory can provide additional options.
buildUdpListenerFactory(parent_.server_.options().concurrency());
buildListenSocketOptions();
buildListenSocketOptions(address_opts_list);
createListenerFilterFactories();
validateFilterChains();
buildFilterChains();
Expand Down Expand Up @@ -445,6 +458,7 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin,
dynamic_init_manager_(std::make_unique<Init::ManagerImpl>(
fmt::format("Listener-local-init-manager {} {}", name, hash))),
config_(config), version_info_(version_info),
listen_socket_options_list_(origin.listen_socket_options_list_),
listener_filters_timeout_(
PROTOBUF_GET_MS_OR_DEFAULT(config, listener_filters_timeout, 15000)),
continue_on_listener_filters_timeout_(config.continue_on_listener_filters_timeout()),
Expand All @@ -467,7 +481,6 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin,
POOL_COUNTER(listener_factory_context_->listenerScope()))}) {
buildAccessLog();
validateConfig();
buildListenSocketOptions();
createListenerFilterFactories();
validateFilterChains();
buildFilterChains();
Expand Down Expand Up @@ -629,40 +642,59 @@ void ListenerImpl::buildUdpListenerFactory(uint32_t concurrency) {
}
}

void ListenerImpl::buildListenSocketOptions() {
// The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
// in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
addListenSocketOptions(Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
}
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config_, transparent, false)) {
addListenSocketOptions(Network::SocketOptionFactory::buildIpTransparentOptions());
}
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config_, freebind, false)) {
addListenSocketOptions(Network::SocketOptionFactory::buildIpFreebindOptions());
}
if (reuse_port_) {
addListenSocketOptions(Network::SocketOptionFactory::buildReusePortOptions());
}
if (!config_.socket_options().empty()) {
addListenSocketOptions(
Network::SocketOptionFactory::buildLiteralOptions(config_.socket_options()));
}
if (socket_type_ == Network::Socket::Type::Datagram) {
// Needed for recvmsg to return destination address in IP header.
addListenSocketOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
// Needed to return receive buffer overflown indicator.
addListenSocketOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
// TODO(yugant) : Add a config option for UDP_GRO
if (Api::OsSysCallsSingleton::get().supportsUdpGro()) {
// Needed to receive gso_size option
addListenSocketOptions(Network::SocketOptionFactory::buildUdpGroOptions());
void ListenerImpl::buildListenSocketOptions(
std::vector<std::reference_wrapper<
const Protobuf::RepeatedPtrField<envoy::config::core::v3::SocketOption>>>&
address_opts_list) {
listen_socket_options_list_.insert(listen_socket_options_list_.begin(), addresses_.size(),
nullptr);
for (std::vector<std::reference_wrapper<
const Protobuf::RepeatedPtrField<envoy::config::core::v3::SocketOption>&>>::size_type i =
0;
i < address_opts_list.size(); i++) {
// The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
// in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
}
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config_, transparent, false)) {
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildIpTransparentOptions());
}
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config_, freebind, false)) {
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildIpFreebindOptions());
}
if (reuse_port_) {
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildReusePortOptions());
}
if (!config_.socket_options().empty()) {
addListenSocketOptions(
listen_socket_options_list_[i],
Network::SocketOptionFactory::buildLiteralOptions(address_opts_list[i]));
}
if (socket_type_ == Network::Socket::Type::Datagram) {
// Needed for recvmsg to return destination address in IP header.
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildIpPacketInfoOptions());
// Needed to return receive buffer overflown indicator.
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
// TODO(yugant) : Add a config option for UDP_GRO
if (Api::OsSysCallsSingleton::get().supportsUdpGro()) {
// Needed to receive gso_size option
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildUdpGroOptions());
}

// Additional factory specific options.
ASSERT(udp_listener_config_->listener_factory_ != nullptr,
"buildUdpListenerFactory() must run first");
addListenSocketOptions(udp_listener_config_->listener_factory_->socketOptions());
// Additional factory specific options.
ASSERT(udp_listener_config_->listener_factory_ != nullptr,
"buildUdpListenerFactory() must run first");
addListenSocketOptions(listen_socket_options_list_[i],
udp_listener_config_->listener_factory_->socketOptions());
}
}
}

Expand Down Expand Up @@ -777,8 +809,12 @@ void ListenerImpl::buildConnectionBalancer(const Network::Address::Instance& add

void ListenerImpl::buildSocketOptions() {
if (config_.has_tcp_fast_open_queue_length()) {
addListenSocketOptions(Network::SocketOptionFactory::buildTcpFastOpenOptions(
config_.tcp_fast_open_queue_length().value()));
for (std::vector<Network::Address::InstanceConstSharedPtr>::size_type i = 0;
i < addresses_.size(); i++) {
addListenSocketOptions(listen_socket_options_list_[i],
Network::SocketOptionFactory::buildTcpFastOpenOptions(
config_.tcp_fast_open_queue_length().value()));
}
}
}

Expand Down Expand Up @@ -1149,13 +1185,43 @@ bool ListenerMessageUtil::socketOptionsEqual(const envoy::config::listener::v3::
return false;
}

return std::equal(lhs.socket_options().begin(), lhs.socket_options().end(),
rhs.socket_options().begin(), rhs.socket_options().end(),
[](const ::envoy::config::core::v3::SocketOption& option,
const ::envoy::config::core::v3::SocketOption& other_option) {
Protobuf::util::MessageDifferencer differencer;
return differencer.Compare(option, other_option);
});
bool is_equal = std::equal(lhs.socket_options().begin(), lhs.socket_options().end(),
rhs.socket_options().begin(), rhs.socket_options().end(),
[](const ::envoy::config::core::v3::SocketOption& option,
const ::envoy::config::core::v3::SocketOption& other_option) {
Protobuf::util::MessageDifferencer differencer;
return differencer.Compare(option, other_option);
});
if (!is_equal) {
return false;
}

if (lhs.additional_addresses_size() != rhs.additional_addresses_size()) {
return false;
}
// Assume people won't change the order of additional addresses.
for (auto i = 0; i < lhs.additional_addresses_size(); i++) {
if (lhs.additional_addresses(i).has_socket_options() !=
rhs.additional_addresses(i).has_socket_options()) {
return false;
}
if (lhs.additional_addresses(i).has_socket_options()) {
is_equal = std::equal(lhs.additional_addresses(i).socket_options().socket_options().begin(),
lhs.additional_addresses(i).socket_options().socket_options().end(),
rhs.additional_addresses(i).socket_options().socket_options().begin(),
rhs.additional_addresses(i).socket_options().socket_options().end(),
[](const ::envoy::config::core::v3::SocketOption& option,
const ::envoy::config::core::v3::SocketOption& other_option) {
Protobuf::util::MessageDifferencer differencer;
return differencer.Compare(option, other_option);
});
if (!is_equal) {
return false;
}
}
}

return true;
}

bool ListenerMessageUtil::filterChainOnlyChange(const envoy::config::listener::v3::Listener& lhs,
Expand Down
25 changes: 15 additions & 10 deletions source/server/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ class ListenerImpl final : public Network::ListenerConfig,
}
void addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory);
void setSocketAndOptions(const Network::SocketSharedPtr& socket);
const Network::Socket::OptionsSharedPtr& listenSocketOptions() { return listen_socket_options_; }
const Network::Socket::OptionsSharedPtr& listenSocketOptions(uint32_t address_index) {
ASSERT(listen_socket_options_list_.size() > address_index);
return listen_socket_options_list_[address_index];
}
const std::string& versionInfo() const { return version_info_; }
bool reusePort() const { return reuse_port_; }
static bool getReusePortOrDefault(Server::Instance& server,
Expand Down Expand Up @@ -380,10 +383,9 @@ class ListenerImpl final : public Network::ListenerConfig,
return config().traffic_direction();
}

void ensureSocketOptions() {
if (!listen_socket_options_) {
listen_socket_options_ =
std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
void ensureSocketOptions(Network::Socket::OptionsSharedPtr& options) {
if (options == nullptr) {
options = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
}
}

Expand Down Expand Up @@ -450,7 +452,8 @@ class ListenerImpl final : public Network::ListenerConfig,
void buildUdpListenerWorkerRouter(const Network::Address::Instance& address,
uint32_t concurrency);
void buildUdpListenerFactory(uint32_t concurrency);
void buildListenSocketOptions();
void buildListenSocketOptions(std::vector<std::reference_wrapper<const Protobuf::RepeatedPtrField<
envoy::config::core::v3::SocketOption>>>& address_opts_list);
void createListenerFilterFactories();
void validateFilterChains();
void buildFilterChains();
Expand All @@ -461,9 +464,10 @@ class ListenerImpl final : public Network::ListenerConfig,
void checkIpv4CompatAddress(const Network::Address::InstanceConstSharedPtr& address,
const envoy::config::core::v3::Address& proto_address);

void addListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) {
ensureSocketOptions();
Network::Socket::appendOptions(listen_socket_options_, options);
void addListenSocketOptions(Network::Socket::OptionsSharedPtr& options,
const Network::Socket::OptionsSharedPtr& append_options) {
ensureSocketOptions(options);
Network::Socket::appendOptions(options, append_options);
}

ListenerManagerImpl& parent_;
Expand Down Expand Up @@ -495,7 +499,8 @@ class ListenerImpl final : public Network::ListenerConfig,
std::vector<AccessLog::InstanceSharedPtr> access_logs_;
const envoy::config::listener::v3::Listener config_;
const std::string version_info_;
Network::Socket::OptionsSharedPtr listen_socket_options_;
// Using std::vector instead of hash map for supporting multiple zero port addresses.
std::vector<Network::Socket::OptionsSharedPtr> listen_socket_options_list_;
const std::chrono::milliseconds listener_filters_timeout_;
const bool continue_on_listener_filters_timeout_;
std::shared_ptr<UdpListenerConfigImpl> udp_listener_config_;
Expand Down
8 changes: 5 additions & 3 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1089,10 +1089,12 @@ void ListenerManagerImpl::createListenSocketFactory(ListenerImpl& listener) {
TRY_ASSERT_MAIN_THREAD {
Network::SocketCreationOptions creation_options;
creation_options.mptcp_enabled_ = listener.mptcpEnabled();
for (auto& address : listener.addresses()) {
for (std::vector<Network::Address::InstanceConstSharedPtr>::size_type i = 0;
i < listener.addresses().size(); i++) {
listener.addSocketFactory(std::make_unique<ListenSocketFactoryImpl>(
factory_, address, socket_type, listener.listenSocketOptions(), listener.name(),
listener.tcpBacklogSize(), bind_type, creation_options, server_.options().concurrency()));
factory_, listener.addresses()[i], socket_type, listener.listenSocketOptions(i),
listener.name(), listener.tcpBacklogSize(), bind_type, creation_options,
server_.options().concurrency()));
}
}
END_TRY
Expand Down
11 changes: 5 additions & 6 deletions test/integration/base_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ void BaseIntegrationTest::setUpstreamAddress(
}

bool BaseIntegrationTest::getSocketOption(const std::string& listener_name, int level, int optname,
void* optval, socklen_t* optlen) {
void* optval, socklen_t* optlen, int address_index) {
bool listeners_ready = false;
absl::Mutex l;
std::vector<std::reference_wrapper<Network::ListenerConfig>> listeners;
Expand All @@ -359,11 +359,10 @@ bool BaseIntegrationTest::getSocketOption(const std::string& listener_name, int

for (auto& listener : listeners) {
if (listener.get().name() == listener_name) {
for (auto& socket_factory : listener.get().listenSocketFactories()) {
auto socket = socket_factory->getListenSocket(0);
if (socket->getSocketOption(level, optname, optval, optlen).return_value_ != 0) {
return false;
}
auto& socket_factory = listener.get().listenSocketFactories()[address_index];
auto socket = socket_factory->getListenSocket(0);
if (socket->getSocketOption(level, optname, optval, optlen).return_value_ != 0) {
return false;
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/base_integration_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
void setDeterministicValue(uint64_t value = 0) { deterministic_value_ = value; }
// Get socket option for a specific listener's socket.
bool getSocketOption(const std::string& listener_name, int level, int optname, void* optval,
socklen_t* optlen);
socklen_t* optlen, int address_index = 0);

Http::CodecType upstreamProtocol() const { return upstream_config_.upstream_protocol_; }

Expand Down
Loading