diff --git a/api/envoy/config/listener/v3/quic_config.proto b/api/envoy/config/listener/v3/quic_config.proto index 69df722c6fbb..d1e62cdaaf15 100644 --- a/api/envoy/config/listener/v3/quic_config.proto +++ b/api/envoy/config/listener/v3/quic_config.proto @@ -6,9 +6,11 @@ import "envoy/config/core/v3/base.proto"; import "envoy/config/core/v3/protocol.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.config.listener.v3"; option java_outer_classname = "QuicConfigProto"; @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // [#protodoc-title: QUIC listener config] // Configuration specific to the UDP QUIC listener. +// [#next-free-field: 6] message QuicProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.listener.QuicProtocolOptions"; @@ -35,4 +38,14 @@ message QuicProtocolOptions { // Runtime flag that controls whether the listener is enabled or not. If not specified, defaults // to enabled. core.v3.RuntimeFeatureFlag enabled = 4; + + // A multiplier to number of connections which is used to determine how many packets to read per + // event loop. A reasonable number should allow the listener to process enough payload but not + // starve TCP and other UDP sockets and also prevent long event loop duration. + // The default value is 32. This means if there are N QUIC connections, the total number of + // packets to read in each read event will be 32 * N. + // The actual number of packets to read in total by the UDP listener is also + // bound by 6000, regardless of this field or how many connections there are. + google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5 + [(validate.rules).uint32 = {gte: 1}]; } diff --git a/api/envoy/config/listener/v4alpha/quic_config.proto b/api/envoy/config/listener/v4alpha/quic_config.proto index c9e218137ae2..6d0f5e51493b 100644 --- a/api/envoy/config/listener/v4alpha/quic_config.proto +++ b/api/envoy/config/listener/v4alpha/quic_config.proto @@ -6,9 +6,11 @@ import "envoy/config/core/v4alpha/base.proto"; import "envoy/config/core/v4alpha/protocol.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.config.listener.v4alpha"; option java_outer_classname = "QuicConfigProto"; @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // [#protodoc-title: QUIC listener config] // Configuration specific to the UDP QUIC listener. +// [#next-free-field: 6] message QuicProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.config.listener.v3.QuicProtocolOptions"; @@ -35,4 +38,14 @@ message QuicProtocolOptions { // Runtime flag that controls whether the listener is enabled or not. If not specified, defaults // to enabled. core.v4alpha.RuntimeFeatureFlag enabled = 4; + + // A multiplier to number of connections which is used to determine how many packets to read per + // event loop. A reasonable number should allow the listener to process enough payload but not + // starve TCP and other UDP sockets and also prevent long event loop duration. + // The default value is 32. This means if there are N QUIC connections, the total number of + // packets to read in each read event will be 32 * N. + // The actual number of packets to read in total by the UDP listener is also + // bound by 6000, regardless of this field or how many connections there are. + google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5 + [(validate.rules).uint32 = {gte: 1}]; } diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 332df9444950..d0c325387d59 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -34,7 +34,6 @@ Minor Behavior Changes defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior. * tcp: switched to the new connection pool by default. Any unexpected behavioral changes can be reverted by setting runtime guard ``envoy.reloadable_features.new_tcp_connection_pool`` to false. - Bug Fixes --------- *Changes expected to improve the state of the world and are unlikely to have negative effects* @@ -42,6 +41,7 @@ Bug Fixes * http: port stripping now works for CONNECT requests, though the port will be restored if the CONNECT request is sent upstream. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.strip_port_from_connect`` to false. * http: raise max configurable max_request_headers_kb limit to 8192 KiB (8MiB) from 96 KiB in http connection manager. * listener: fix the crash which could happen when the ongoing filter chain only listener update is followed by the listener removal or full listener update. +* udp: limit each UDP listener to read maxmium 6000 packets per event loop. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.udp_per_event_loop_read_limit`` to false. * validation: fix an issue that causes TAP sockets to panic during config validation mode. * xray: fix the default sampling 'rate' for AWS X-Ray tracer extension to be 5% as opposed to 50%. * zipkin: fix timestamp serializaiton in annotations. A prior bug fix exposed an issue with timestamps being serialized as strings. diff --git a/generated_api_shadow/envoy/config/listener/v3/quic_config.proto b/generated_api_shadow/envoy/config/listener/v3/quic_config.proto index 69df722c6fbb..d1e62cdaaf15 100644 --- a/generated_api_shadow/envoy/config/listener/v3/quic_config.proto +++ b/generated_api_shadow/envoy/config/listener/v3/quic_config.proto @@ -6,9 +6,11 @@ import "envoy/config/core/v3/base.proto"; import "envoy/config/core/v3/protocol.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.config.listener.v3"; option java_outer_classname = "QuicConfigProto"; @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // [#protodoc-title: QUIC listener config] // Configuration specific to the UDP QUIC listener. +// [#next-free-field: 6] message QuicProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.listener.QuicProtocolOptions"; @@ -35,4 +38,14 @@ message QuicProtocolOptions { // Runtime flag that controls whether the listener is enabled or not. If not specified, defaults // to enabled. core.v3.RuntimeFeatureFlag enabled = 4; + + // A multiplier to number of connections which is used to determine how many packets to read per + // event loop. A reasonable number should allow the listener to process enough payload but not + // starve TCP and other UDP sockets and also prevent long event loop duration. + // The default value is 32. This means if there are N QUIC connections, the total number of + // packets to read in each read event will be 32 * N. + // The actual number of packets to read in total by the UDP listener is also + // bound by 6000, regardless of this field or how many connections there are. + google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5 + [(validate.rules).uint32 = {gte: 1}]; } diff --git a/generated_api_shadow/envoy/config/listener/v4alpha/quic_config.proto b/generated_api_shadow/envoy/config/listener/v4alpha/quic_config.proto index c9e218137ae2..6d0f5e51493b 100644 --- a/generated_api_shadow/envoy/config/listener/v4alpha/quic_config.proto +++ b/generated_api_shadow/envoy/config/listener/v4alpha/quic_config.proto @@ -6,9 +6,11 @@ import "envoy/config/core/v4alpha/base.proto"; import "envoy/config/core/v4alpha/protocol.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.config.listener.v4alpha"; option java_outer_classname = "QuicConfigProto"; @@ -18,6 +20,7 @@ option (udpa.annotations.file_status).package_version_status = NEXT_MAJOR_VERSIO // [#protodoc-title: QUIC listener config] // Configuration specific to the UDP QUIC listener. +// [#next-free-field: 6] message QuicProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.config.listener.v3.QuicProtocolOptions"; @@ -35,4 +38,14 @@ message QuicProtocolOptions { // Runtime flag that controls whether the listener is enabled or not. If not specified, defaults // to enabled. core.v4alpha.RuntimeFeatureFlag enabled = 4; + + // A multiplier to number of connections which is used to determine how many packets to read per + // event loop. A reasonable number should allow the listener to process enough payload but not + // starve TCP and other UDP sockets and also prevent long event loop duration. + // The default value is 32. This means if there are N QUIC connections, the total number of + // packets to read in each read event will be 32 * N. + // The actual number of packets to read in total by the UDP listener is also + // bound by 6000, regardless of this field or how many connections there are. + google.protobuf.UInt32Value packets_to_read_to_connection_count_ratio = 5 + [(validate.rules).uint32 = {gte: 1}]; } diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index 7c6eabe3e3c6..73dbced7fa93 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -330,6 +330,11 @@ class UdpListenerCallbacks { * Posts ``data`` to be delivered on this worker. */ virtual void post(Network::UdpRecvData&& data) PURE; + + /** + * An estimated number of UDP packets this callback expects to process in current read event. + */ + virtual size_t numPacketsExpectedPerEventLoop() const PURE; }; using UdpListenerCallbacksOptRef = absl::optional>; diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index 5b5b98b34087..940ae25f9073 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -75,7 +75,12 @@ void UdpListenerImpl::handleReadCallback() { const Api::IoErrorPtr result = Utility::readPacketsFromSocket( socket_->ioHandle(), *socket_->addressProvider().localAddress(), *this, time_source_, config_.prefer_gro_, packets_dropped_); - // TODO(mattklein123): Handle no error when we limit the number of packets read. + if (result == nullptr) { + // No error. The number of reads was limited by read rate. There are more packets to read. + // Register to read more in the next event loop. + socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); + return; + } if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { // TODO(mattklein123): When rate limited logging is implemented log this at error level // on a periodic basis. diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index aac7aa1c957d..afd78713eb89 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -46,6 +46,9 @@ class UdpListenerImpl : public BaseListenerImpl, MonotonicTime receive_time) override; uint64_t maxDatagramSize() const override { return config_.max_rx_datagram_size_; } void onDatagramsDropped(uint32_t dropped) override { cb_.onDatagramsDropped(dropped); } + size_t numPacketsExpectedPerEventLoop() const override { + return cb_.numPacketsExpectedPerEventLoop(); + } protected: void handleWriteCallback(); diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index d30b5770b63e..190a07f64bf7 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -23,6 +23,7 @@ #include "common/network/io_socket_error_impl.h" #include "common/protobuf/protobuf.h" #include "common/protobuf/utility.h" +#include "common/runtime/runtime_features.h" #include "absl/container/fixed_array.h" #include "absl/strings/match.h" @@ -576,10 +577,10 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer, Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, - MonotonicTime receive_time, bool prefer_gro, + MonotonicTime receive_time, bool use_gro, uint32_t* packets_dropped) { - if (prefer_gro && handle.supportsUdpGro()) { + if (use_gro) { Buffer::InstancePtr buffer = std::make_unique(); IoHandle::RecvMsgOutput output(1, packets_dropped); @@ -696,11 +697,24 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, UdpPacketProcessor& udp_packet_processor, TimeSource& time_source, bool prefer_gro, uint32_t& packets_dropped) { + // Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless + // this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP. + size_t num_packets_to_read = std::min( + MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop()); + const bool use_gro = prefer_gro && handle.supportsUdpGro(); + size_t num_reads = + use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_GRO_RECEIVE) + : (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_MMSG_RECEIVE) + : num_packets_to_read); + // Make sure to read at least once. + num_reads = std::max(1, num_reads); + bool honor_read_limit = + Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit"); do { const uint32_t old_packets_dropped = packets_dropped; const MonotonicTime receive_time = time_source.monotonicTime(); Api::IoCallUint64Result result = Utility::readFromSocket( - handle, local_address, udp_packet_processor, receive_time, prefer_gro, &packets_dropped); + handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped); if (!result.ok()) { // No more to read or encountered a system error. @@ -723,6 +737,12 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, delta); udp_packet_processor.onDatagramsDropped(delta); } + if (honor_read_limit) { + --num_reads; + } + if (num_reads == 0) { + return std::move(result.err_); + } } while (true); } diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 2e58c1d9606b..c892a8efb4bb 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -61,11 +61,17 @@ class UdpPacketProcessor { * the size of datagrams received, they will be dropped. */ virtual uint64_t maxDatagramSize() const PURE; + + /** + * An estimated number of packets to read in each read event. + */ + virtual size_t numPacketsExpectedPerEventLoop() const PURE; }; static const uint64_t DEFAULT_UDP_MAX_DATAGRAM_SIZE = 1500; static const uint64_t NUM_DATAGRAMS_PER_GRO_RECEIVE = 16; static const uint64_t NUM_DATAGRAMS_PER_MMSG_RECEIVE = 16; +static const uint64_t MAX_NUM_PACKETS_PER_EVENT_LOOP = 6000; /** * Wrapper which resolves UDP socket proto config with defaults. @@ -362,18 +368,20 @@ class Utility { static Api::IoCallUint64Result readFromSocket(IoHandle& handle, const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, - MonotonicTime receive_time, bool prefer_gro, + MonotonicTime receive_time, bool use_gro, uint32_t* packets_dropped); /** - * Read available packets from a given UDP socket and pass the packet to a given - * UdpPacketProcessor. + * Read some packets from a given UDP socket and pass the packet to a given + * UdpPacketProcessor. Read no more than MAX_NUM_PACKETS_PER_EVENT_LOOP packets. * @param handle is the UDP socket to read from. * @param local_address is the socket's local address used to populate port. * @param udp_packet_processor is the callback to receive the packets. * @param time_source is the time source used to generate the time stamp of the received packets. * @param prefer_gro supplies whether to use GRO if the OS supports it. * @param packets_dropped is the output parameter for number of packets dropped in kernel. + * Return the io error encountered or nullptr if no io error but read stopped + * because of MAX_NUM_PACKETS_PER_EVENT_LOOP. * * TODO(mattklein123): Allow the number of packets read to be limited for fairness. Currently * this function will always return an error, even if EAGAIN. In the future diff --git a/source/common/quic/active_quic_listener.cc b/source/common/quic/active_quic_listener.cc index ff1f8d2c6462..75d32d5792cf 100644 --- a/source/common/quic/active_quic_listener.cc +++ b/source/common/quic/active_quic_listener.cc @@ -17,6 +17,7 @@ #include "common/quic/envoy_quic_proof_source.h" #include "common/quic/envoy_quic_utils.h" #include "common/quic/envoy_quic_utils.h" +#include "common/quic/quic_network_connection.h" #include "common/runtime/runtime_features.h" namespace Envoy { @@ -26,17 +27,20 @@ ActiveQuicListener::ActiveQuicListener( uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options, - bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled) + bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled, + uint32_t packets_received_to_connection_count_ratio) : ActiveQuicListener(worker_index, concurrency, dispatcher, parent, listener_config.listenSocketFactory().getListenSocket(), listener_config, - quic_config, std::move(options), kernel_worker_routing, enabled) {} + quic_config, std::move(options), kernel_worker_routing, enabled, + packets_received_to_connection_count_ratio) {} ActiveQuicListener::ActiveQuicListener( uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing, - const envoy::config::core::v3::RuntimeFeatureFlag& enabled) + const envoy::config::core::v3::RuntimeFeatureFlag& enabled, + uint32_t packets_to_read_to_connection_count_ratio) : Server::ActiveUdpListenerBase( worker_index, concurrency, parent, *listen_socket, dispatcher.createUdpListener( @@ -44,7 +48,8 @@ ActiveQuicListener::ActiveQuicListener( listener_config.udpListenerConfig()->config().downstream_socket_config()), &listener_config), dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()), - kernel_worker_routing_(kernel_worker_routing) { + kernel_worker_routing_(kernel_worker_routing), + packets_to_read_to_connection_count_ratio_(packets_to_read_to_connection_count_ratio) { // This flag fix a QUICHE issue which may crash Envoy during connection close. SetQuicReloadableFlag(quic_single_ack_in_packet2, true); // Do not include 32-byte per-entry overhead while counting header size. @@ -215,9 +220,18 @@ uint32_t ActiveQuicListener::destination(const Network::UdpRecvData& data) const return connection_id_snippet % concurrency_; } +size_t ActiveQuicListener::numPacketsExpectedPerEventLoop() const { + // Expect each session to read packets_to_read_to_connection_count_ratio_ number of packets in + // this read event. + return quic_dispatcher_->NumSessions() * packets_to_read_to_connection_count_ratio_; +} + ActiveQuicListenerFactory::ActiveQuicListenerFactory( const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency) - : concurrency_(concurrency), enabled_(config.enabled()) { + : concurrency_(concurrency), enabled_(config.enabled()), + packets_to_read_to_connection_count_ratio_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, packets_to_read_to_connection_count_ratio, + DEFAULT_PACKETS_TO_READ_PER_CONNECTION)) { uint64_t idle_network_timeout_ms = config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout()) : 300000; @@ -300,9 +314,9 @@ Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::crea } #endif - return std::make_unique(worker_index, concurrency_, disptacher, parent, - config, quic_config_, std::move(options), - kernel_worker_routing, enabled_); + return std::make_unique( + worker_index, concurrency_, disptacher, parent, config, quic_config_, std::move(options), + kernel_worker_routing, enabled_, packets_to_read_to_connection_count_ratio_); } // namespace Quic } // namespace Quic diff --git a/source/common/quic/active_quic_listener.h b/source/common/quic/active_quic_listener.h index fb3f1b78cf4c..7c2264d2da9e 100644 --- a/source/common/quic/active_quic_listener.h +++ b/source/common/quic/active_quic_listener.h @@ -28,13 +28,15 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase, Network::UdpConnectionHandler& parent, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing, - const envoy::config::core::v3::RuntimeFeatureFlag& enabled); + const envoy::config::core::v3::RuntimeFeatureFlag& enabled, + uint32_t packets_to_read_to_connection_count_ratio); ActiveQuicListener(uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent, Network::SocketSharedPtr listen_socket, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing, - const envoy::config::core::v3::RuntimeFeatureFlag& enabled); + const envoy::config::core::v3::RuntimeFeatureFlag& enabled, + uint32_t packets_to_read_to_connection_count_ratio); ~ActiveQuicListener() override; @@ -52,6 +54,7 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase, Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } void onDataWorker(Network::UdpRecvData&& data) override; uint32_t destination(const Network::UdpRecvData& data) const override; + size_t numPacketsExpectedPerEventLoop() const override; // ActiveListenerImplBase void pauseListening() override; @@ -73,6 +76,8 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase, // The number of runs of the event loop in which at least one CHLO was buffered. // TODO(ggreenway): Consider making this a published stat, or some variation of this information. uint64_t event_loops_with_buffered_chlo_for_test_{0}; + + uint32_t packets_to_read_to_connection_count_ratio_; }; using ActiveQuicListenerPtr = std::unique_ptr; @@ -97,6 +102,7 @@ class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory, const uint32_t concurrency_; absl::once_flag install_bpf_once_; envoy::config::core::v3::RuntimeFeatureFlag enabled_; + const uint32_t packets_to_read_to_connection_count_ratio_; }; } // namespace Quic diff --git a/source/common/quic/envoy_quic_client_connection.cc b/source/common/quic/envoy_quic_client_connection.cc index c9046d45b73a..ac30a8d8b9cf 100644 --- a/source/common/quic/envoy_quic_client_connection.cc +++ b/source/common/quic/envoy_quic_client_connection.cc @@ -119,7 +119,10 @@ void EnvoyQuicClientConnection::onFileEvent(uint32_t events) { Api::IoErrorPtr err = Network::Utility::readPacketsFromSocket( connectionSocket()->ioHandle(), *connectionSocket()->addressProvider().localAddress(), *this, dispatcher_.timeSource(), true, packets_dropped_); - // TODO(danzh): Handle no error when we limit the number of packets read. + if (err == nullptr) { + connectionSocket()->ioHandle().activateFileEvents(Event::FileReadyType::Read); + return; + } if (err->getErrorCode() != Api::IoError::IoErrorCode::Again) { ENVOY_CONN_LOG(error, "recvmsg result {}: {}", *this, static_cast(err->getErrorCode()), err->getErrorDetails()); diff --git a/source/common/quic/envoy_quic_client_connection.h b/source/common/quic/envoy_quic_client_connection.h index dc3587a3b05c..21000adce734 100644 --- a/source/common/quic/envoy_quic_client_connection.h +++ b/source/common/quic/envoy_quic_client_connection.h @@ -53,6 +53,9 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, void onDatagramsDropped(uint32_t) override { // TODO(mattklein123): Emit a stat for this. } + size_t numPacketsExpectedPerEventLoop() const override { + return DEFAULT_PACKETS_TO_READ_PER_CONNECTION; + } // Register file event and apply socket options. void setUpConnectionSocket(); diff --git a/source/common/quic/quic_network_connection.h b/source/common/quic/quic_network_connection.h index 66c51a7d5215..78ea81402e5c 100644 --- a/source/common/quic/quic_network_connection.h +++ b/source/common/quic/quic_network_connection.h @@ -9,6 +9,9 @@ namespace Envoy { namespace Quic { +// Read ~32k bytes per connection by default, which is about the same as TCP. +static const uint32_t DEFAULT_PACKETS_TO_READ_PER_CONNECTION = 32u; + // A base class of both the client and server connections which keeps stats and // connection socket. class QuicNetworkConnection : protected Logger::Loggable { diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index c44cc2a55ad3..fad8f3678e36 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -87,6 +87,7 @@ constexpr const char* runtime_features[] = { "envoy.reloadable_features.strip_port_from_connect", "envoy.reloadable_features.treat_host_like_authority", "envoy.reloadable_features.treat_upstream_connect_timeout_as_connect_failure", + "envoy.reloadable_features.udp_per_event_loop_read_limit", "envoy.reloadable_features.upstream_host_weight_change_causes_rebuild", "envoy.reloadable_features.use_observable_cluster_name", "envoy.reloadable_features.vhds_heartbeats", diff --git a/source/common/singleton/threadsafe_singleton.h b/source/common/singleton/threadsafe_singleton.h index 5b55dc0af517..c60a4bb06be3 100644 --- a/source/common/singleton/threadsafe_singleton.h +++ b/source/common/singleton/threadsafe_singleton.h @@ -67,10 +67,10 @@ template class InjectableSingleton { static void clear() { loader_ = nullptr; } protected: - static T* loader_; + static std::atomic loader_; }; -template T* InjectableSingleton::loader_ = nullptr; +template std::atomic InjectableSingleton::loader_ = nullptr; template class ScopedInjectableLoader { public: diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 3f11d37f0f12..9d21d784ba57 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -222,7 +222,10 @@ void UdpProxyFilter::ActiveSession::onReadReady() { const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(), cluster_.filter_.config_->upstreamSocketConfig().prefer_gro_, packets_dropped); - // TODO(mattklein123): Handle no error when we limit the number of packets read. + if (result == nullptr) { + socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); + return; + } if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { cluster_.cluster_stats_.sess_rx_errors_.inc(); } diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index 4a437152acf9..28f037882cbc 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -178,6 +178,10 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, void onDatagramsDropped(uint32_t dropped) override { cluster_.cluster_stats_.sess_rx_datagrams_dropped_.add(dropped); } + size_t numPacketsExpectedPerEventLoop() const final { + // TODO(mattklein123) change this to a reasonable number if needed. + return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; + } ClusterInfo& cluster_; const bool use_original_src_ip_; diff --git a/source/server/active_udp_listener.h b/source/server/active_udp_listener.h index e122b65d2e42..08472bc853c5 100644 --- a/source/server/active_udp_listener.h +++ b/source/server/active_udp_listener.h @@ -8,6 +8,8 @@ #include "envoy/network/listen_socket.h" #include "envoy/network/listener.h" +#include "common/network/utility.h" + #include "server/active_listener_base.h" namespace Envoy { @@ -82,6 +84,10 @@ class ActiveRawUdpListener : public ActiveUdpListenerBase, void onWriteReady(const Network::Socket& socket) override; void onReceiveError(Api::IoError::IoErrorCode error_code) override; Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } + size_t numPacketsExpectedPerEventLoop() const final { + // TODO(mattklein123) change this to a reasonable number if needed. + return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; + } // Network::UdpWorker void onDataWorker(Network::UdpRecvData&& data) override; diff --git a/test/common/network/udp_fuzz.cc b/test/common/network/udp_fuzz.cc index 792f1f93286b..ca1d2d41c3be 100644 --- a/test/common/network/udp_fuzz.cc +++ b/test/common/network/udp_fuzz.cc @@ -44,6 +44,7 @@ class FuzzUdpListenerCallbacks : public Network::UdpListenerCallbacks { void onDatagramsDropped(uint32_t dropped) override; uint32_t workerIndex() const override; Network::UdpPacketWriter& udpPacketWriter() override; + size_t numPacketsExpectedPerEventLoop() const override; private: UdpFuzz* my_upf_; @@ -178,6 +179,10 @@ void FuzzUdpListenerCallbacks::onDatagramsDropped(uint32_t dropped) { UNREFERENCED_PARAMETER(dropped); } +size_t FuzzUdpListenerCallbacks::numPacketsExpectedPerEventLoop() const { + return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; +} + DEFINE_FUZZER(const uint8_t* buf, size_t len) { UdpFuzz udp_instance(buf, len); } } // namespace } // namespace Envoy diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index d200af1a0a17..1ac4dcb2936f 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -56,6 +56,8 @@ class UdpListenerImplTest : public UdpListenerImplTestBase { // Return the real version by default. ON_CALL(override_syscall_, supportsMmsg()) .WillByDefault(Return(os_calls.latched().supportsMmsg())); + ON_CALL(listener_callbacks_, numPacketsExpectedPerEventLoop()) + .WillByDefault(Return(MAX_NUM_PACKETS_PER_EVENT_LOOP)); // Set listening socket options. server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions()); @@ -63,6 +65,12 @@ class UdpListenerImplTest : public UdpListenerImplTestBase { if (Api::OsSysCallsSingleton::get().supportsUdpGro()) { server_socket_->addOptions(SocketOptionFactory::buildUdpGroOptions()); } + std::unique_ptr options = + std::make_unique(); + options->push_back(std::make_shared( + envoy::config::core::v3::SocketOption::STATE_BOUND, + ENVOY_MAKE_SOCKET_OPTION_NAME(SOL_SOCKET, SO_RCVBUF), 4 * 1024 * 1024)); + server_socket_->addOptions(std::move(options)); envoy::config::core::v3::UdpSocketConfig config; if (prefer_gro) { config.mutable_prefer_gro()->set_value(prefer_gro); @@ -71,11 +79,22 @@ class UdpListenerImplTest : public UdpListenerImplTestBase { std::make_unique(dispatcherImpl(), server_socket_, listener_callbacks_, dispatcherImpl().timeSource(), config); udp_packet_writer_ = std::make_unique(server_socket_->ioHandle()); + int get_recvbuf_size = 0; + socklen_t int_size = static_cast(sizeof(get_recvbuf_size)); + const Api::SysCallIntResult result2 = + server_socket_->getSocketOption(SOL_SOCKET, SO_RCVBUF, &get_recvbuf_size, &int_size); + EXPECT_EQ(0, result2.rc_); + // Kernel increases the buffer size to allow bookkeeping overhead. + if (get_recvbuf_size < 4 * 1024 * 1024) { + recvbuf_large_enough_ = false; + } + ON_CALL(listener_callbacks_, udpPacketWriter()).WillByDefault(ReturnRef(*udp_packet_writer_)); } NiceMock override_syscall_; TestThreadsafeSingletonInjector os_calls{&override_syscall_}; + bool recvbuf_large_enough_{true}; }; INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest, @@ -145,7 +164,7 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { dispatcher_->run(Event::Dispatcher::RunType::Block); } -// Test a large datagram that gets dropped using recvmmsg if supported. +// Test a large datagram that gets dropped using recvmsg or recvmmsg if supported. TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) { setup(); @@ -172,33 +191,75 @@ TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) { EXPECT_EQ(2, listener_->packetsDropped()); } -// Test a large datagram that gets dropped using recvmsg. -TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) { +TEST_P(UdpListenerImplTest, LimitNumberOfReadsPerLoop) { setup(); + if (!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit")) { + return; + } + const uint64_t num_packets_per_read = + Api::OsSysCallsSingleton::get().supportsMmsg() ? NUM_DATAGRAMS_PER_MMSG_RECEIVE : 1u; + + size_t num_packets_expected_per_loop{32u}; + // These packets should be read in more than 3 loops. + const std::string payload1(10, 'a'); + for (uint64_t i = 0; i < 2 * num_packets_expected_per_loop; ++i) { + client_.write(payload1, *send_to_addr_); + } + const std::string last_piece("bbb"); + client_.write(last_piece, *send_to_addr_); - ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false)); - - // This will get dropped. - const std::string first(4096, 'a'); - client_.write(first, *send_to_addr_); - const std::string second("second"); - client_.write(second, *send_to_addr_); - // This will get dropped. - const std::string third(4096, 'b'); - client_.write(third, *send_to_addr_); + EXPECT_CALL(listener_callbacks_, onReadReady()).Times(testing::AtLeast(3u)); + EXPECT_CALL(listener_callbacks_, numPacketsExpectedPerEventLoop()) + .WillRepeatedly(Return(num_packets_expected_per_loop)); + EXPECT_CALL(listener_callbacks_, onData(_)) + .WillRepeatedly(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, num_packets_per_read); + if (last_piece == data.buffer_->toString()) { + dispatcher_->exit(); + } + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + num_packets_received_by_listener_ = 0u; + num_packets_expected_per_loop = 0u; + std::string payload2(10, 'c'); + // This packet should be read. + client_.write(payload2, *send_to_addr_); EXPECT_CALL(listener_callbacks_, onReadReady()); - EXPECT_CALL(listener_callbacks_, onDatagramsDropped(_)).Times(AtLeast(1)); + EXPECT_CALL(listener_callbacks_, numPacketsExpectedPerEventLoop()) + .WillRepeatedly(Return(num_packets_expected_per_loop)); EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams( - data, Api::OsSysCallsSingleton::get().supportsMmsg() ? NUM_DATAGRAMS_PER_MMSG_RECEIVE : 1u); - EXPECT_EQ(data.buffer_->toString(), second); - + validateRecvCallbackParams(data, num_packets_per_read); + EXPECT_EQ(payload2, data.buffer_->toString()); dispatcher_->exit(); })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + if (!recvbuf_large_enough_) { + // If SO_RCVBUF failed to enlarge receive buffer to 4MB, the rest of test will likely to fail + // because packets may be easily dropped. Skip the rest of the test. + return; + } + num_packets_received_by_listener_ = 0u; + // Though the mocked callback wants to read more, only 6000 reads maximum are allowed. + num_packets_expected_per_loop = MAX_NUM_PACKETS_PER_EVENT_LOOP + 1u; + std::string payload3(10, 'd'); + for (uint64_t i = 0; i < num_packets_expected_per_loop; ++i) { + client_.write(payload3, *send_to_addr_); + } + std::string really_last_piece("eee"); + client_.write(really_last_piece, *send_to_addr_); + EXPECT_CALL(listener_callbacks_, onReadReady()).Times(testing::AtLeast(2u)); + EXPECT_CALL(listener_callbacks_, numPacketsExpectedPerEventLoop()) + .WillRepeatedly(Return(num_packets_expected_per_loop)); + EXPECT_CALL(listener_callbacks_, onData(_)) + .WillRepeatedly(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, num_packets_per_read); + if (really_last_piece == data.buffer_->toString()) { + dispatcher_->exit(); + } + })); dispatcher_->run(Event::Dispatcher::RunType::Block); - EXPECT_EQ(2, listener_->packetsDropped()); } #ifdef UDP_GRO @@ -380,13 +441,16 @@ TEST_P(UdpListenerImplTest, UdpListenerRecvMsgError) { EXPECT_CALL(listener_callbacks_, onReceiveError(_)) .WillOnce(Invoke([&](Api::IoError::IoErrorCode err) -> void { ASSERT_EQ(Api::IoError::IoErrorCode::NoSupport, err); - dispatcher_->exit(); })); // Inject mocked OsSysCalls implementation to mock a read failure. Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); - EXPECT_CALL(os_sys_calls, supportsMmsg()); + EXPECT_CALL(os_sys_calls, supportsMmsg()) + .Times( + (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit") + ? 2u + : 1u)); EXPECT_CALL(os_sys_calls, recvmsg(_, _, _)) .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_NOT_SUP})); diff --git a/test/common/quic/active_quic_listener_test.cc b/test/common/quic/active_quic_listener_test.cc index b9df504e88a4..ffb12a05b56b 100644 --- a/test/common/quic/active_quic_listener_test.cc +++ b/test/common/quic/active_quic_listener_test.cc @@ -267,6 +267,7 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { enabled: default_value: true runtime_key: quic.enabled + packets_to_read_to_connection_count_ratio: 50 )EOF", connection_window_size_, stream_window_size_); } @@ -323,13 +324,13 @@ TEST_P(ActiveQuicListenerTest, FailSocketOptionUponCreation) { auto options = std::make_shared>(); options->emplace_back(std::move(option)); quic_listener_.reset(); - EXPECT_THROW_WITH_REGEX( - (void)std::make_unique( - 0, 1, *dispatcher_, connection_handler_, listen_socket_, listener_config_, quic_config_, - options, false, - ActiveQuicListenerFactoryPeer::runtimeEnabled( - static_cast(listener_factory_.get()))), - Network::CreateListenerException, "Failed to apply socket options."); + EXPECT_THROW_WITH_REGEX((void)std::make_unique( + 0, 1, *dispatcher_, connection_handler_, listen_socket_, + listener_config_, quic_config_, options, false, + ActiveQuicListenerFactoryPeer::runtimeEnabled( + static_cast(listener_factory_.get())), + 32u), + Network::CreateListenerException, "Failed to apply socket options."); } TEST_P(ActiveQuicListenerTest, ReceiveCHLO) { @@ -342,6 +343,10 @@ TEST_P(ActiveQuicListenerTest, ReceiveCHLO) { dispatcher_->run(Event::Dispatcher::RunType::NonBlock); EXPECT_FALSE(buffered_packets->HasChlosBuffered()); EXPECT_NE(0u, quic_dispatcher_->NumSessions()); + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit")) { + EXPECT_EQ(50 * quic_dispatcher_->NumSessions(), + quic_listener_->numPacketsExpectedPerEventLoop()); + } const quic::QuicSession* session = quic::test::QuicDispatcherPeer::FindSession(quic_dispatcher_, connection_id); ASSERT(session != nullptr); @@ -389,7 +394,15 @@ TEST_P(ActiveQuicListenerTest, ProcessBufferedChlos) { quic::QuicBufferedPacketStore* const buffered_packets = quic::test::QuicDispatcherPeer::GetBufferedPackets(quic_dispatcher_); const uint32_t count = (ActiveQuicListener::kNumSessionsToCreatePerLoop * 2) + 1; - maybeConfigureMocks(count); + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit")) { + maybeConfigureMocks(count + 1); + // Create 1 session to increase number of packet to read in the next read event. + sendCHLO(quic::test::TestConnectionId()); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_NE(0u, quic_dispatcher_->NumSessions()); + } else { + maybeConfigureMocks(count); + } // Generate one more CHLO than can be processed immediately. for (size_t i = 1; i <= count; ++i) { @@ -407,6 +420,11 @@ TEST_P(ActiveQuicListenerTest, ProcessBufferedChlos) { } EXPECT_FALSE(buffered_packets->HasChlosBuffered()); EXPECT_NE(0u, quic_dispatcher_->NumSessions()); + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.udp_per_event_loop_read_limit")) { + EXPECT_EQ(50 * quic_dispatcher_->NumSessions(), + quic_listener_->numPacketsExpectedPerEventLoop()); + } + readFromClientSockets(); } diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index ddcb11b89157..c0f78a1b9b74 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -105,7 +105,11 @@ class UdpProxyFilterTest : public testing::Test { if (parent_.expect_gro_) { EXPECT_CALL(*socket_->io_handle_, supportsUdpGro()); } - EXPECT_CALL(*socket_->io_handle_, supportsMmsg()); + EXPECT_CALL(*socket_->io_handle_, supportsMmsg()) + .Times(Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.udp_per_event_loop_read_limit") + ? 2u + : 1u); // Return the datagram. EXPECT_CALL(*socket_->io_handle_, recvmsg(_, 1, _, _)) .WillOnce( @@ -136,9 +140,6 @@ class UdpProxyFilterTest : public testing::Test { } })); // Return an EAGAIN result. - if (parent_.expect_gro_) { - EXPECT_CALL(*socket_->io_handle_, supportsUdpGro()); - } EXPECT_CALL(*socket_->io_handle_, supportsMmsg()); EXPECT_CALL(*socket_->io_handle_, recvmsg(_, 1, _, _)) .WillOnce(Return(ByMove(Api::IoCallUint64Result( diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc index 63a976a738e0..1f80af00980d 100644 --- a/test/integration/multiplexed_integration_test.cc +++ b/test/integration/multiplexed_integration_test.cc @@ -80,8 +80,6 @@ TEST_P(Http2IntegrationTest, FlowControlOnAndGiantBodyWithContentLength) { } TEST_P(Http2IntegrationTest, LargeFlowControlOnAndGiantBodyWithContentLength) { - // https://github.com/envoyproxy/envoy/issues/16335 - EXCLUDE_DOWNSTREAM_HTTP3; config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); config_helper_.setBufferLimits(128 * 1024, 128 * 1024); // Set buffer limits upstream and downstream. diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index b8ab19fda6c9..aa3bcd471e03 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -1586,10 +1586,6 @@ TEST_P(DownstreamProtocolIntegrationTest, LargeRequestHeadersAccepted) { } TEST_P(DownstreamProtocolIntegrationTest, ManyLargeRequestHeadersAccepted) { - // Fail under TSAN. Quic blackhole detection fired and closed the connection with - // QUIC_TOO_MANY_RTOS while waiting for upstream finishing transferring the large header. Observed - // long event loop. - EXCLUDE_DOWNSTREAM_HTTP3; // Send 70 headers each of size 100 kB with limit 8192 kB (8 MB) and 100 headers. testLargeRequestHeaders(100, 70, 8192, 100, TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); } diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index 8f6ce0c4bc69..edcbeda8fa8e 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -40,10 +40,10 @@ #include "test/config/integration/certs/clientcert_hash.h" #include "extensions/transport_sockets/tls/context_config_impl.h" -#if (defined(__has_feature) && __has_feature(thread_sanitizer)) || defined(ENVOY_CONFIG_COVERAGE) -#define DISABLE_UNDER_TSAN_OR_COVERAGE return +#if defined(ENVOY_CONFIG_COVERAGE) +#define DISABLE_UNDER_COVERAGE return #else -#define DISABLE_UNDER_TSAN_OR_COVERAGE \ +#define DISABLE_UNDER_COVERAGE \ do { \ } while (0) #endif @@ -354,7 +354,7 @@ TEST_P(QuicHttpIntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { TEST_P(QuicHttpIntegrationTest, Retry) { testRetry(); } TEST_P(QuicHttpIntegrationTest, UpstreamReadDisabledOnGiantResponseBody) { - DISABLE_UNDER_TSAN_OR_COVERAGE; + DISABLE_UNDER_COVERAGE; config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); config_helper_.setBufferLimits(/*upstream_buffer_limit=*/1024, /*downstream_buffer_limit=*/1024); testRouterRequestAndResponseWithBody(/*request_size=*/512, /*response_size=*/10 * 1024 * 1024, @@ -363,7 +363,7 @@ TEST_P(QuicHttpIntegrationTest, UpstreamReadDisabledOnGiantResponseBody) { } TEST_P(QuicHttpIntegrationTest, DownstreamReadDisabledOnGiantPost) { - DISABLE_UNDER_TSAN_OR_COVERAGE; + DISABLE_UNDER_COVERAGE; config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); config_helper_.setBufferLimits(/*upstream_buffer_limit=*/1024, /*downstream_buffer_limit=*/1024); testRouterRequestAndResponseWithBody(/*request_size=*/10 * 1024 * 1024, /*response_size=*/1024, @@ -371,7 +371,7 @@ TEST_P(QuicHttpIntegrationTest, DownstreamReadDisabledOnGiantPost) { } TEST_P(QuicHttpIntegrationTest, LargeFlowControlOnAndGiantBody) { - DISABLE_UNDER_TSAN_OR_COVERAGE; + DISABLE_UNDER_COVERAGE; config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); config_helper_.setBufferLimits(/*upstream_buffer_limit=*/128 * 1024, /*downstream_buffer_limit=*/128 * 1024); diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index c102194d4016..9b68638ea21a 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -153,6 +153,7 @@ class MockUdpListenerCallbacks : public UdpListenerCallbacks { MOCK_METHOD(uint32_t, workerIndex, (), (const)); MOCK_METHOD(void, onDataWorker, (Network::UdpRecvData && data)); MOCK_METHOD(void, post, (Network::UdpRecvData && data)); + MOCK_METHOD(size_t, numPacketsExpectedPerEventLoop, (), (const)); }; class MockDrainDecision : public DrainDecision { diff --git a/test/test_common/network_utility.cc b/test/test_common/network_utility.cc index 1e1eeba9f1c1..7e413e3ac5ce 100644 --- a/test/test_common/network_utility.cc +++ b/test/test_common/network_utility.cc @@ -211,6 +211,9 @@ struct SyncPacketProcessor : public Network::UdpPacketProcessor { } uint64_t maxDatagramSize() const override { return max_rx_datagram_size_; } void onDatagramsDropped(uint32_t) override {} + size_t numPacketsExpectedPerEventLoop() const override { + return Network::MAX_NUM_PACKETS_PER_EVENT_LOOP; + } std::list& data_; const uint64_t max_rx_datagram_size_; diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 44481c60f7af..6034a70da15a 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -34,6 +34,7 @@ HEXDIG HEXDIGIT OWS Preconnecting +RCVBUF RTCP RTP STATNAME