Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp: limit number of reads per event loop #16180

Merged
merged 34 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a84505d
add read limit
danzh1989 Apr 23, 2021
879cd6f
test pass
danzh1989 Apr 26, 2021
24ae38d
refine comments
danzh1989 Apr 26, 2021
48a644f
return num packets per loop
danzh1989 May 2, 2021
b89f7fe
typo
danzh1989 May 3, 2021
758c321
change udp_proxy_filter
danzh1989 May 3, 2021
ded13a6
fix udp proxy test
danzh1989 May 3, 2021
51c3404
fix build
danzh1989 May 3, 2021
f0949ea
fix build error
danzh1989 May 5, 2021
0858725
fix udp_fuzz
danzh1989 May 6, 2021
001f228
add config
danzh1989 May 6, 2021
efb5bef
Merge branch 'master' into udpreadlimit
danzh1989 May 6, 2021
09ba49d
spelling
danzh1989 May 6, 2021
fb6bb16
fix sign compare
danzh1989 May 6, 2021
372b3e3
fix for non-recvmmsg platform
danzh1989 May 8, 2021
2151e2d
added DEFAULT_PACKETS_TO_READ_PER_CONNECTION
danzh1989 May 12, 2021
d9989e8
make it build
danzh1989 May 12, 2021
e0aae6f
rename some variables
danzh1989 May 12, 2021
dcda8e8
fix built
danzh1989 May 17, 2021
80e91f6
remove READ
danzh1989 May 17, 2021
8cd5cda
update protobuf
danzh1989 May 17, 2021
45e3518
update comment
danzh1989 May 18, 2021
3be3b84
add runtime guard
danzh1989 May 18, 2021
3d82197
add runtime guard
danzh1989 May 18, 2021
077b6d3
Merge branch 'master' into udpreadlimit
danzh1989 May 18, 2021
33d1bd3
change order
danzh1989 May 18, 2021
9928fb3
use atomic in InjectableSingleton
danzh1989 May 18, 2021
5700114
Merge branch 'master' into udpreadlimit
danzh1989 May 18, 2021
3ec57b7
re-enable tests
danzh1989 May 18, 2021
7c0b9d4
Merge branch 'master' into udpreadlimit
danzh1989 May 19, 2021
c3f20a5
Merge branch 'master' into udpreadlimit
danzh1989 May 19, 2021
6622e2b
Merge branch 'master' into udpreadlimit
danzh1989 May 20, 2021
d8538ed
Merge branch 'master' into udpreadlimit
danzh1989 May 20, 2021
ee2db2d
disable tests uncer coverage
danzh1989 May 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/envoy/config/listener/v3/quic_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/protocol.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.config.listener.v3";
option java_outer_classname = "QuicConfigProto";
Expand All @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#protodoc-title: QUIC listener config]

// Configuration specific to the UDP QUIC listener.
// [#next-free-field: 6]
message QuicProtocolOptions {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.listener.QuicProtocolOptions";
Expand All @@ -35,4 +38,14 @@ message QuicProtocolOptions {
// Runtime flag that controls whether the listener is enabled or not. If not specified, defaults
// to enabled.
core.v3.RuntimeFeatureFlag enabled = 4;

// A multiplier to number of connections which is used to determine how many packets to read per
// event loop. A reasonable number should allow the listener to process enough payload but not
// starve TCP and other UDP sockets and also prevent long event loop duration.
// The default value is 32. This means if there are N QUIC connections, the total number of
// packets to read in each read event will be 32 * N.
// The actual number of packets to read in total by the UDP listener is also
// bound by 6000, regardless of this field or how many connections there are.
google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5
[(validate.rules).uint32 = {gte: 1}];
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 13 additions & 0 deletions api/envoy/config/listener/v4alpha/quic_config.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ Minor Behavior Changes
defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior.
* tcp: switched to the new connection pool by default. Any unexpected behavioral changes can be reverted by setting runtime guard ``envoy.reloadable_features.new_tcp_connection_pool`` to false.


Bug Fixes
---------
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* http: port stripping now works for CONNECT requests, though the port will be restored if the CONNECT request is sent upstream. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.strip_port_from_connect`` to false.
* http: raise max configurable max_request_headers_kb limit to 8192 KiB (8MiB) from 96 KiB in http connection manager.
* listener: fix the crash which could happen when the ongoing filter chain only listener update is followed by the listener removal or full listener update.
* udp: limit each UDP listener to read maxmium 6000 packets per event loop. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.udp_per_event_loop_read_limit`` to false.
* validation: fix an issue that causes TAP sockets to panic during config validation mode.
* xray: fix the default sampling 'rate' for AWS X-Ray tracer extension to be 5% as opposed to 50%.
* zipkin: fix timestamp serializaiton in annotations. A prior bug fix exposed an issue with timestamps being serialized as strings.
Expand Down
13 changes: 13 additions & 0 deletions generated_api_shadow/envoy/config/listener/v3/quic_config.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ class UdpListenerCallbacks {
* Posts ``data`` to be delivered on this worker.
*/
virtual void post(Network::UdpRecvData&& data) PURE;

/**
* An estimated number of UDP packets this callback expects to process in current read event.
*/
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is READ in all caps?

also for naming is this the number of packets expected, or the number of packets the listener should be willing to read?

Should we make it clear in comments this is TCP specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

READ doesn't need to be all caps. Fixed.

For the naming confusion, what's the difference between "the number of packets expected" and "the number of packets the listener should be willing to read"? This interface provides a hint to UDP listener but eventually the listener will decide how many to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is READ in all caps?

READ doesn't need to be all caps. Fixed.

also for naming is this the number of packets expected, or the number of packets the listener should be willing to read?

What's the difference between "the number of packets expected" and "the number of packets the listener should be willing to read"? This interface provides a hint to UDP listener but eventually the listener will decide how many to read.

Should we make it clear in comments this is TCP specific?

UDP specific? This is in UdpListenerCallbacks after all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's expected vs allowed to be read.
If we have 10 sessions it's not that we expect 160 packets, it's that we'll read 16 packets per loop.

and yes, I meant UDP. UdpPackets maybe since tcp packets are a thing too?

};

using UdpListenerCallbacksOptRef = absl::optional<std::reference_wrapper<UdpListenerCallbacks>>;
Expand Down
7 changes: 6 additions & 1 deletion source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ void UdpListenerImpl::handleReadCallback() {
const Api::IoErrorPtr result = Utility::readPacketsFromSocket(
socket_->ioHandle(), *socket_->addressProvider().localAddress(), *this, time_source_,
config_.prefer_gro_, packets_dropped_);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
if (result == nullptr) {
// No error. The number of reads was limited by read rate. There are more packets to read.
// Register to read more in the next event loop.
socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
return;
}
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
// TODO(mattklein123): When rate limited logging is implemented log this at error level
// on a periodic basis.
Expand Down
3 changes: 3 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class UdpListenerImpl : public BaseListenerImpl,
MonotonicTime receive_time) override;
uint64_t maxDatagramSize() const override { return config_.max_rx_datagram_size_; }
void onDatagramsDropped(uint32_t dropped) override { cb_.onDatagramsDropped(dropped); }
size_t numPacketsExpectedPerEventLoop() const override {
return cb_.numPacketsExpectedPerEventLoop();
}

protected:
void handleWriteCallback();
Expand Down
26 changes: 23 additions & 3 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/network/io_socket_error_impl.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/utility.h"
#include "common/runtime/runtime_features.h"

#include "absl/container/fixed_array.h"
#include "absl/strings/match.h"
Expand Down Expand Up @@ -576,10 +577,10 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, bool prefer_gro,
MonotonicTime receive_time, bool use_gro,
uint32_t* packets_dropped) {

if (prefer_gro && handle.supportsUdpGro()) {
if (use_gro) {
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);

Expand Down Expand Up @@ -696,11 +697,24 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
UdpPacketProcessor& udp_packet_processor,
TimeSource& time_source, bool prefer_gro,
uint32_t& packets_dropped) {
// Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless
// this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP.
size_t num_packets_to_read = std::min<size_t>(
MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop());
const bool use_gro = prefer_gro && handle.supportsUdpGro();
size_t num_reads =
use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_GRO_RECEIVE)
: (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_MMSG_RECEIVE)
: num_packets_to_read);
// Make sure to read at least once.
num_reads = std::max<size_t>(1, num_reads);
bool honor_read_limit =
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit");
do {
const uint32_t old_packets_dropped = packets_dropped;
const MonotonicTime receive_time = time_source.monotonicTime();
Api::IoCallUint64Result result = Utility::readFromSocket(
handle, local_address, udp_packet_processor, receive_time, prefer_gro, &packets_dropped);
handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped);

if (!result.ok()) {
// No more to read or encountered a system error.
Expand All @@ -723,6 +737,12 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
delta);
udp_packet_processor.onDatagramsDropped(delta);
}
if (honor_read_limit) {
--num_reads;
}
if (num_reads == 0) {
return std::move(result.err_);
}
} while (true);
}

Expand Down
14 changes: 11 additions & 3 deletions source/common/network/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ class UdpPacketProcessor {
* the size of datagrams received, they will be dropped.
*/
virtual uint64_t maxDatagramSize() const PURE;

/**
* An estimated number of packets to read in each read event.
*/
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
};

static const uint64_t DEFAULT_UDP_MAX_DATAGRAM_SIZE = 1500;
static const uint64_t NUM_DATAGRAMS_PER_GRO_RECEIVE = 16;
static const uint64_t NUM_DATAGRAMS_PER_MMSG_RECEIVE = 16;
static const uint64_t MAX_NUM_PACKETS_PER_EVENT_LOOP = 6000;
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved

/**
* Wrapper which resolves UDP socket proto config with defaults.
Expand Down Expand Up @@ -362,18 +368,20 @@ class Utility {
static Api::IoCallUint64Result readFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, bool prefer_gro,
MonotonicTime receive_time, bool use_gro,
uint32_t* packets_dropped);

/**
* Read available packets from a given UDP socket and pass the packet to a given
* UdpPacketProcessor.
* Read some packets from a given UDP socket and pass the packet to a given
* UdpPacketProcessor. Read no more than MAX_NUM_PACKETS_PER_EVENT_LOOP packets.
* @param handle is the UDP socket to read from.
* @param local_address is the socket's local address used to populate port.
* @param udp_packet_processor is the callback to receive the packets.
* @param time_source is the time source used to generate the time stamp of the received packets.
* @param prefer_gro supplies whether to use GRO if the OS supports it.
* @param packets_dropped is the output parameter for number of packets dropped in kernel.
* Return the io error encountered or nullptr if no io error but read stopped
* because of MAX_NUM_PACKETS_PER_EVENT_LOOP.
*
* TODO(mattklein123): Allow the number of packets read to be limited for fairness. Currently
* this function will always return an error, even if EAGAIN. In the future
Expand Down
30 changes: 22 additions & 8 deletions source/common/quic/active_quic_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/quic/envoy_quic_proof_source.h"
#include "common/quic/envoy_quic_utils.h"
#include "common/quic/envoy_quic_utils.h"
#include "common/quic/quic_network_connection.h"
#include "common/runtime/runtime_features.h"

namespace Envoy {
Expand All @@ -26,25 +27,29 @@ ActiveQuicListener::ActiveQuicListener(
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
Network::UdpConnectionHandler& parent, Network::ListenerConfig& listener_config,
const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options,
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_received_to_connection_count_ratio)
: ActiveQuicListener(worker_index, concurrency, dispatcher, parent,
listener_config.listenSocketFactory().getListenSocket(), listener_config,
quic_config, std::move(options), kernel_worker_routing, enabled) {}
quic_config, std::move(options), kernel_worker_routing, enabled,
packets_received_to_connection_count_ratio) {}

ActiveQuicListener::ActiveQuicListener(
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_to_read_to_connection_count_ratio)
: Server::ActiveUdpListenerBase(
worker_index, concurrency, parent, *listen_socket,
dispatcher.createUdpListener(
listen_socket, *this,
listener_config.udpListenerConfig()->config().downstream_socket_config()),
&listener_config),
dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()),
kernel_worker_routing_(kernel_worker_routing) {
kernel_worker_routing_(kernel_worker_routing),
packets_to_read_to_connection_count_ratio_(packets_to_read_to_connection_count_ratio) {
// This flag fix a QUICHE issue which may crash Envoy during connection close.
SetQuicReloadableFlag(quic_single_ack_in_packet2, true);
// Do not include 32-byte per-entry overhead while counting header size.
Expand Down Expand Up @@ -215,9 +220,18 @@ uint32_t ActiveQuicListener::destination(const Network::UdpRecvData& data) const
return connection_id_snippet % concurrency_;
}

size_t ActiveQuicListener::numPacketsExpectedPerEventLoop() const {
// Expect each session to read packets_to_read_to_connection_count_ratio_ number of packets in
// this read event.
return quic_dispatcher_->NumSessions() * packets_to_read_to_connection_count_ratio_;
}

ActiveQuicListenerFactory::ActiveQuicListenerFactory(
const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency)
: concurrency_(concurrency), enabled_(config.enabled()) {
: concurrency_(concurrency), enabled_(config.enabled()),
packets_to_read_to_connection_count_ratio_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, packets_to_read_to_connection_count_ratio,
DEFAULT_PACKETS_TO_READ_PER_CONNECTION)) {
uint64_t idle_network_timeout_ms =
config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout())
: 300000;
Expand Down Expand Up @@ -300,9 +314,9 @@ Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::crea
}
#endif

return std::make_unique<ActiveQuicListener>(worker_index, concurrency_, disptacher, parent,
config, quic_config_, std::move(options),
kernel_worker_routing, enabled_);
return std::make_unique<ActiveQuicListener>(
worker_index, concurrency_, disptacher, parent, config, quic_config_, std::move(options),
kernel_worker_routing, enabled_, packets_to_read_to_connection_count_ratio_);
} // namespace Quic

} // namespace Quic
Expand Down
Loading