Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp: limit number of reads per event loop #16180

Merged
merged 34 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a84505d
add read limit
danzh1989 Apr 23, 2021
879cd6f
test pass
danzh1989 Apr 26, 2021
24ae38d
refine comments
danzh1989 Apr 26, 2021
48a644f
return num packets per loop
danzh1989 May 2, 2021
b89f7fe
typo
danzh1989 May 3, 2021
758c321
change udp_proxy_filter
danzh1989 May 3, 2021
ded13a6
fix udp proxy test
danzh1989 May 3, 2021
51c3404
fix build
danzh1989 May 3, 2021
f0949ea
fix build error
danzh1989 May 5, 2021
0858725
fix udp_fuzz
danzh1989 May 6, 2021
001f228
add config
danzh1989 May 6, 2021
efb5bef
Merge branch 'master' into udpreadlimit
danzh1989 May 6, 2021
09ba49d
spelling
danzh1989 May 6, 2021
fb6bb16
fix sign compare
danzh1989 May 6, 2021
372b3e3
fix for non-recvmmsg platform
danzh1989 May 8, 2021
2151e2d
added DEFAULT_PACKETS_TO_READ_PER_CONNECTION
danzh1989 May 12, 2021
d9989e8
make it build
danzh1989 May 12, 2021
e0aae6f
rename some variables
danzh1989 May 12, 2021
dcda8e8
fix built
danzh1989 May 17, 2021
80e91f6
remove READ
danzh1989 May 17, 2021
8cd5cda
update protobuf
danzh1989 May 17, 2021
45e3518
update comment
danzh1989 May 18, 2021
3be3b84
add runtime guard
danzh1989 May 18, 2021
3d82197
add runtime guard
danzh1989 May 18, 2021
077b6d3
Merge branch 'master' into udpreadlimit
danzh1989 May 18, 2021
33d1bd3
change order
danzh1989 May 18, 2021
9928fb3
use atomic in InjectableSingleton
danzh1989 May 18, 2021
5700114
Merge branch 'master' into udpreadlimit
danzh1989 May 18, 2021
3ec57b7
re-enable tests
danzh1989 May 18, 2021
7c0b9d4
Merge branch 'master' into udpreadlimit
danzh1989 May 19, 2021
c3f20a5
Merge branch 'master' into udpreadlimit
danzh1989 May 19, 2021
6622e2b
Merge branch 'master' into udpreadlimit
danzh1989 May 20, 2021
d8538ed
Merge branch 'master' into udpreadlimit
danzh1989 May 20, 2021
ee2db2d
disable tests uncer coverage
danzh1989 May 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/envoy/config/listener/v3/quic_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/protocol.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.config.listener.v3";
option java_outer_classname = "QuicConfigProto";
Expand All @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#protodoc-title: QUIC listener config]

// Configuration specific to the UDP QUIC listener.
// [#next-free-field: 6]
message QuicProtocolOptions {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.listener.QuicProtocolOptions";
Expand All @@ -35,4 +38,9 @@ message QuicProtocolOptions {
// Runtime flag that controls whether the listener is enabled or not. If not specified, defaults
// to enabled.
core.v3.RuntimeFeatureFlag enabled = 4;

// A multiplier to number of connections which is used to determine how many packets to read per event loop. A reasonable ratio should allow the listener to process enough payload but not starve TCP and other UDP sockets and also prevent long event loop duration.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: line length

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// The default value is 32. This means if there are N QUIC connections, the total number of packets to read in each READ event will be 32 * N.
google.protobuf.UInt32Value packets_received_to_connection_count_ratio = 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: packets_to_read_per_connection ?

We should document the upper bound here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

[(validate.rules).uint32 = {gte: 1}];
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
}
8 changes: 8 additions & 0 deletions api/envoy/config/listener/v4alpha/quic_config.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ class UdpListenerCallbacks {
* Posts ``data`` to be delivered on this worker.
*/
virtual void post(Network::UdpRecvData&& data) PURE;

/**
* An estimated number of packets this callback expects to process in current READ event.
*/
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is READ in all caps?

also for naming is this the number of packets expected, or the number of packets the listener should be willing to read?

Should we make it clear in comments this is TCP specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

READ doesn't need to be all caps. Fixed.

For the naming confusion, what's the difference between "the number of packets expected" and "the number of packets the listener should be willing to read"? This interface provides a hint to UDP listener but eventually the listener will decide how many to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is READ in all caps?

READ doesn't need to be all caps. Fixed.

also for naming is this the number of packets expected, or the number of packets the listener should be willing to read?

What's the difference between "the number of packets expected" and "the number of packets the listener should be willing to read"? This interface provides a hint to UDP listener but eventually the listener will decide how many to read.

Should we make it clear in comments this is TCP specific?

UDP specific? This is in UdpListenerCallbacks after all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's expected vs allowed to be read.
If we have 10 sessions it's not that we expect 160 packets, it's that we'll read 16 packets per loop.

and yes, I meant UDP. UdpPackets maybe since tcp packets are a thing too?

};

using UdpListenerCallbacksOptRef = absl::optional<std::reference_wrapper<UdpListenerCallbacks>>;
Expand Down
7 changes: 6 additions & 1 deletion source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ void UdpListenerImpl::handleReadCallback() {
const Api::IoErrorPtr result = Utility::readPacketsFromSocket(
socket_->ioHandle(), *socket_->addressProvider().localAddress(), *this, time_source_,
config_.prefer_gro_, packets_dropped_);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
if (result == nullptr) {
// No error. The number of reads was limited by read rate. There are more packets to read.
// Register READ for next event loop.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Register to read more in the next event loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read);
return;
}
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
// TODO(mattklein123): When rate limited logging is implemented log this at error level
// on a periodic basis.
Expand Down
3 changes: 3 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class UdpListenerImpl : public BaseListenerImpl,
MonotonicTime receive_time) override;
uint64_t maxDatagramSize() const override { return config_.max_rx_datagram_size_; }
void onDatagramsDropped(uint32_t dropped) override { cb_.onDatagramsDropped(dropped); }
size_t numPacketsExpectedPerEventLoop() const override {
return cb_.numPacketsExpectedPerEventLoop();
}

protected:
void handleWriteCallback();
Expand Down
21 changes: 18 additions & 3 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,10 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, bool prefer_gro,
MonotonicTime receive_time, bool use_gro,
uint32_t* packets_dropped) {

if (prefer_gro && handle.supportsUdpGro()) {
if (use_gro) {
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);

Expand Down Expand Up @@ -696,11 +696,22 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
UdpPacketProcessor& udp_packet_processor,
TimeSource& time_source, bool prefer_gro,
uint32_t& packets_dropped) {
// Make sure at not too many packets will be read in following loop, but at least it will read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment needs a rephrase.
maybe

Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// once.
size_t num_packets_to_read = std::min(MAX_NUM_PACKETS_PER_EVENT_LOOP,
udp_packet_processor.numPacketsExpectedPerEventLoop());
const bool use_gro = prefer_gro && handle.supportsUdpGro();
size_t num_reads =
use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_GRO_RECEIVE)
: (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_MMSG_RECEIVE)
: num_packets_to_read);
// Make sure at least read once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Make sure to read at least once

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

num_reads = std::max(1ul, num_reads);
do {
const uint32_t old_packets_dropped = packets_dropped;
const MonotonicTime receive_time = time_source.monotonicTime();
Api::IoCallUint64Result result = Utility::readFromSocket(
handle, local_address, udp_packet_processor, receive_time, prefer_gro, &packets_dropped);
handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped);

if (!result.ok()) {
// No more to read or encountered a system error.
Expand All @@ -723,6 +734,10 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
delta);
udp_packet_processor.onDatagramsDropped(delta);
}
--num_reads;
if (num_reads == 0) {
return std::move(result.err_);
}
} while (true);
}

Expand Down
14 changes: 11 additions & 3 deletions source/common/network/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ class UdpPacketProcessor {
* the size of datagrams received, they will be dropped.
*/
virtual uint64_t maxDatagramSize() const PURE;

/**
* An estimated number of packets to read in each READ event.
*/
virtual size_t numPacketsExpectedPerEventLoop() const PURE;
};

static const uint64_t DEFAULT_UDP_MAX_DATAGRAM_SIZE = 1500;
static const uint64_t NUM_DATAGRAMS_PER_GRO_RECEIVE = 16;
static const uint64_t NUM_DATAGRAMS_PER_MMSG_RECEIVE = 16;
static const uint64_t MAX_NUM_PACKETS_PER_EVENT_LOOP = 6000;
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved

/**
* Wrapper which resolves UDP socket proto config with defaults.
Expand Down Expand Up @@ -362,18 +368,20 @@ class Utility {
static Api::IoCallUint64Result readFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, bool prefer_gro,
MonotonicTime receive_time, bool use_gro,
uint32_t* packets_dropped);

/**
* Read available packets from a given UDP socket and pass the packet to a given
* UdpPacketProcessor.
* Read some packets from a given UDP socket and pass the packet to a given
* UdpPacketProcessor. Only read no more than MAX_NUM_PACKETS_PER_EVENT_LOOP packets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Read no more than ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* @param handle is the UDP socket to read from.
* @param local_address is the socket's local address used to populate port.
* @param udp_packet_processor is the callback to receive the packets.
* @param time_source is the time source used to generate the time stamp of the received packets.
* @param prefer_gro supplies whether to use GRO if the OS supports it.
* @param packets_dropped is the output parameter for number of packets dropped in kernel.
* Return the io error encountered or nullptr if no io error but read stopped
* because of MAX_NUM_PACKETS_PER_EVENT_LOOP.
*
* TODO(mattklein123): Allow the number of packets read to be limited for fairness. Currently
* this function will always return an error, even if EAGAIN. In the future
Expand Down
27 changes: 19 additions & 8 deletions source/common/quic/active_quic_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,29 @@ ActiveQuicListener::ActiveQuicListener(
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
Network::UdpConnectionHandler& parent, Network::ListenerConfig& listener_config,
const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options,
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_received_to_connection_count_ratio)
: ActiveQuicListener(worker_index, concurrency, dispatcher, parent,
listener_config.listenSocketFactory().getListenSocket(), listener_config,
quic_config, std::move(options), kernel_worker_routing, enabled) {}
quic_config, std::move(options), kernel_worker_routing, enabled,
packets_received_to_connection_count_ratio) {}

ActiveQuicListener::ActiveQuicListener(
uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled)
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_received_to_connection_count_ratio)
: Server::ActiveUdpListenerBase(
worker_index, concurrency, parent, *listen_socket,
dispatcher.createUdpListener(
listen_socket, *this,
listener_config.udpListenerConfig()->config().downstream_socket_config()),
&listener_config),
dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()),
kernel_worker_routing_(kernel_worker_routing) {
kernel_worker_routing_(kernel_worker_routing),
packets_received_to_connection_count_ratio_(packets_received_to_connection_count_ratio) {
// This flag fix a QUICHE issue which may crash Envoy during connection close.
SetQuicReloadableFlag(quic_single_ack_in_packet2, true);
// Do not include 32-byte per-entry overhead while counting header size.
Expand Down Expand Up @@ -214,9 +218,16 @@ uint32_t ActiveQuicListener::destination(const Network::UdpRecvData& data) const
return connection_id_snippet % concurrency_;
}

size_t ActiveQuicListener::numPacketsExpectedPerEventLoop() const {
// Expect each session to read 32 packets per READ event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the configured number may not be 32.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return quic_dispatcher_->NumSessions() * packets_received_to_connection_count_ratio_;
}

ActiveQuicListenerFactory::ActiveQuicListenerFactory(
const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency)
: concurrency_(concurrency), enabled_(config.enabled()) {
: concurrency_(concurrency), enabled_(config.enabled()),
packets_received_to_connection_count_ratio_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, packets_received_to_connection_count_ratio, 32)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should 32 be a const default defined somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added MAX_NUM_PACKETS_PER_EVENT_LOOP

uint64_t idle_network_timeout_ms =
config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout())
: 300000;
Expand Down Expand Up @@ -299,9 +310,9 @@ Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::crea
}
#endif

return std::make_unique<ActiveQuicListener>(worker_index, concurrency_, disptacher, parent,
config, quic_config_, std::move(options),
kernel_worker_routing, enabled_);
return std::make_unique<ActiveQuicListener>(
worker_index, concurrency_, disptacher, parent, config, quic_config_, std::move(options),
kernel_worker_routing, enabled_, packets_received_to_connection_count_ratio_);
} // namespace Quic

} // namespace Quic
Expand Down
10 changes: 8 additions & 2 deletions source/common/quic/active_quic_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase,
Network::UdpConnectionHandler& parent,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled);
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_received_to_connection_count_ratio);

ActiveQuicListener(uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher,
Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled);
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
uint32_t packets_received_to_connection_count_ratio);

~ActiveQuicListener() override;

Expand All @@ -52,6 +54,7 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase,
Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; }
void onDataWorker(Network::UdpRecvData&& data) override;
uint32_t destination(const Network::UdpRecvData& data) const override;
size_t numPacketsExpectedPerEventLoop() const override;

// ActiveListenerImplBase
void pauseListening() override;
Expand All @@ -73,6 +76,8 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase,
// The number of runs of the event loop in which at least one CHLO was buffered.
// TODO(ggreenway): Consider making this a published stat, or some variation of this information.
uint64_t event_loops_with_buffered_chlo_for_test_{0};

uint32_t packets_received_to_connection_count_ratio_;
};

using ActiveQuicListenerPtr = std::unique_ptr<ActiveQuicListener>;
Expand All @@ -97,6 +102,7 @@ class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory,
const uint32_t concurrency_;
absl::once_flag install_bpf_once_;
envoy::config::core::v3::RuntimeFeatureFlag enabled_;
const uint32_t packets_received_to_connection_count_ratio_;
};

} // namespace Quic
Expand Down
5 changes: 4 additions & 1 deletion source/common/quic/envoy_quic_client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ void EnvoyQuicClientConnection::onFileEvent(uint32_t events) {
Api::IoErrorPtr err = Network::Utility::readPacketsFromSocket(
connectionSocket()->ioHandle(), *connectionSocket()->addressProvider().localAddress(),
*this, dispatcher_.timeSource(), true, packets_dropped_);
// TODO(danzh): Handle no error when we limit the number of packets read.
if (err == nullptr) {
connectionSocket()->ioHandle().activateFileEvents(Event::FileReadyType::Read);
return;
}
if (err->getErrorCode() != Api::IoError::IoErrorCode::Again) {
ENVOY_CONN_LOG(error, "recvmsg result {}: {}", *this, static_cast<int>(err->getErrorCode()),
err->getErrorDetails());
Expand Down
4 changes: 4 additions & 0 deletions source/common/quic/envoy_quic_client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class EnvoyQuicClientConnection : public quic::QuicConnection,
void onDatagramsDropped(uint32_t) override {
// TODO(mattklein123): Emit a stat for this.
}
size_t numPacketsExpectedPerEventLoop() const override {
// Use ~32k to read the same amount as a TCP connection.
return 32u;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, let's define this somewhere, comment there, and use in both places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// Register file event and apply socket options.
void setUpConnectionSocket();
Expand Down
Loading