diff --git a/CODEOWNERS b/CODEOWNERS index 49ba44cebb84..2fc70c0398b7 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -183,6 +183,8 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp /*/extensions/filters/common/ext_authz @esmet @gsagula @dio /*/extensions/filters/http/ext_authz @esmet @gsagula @dio /*/extensions/filters/network/ext_authz @esmet @gsagula @dio +# HTTP Bandwidth Limit +/*/extensions/filters/http/bandwidth_limit @nitgoy @mattklein123 @yanavlasov @tonya11en # Original IP detection /*/extensions/http/original_ip_detection/custom_header @rgs1 @alyssawilk @antoniovicente /*/extensions/http/original_ip_detection/xff @rgs1 @alyssawilk @antoniovicente diff --git a/api/BUILD b/api/BUILD index 136e248fbf15..e051250b8075 100644 --- a/api/BUILD +++ b/api/BUILD @@ -177,6 +177,7 @@ proto_library( "//envoy/extensions/filters/http/admission_control/v3alpha:pkg", "//envoy/extensions/filters/http/aws_lambda/v3:pkg", "//envoy/extensions/filters/http/aws_request_signing/v3:pkg", + "//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg", "//envoy/extensions/filters/http/buffer/v3:pkg", "//envoy/extensions/filters/http/cache/v3alpha:pkg", "//envoy/extensions/filters/http/cdn_loop/v3alpha:pkg", diff --git a/api/STYLE.md b/api/STYLE.md index 18d96fd4ae47..d73e17b773b2 100644 --- a/api/STYLE.md +++ b/api/STYLE.md @@ -113,11 +113,11 @@ organization](#package-organization) above. To add an extension config to the API, the steps below should be followed: 1. If this is still WiP and subject to breaking changes, use `vNalpha` instead of `vN` in steps - below. Refer to the [Cache filter config](envoy/extensions/filter/http/cache/v3alpha/cache.proto) + below. Refer to the [Cache filter config](envoy/extensions/filters/http/cache/v3alpha/cache.proto) as an example of `v3alpha`, and the - [Buffer filter config](envoy/extensions/filter/http/buffer/v3/buffer.proto) as an example of `v3`. + [Buffer filter config](envoy/extensions/filters/http/buffer/v3/buffer.proto) as an example of `v3`. 1. Place the v3 extension configuration `.proto` in `api/envoy/extensions`, e.g. - `api/envoy/extensions/filter/http/foobar/v3/foobar.proto` together with an initial BUILD file: + `api/envoy/extensions/filters/http/foobar/v3/foobar.proto` together with an initial BUILD file: ```bazel load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") diff --git a/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD new file mode 100644 index 000000000000..1c1a6f6b4423 --- /dev/null +++ b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto new file mode 100644 index 000000000000..4cd5f8268b70 --- /dev/null +++ b/api/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto @@ -0,0 +1,70 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.bandwidth_limit.v3alpha; + +import "envoy/config/core/v3/base.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.bandwidth_limit.v3alpha"; +option java_outer_classname = "BandwidthLimitProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Bandwidth limit] +// Bandwidth limit :ref:`configuration overview `. +// [#extension: envoy.filters.http.bandwidth_limit] + +// [#next-free-field: 6] +message BandwidthLimit { + // Defines the mode for the bandwidth limit filter. + // Values represent bitmask. + enum EnableMode { + // Filter is disabled. + DISABLED = 0; + + // Filter enabled only for incoming traffic. + REQUEST = 1; + + // Filter enabled only for outgoing traffic. + RESPONSE = 2; + + // Filter enabled for both incoming and outgoing traffic. + REQUEST_AND_RESPONSE = 3; + } + + // The human readable prefix to use when emitting stats. + string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; + + // The enable mode for the bandwidth limit filter. + // Default is Disabled. + EnableMode enable_mode = 2 [(validate.rules).enum = {defined_only: true}]; + + // The limit supplied in KiB/s. + // + // .. note:: + // It's fine for the limit to be unset for the global configuration since the bandwidth limit + // can be applied at a the virtual host or route level. Thus, the limit must be set for the + // per route configuration otherwise the config will be rejected. + // + // .. note:: + // When using per route configuration, the limit becomes unique to that route. + // + google.protobuf.UInt64Value limit_kbps = 3 [(validate.rules).uint64 = {gte: 1}]; + + // Optional Fill interval in milliseconds for the token refills. Defaults to 50ms. + // It must be at least 20ms to avoid too aggressive refills. + google.protobuf.Duration fill_interval = 4 [(validate.rules).duration = { + lte {seconds: 1} + gte {nanos: 20000000} + }]; + + // Runtime flag that controls whether the filter is enabled or not. If not specified, defaults + // to enabled. + config.core.v3.RuntimeFeatureFlag runtime_enabled = 5; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 0703d9ee7def..90f48a6b33fe 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -60,6 +60,7 @@ proto_library( "//envoy/extensions/filters/http/admission_control/v3alpha:pkg", "//envoy/extensions/filters/http/aws_lambda/v3:pkg", "//envoy/extensions/filters/http/aws_request_signing/v3:pkg", + "//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg", "//envoy/extensions/filters/http/buffer/v3:pkg", "//envoy/extensions/filters/http/cache/v3alpha:pkg", "//envoy/extensions/filters/http/cdn_loop/v3alpha:pkg", diff --git a/docs/root/api-v3/config/filter/http/http.rst b/docs/root/api-v3/config/filter/http/http.rst index 20f2c75664db..861a920b5a8e 100644 --- a/docs/root/api-v3/config/filter/http/http.rst +++ b/docs/root/api-v3/config/filter/http/http.rst @@ -6,4 +6,4 @@ HTTP filters :maxdepth: 2 */empty/* - ../../../extensions/filters/http/*/v3/* + ../../../extensions/filters/http/*/v3*/* diff --git a/docs/root/configuration/http/http_filters/_include/bandwidth-limit-filter.yaml b/docs/root/configuration/http/http_filters/_include/bandwidth-limit-filter.yaml new file mode 100644 index 000000000000..9e86c1e6663b --- /dev/null +++ b/docs/root/configuration/http/http_filters/_include/bandwidth-limit-filter.yaml @@ -0,0 +1,65 @@ +static_resources: + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 8000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + codec_type: AUTO + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: ["*"] + routes: + - match: { prefix: "/path/with/bandwidth/limit" } + route: { cluster: service_protected_by_bandwidth_limit } + typed_per_filter_config: + envoy.filters.http.bandwidth_limit: + "@type": type.googleapis.com/envoy.extensions.filters.http.bandwidth_limit.v3alpha.BandwidthLimit + stat_prefix: bandwidth_limiter_custom_route + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 500 + fill_interval: 0.1s + - match: { prefix: "/" } + route: { cluster: web_service } + http_filters: + - name: envoy.filters.http.bandwidth_limit + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.bandwidth_limit.v3alpha.BandwidthLimit + stat_prefix: bandwidth_limiter_default + - name: envoy.filters.http.router + typed_config: {} + + clusters: + - name: service_protected_by_bandwidth_limit + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: service1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: web_service + port_value: 9000 + - name: web_service + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: service1 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: web_service + port_value: 9000 diff --git a/docs/root/configuration/http/http_filters/bandwidth_limit_filter.rst b/docs/root/configuration/http/http_filters/bandwidth_limit_filter.rst new file mode 100644 index 000000000000..4576e9d3ac36 --- /dev/null +++ b/docs/root/configuration/http/http_filters/bandwidth_limit_filter.rst @@ -0,0 +1,64 @@ +.. _config_http_filters_bandwidth_limit: + +Bandwidth limit +==================== + +* Bandwidth limiting :ref:`architecture overview ` +* :ref:`v3 API reference ` +* This filter should be configured with the name ``envoy.filters.http.bandwidth_limit``. + +The HTTP Bandwidth limit filter limits the size of data flow to the max bandwidth set in the ``limit_kbps`` +when the request's route, virtual host or filter chain has a +:ref:`bandwidth limit configuration `. + +If the bandwidth limit has been exhausted the filter stops further transfer until more bandwidth gets allocated +according to the ``fill_interval`` (default is 50 milliseconds). If the connection buffer fills up with accumulated +data then the source of data will have ``readDisable(true)`` set as described in the :repo:`flow control doc`. + +.. note:: + The token bucket is shared across all workers, thus the limits are applied per Envoy process. + +Example configuration +--------------------- + +Example filter configuration for a globally disabled bandwidth limiter but enabled for a specific route: + +.. literalinclude:: _include/bandwidth-limit-filter.yaml + :language: yaml + :lines: 11-53 + :emphasize-lines: 9-25 + :caption: :download:`bandwidth-limit-filter.yaml <_include/bandwidth-limit-filter.yaml>` + +Note that if this filter is configured as globally disabled and there are no virtual host or route level +token buckets, no bandwidth limiting will be applied. + +Statistics +---------- + +The HTTP bandwidth limit filter outputs statistics in the ``.http_bandwidth_limit.`` namespace. + +.. csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + request_enabled, Counter, Total number of request streams for which the bandwidth limiter was consulted + request_pending, GAUGE, Number of request streams which are currently pending transfer in bandwidth limiter + request_incoming_size, GAUGE, Size in bytes of incoming request data to bandwidth limiter + request_allowed_size, GAUGE, Size in bytes of outgoing request data from bandwidth limiter + request_transfer_duration, HISTOGRAM, Total time (including added delay) it took for the request stream transfer + response_enabled, Counter, Total number of response streams for which the bandwidth limiter was consulted + response_pending, GAUGE, Number of response streams which are currently pending transfer in bandwidth limiter + response_incoming_size, GAUGE, Size in bytes of incoming response data to bandwidth limiter + response_allowed_size, GAUGE, Size in bytes of outgoing response data from bandwidth limiter + response_transfer_duration, HISTOGRAM, Total time (including added delay) it took for the response stream transfer + +.. _config_http_filters_bandwidth_limit_runtime: + +Runtime +------- + +The HTTP bandwidth limit filter supports the following runtime settings: + +The bandwidth limit filter can be runtime feature flagged via the :ref:`enabled +` +configuration field. diff --git a/docs/root/configuration/http/http_filters/http_filters.rst b/docs/root/configuration/http/http_filters/http_filters.rst index 0899edd07c1e..b983afd3fd17 100644 --- a/docs/root/configuration/http/http_filters/http_filters.rst +++ b/docs/root/configuration/http/http_filters/http_filters.rst @@ -10,6 +10,7 @@ HTTP filters admission_control_filter aws_lambda_filter aws_request_signing_filter + bandwidth_limit_filter buffer_filter cdn_loop_filter compressor_filter diff --git a/docs/root/intro/arch_overview/other_features/bandwidth_limiting.rst b/docs/root/intro/arch_overview/other_features/bandwidth_limiting.rst new file mode 100644 index 000000000000..146ce50cc863 --- /dev/null +++ b/docs/root/intro/arch_overview/other_features/bandwidth_limiting.rst @@ -0,0 +1,9 @@ +.. _arch_overview_bandwidth_limit: + +Bandwidth limiting +=================== + +Envoy supports local (non-distributed) bandwidth limiting of HTTP requests and response via the +:ref:`HTTP bandwidth limit filter `. This can be activated +globally at the listener level or at a more specific level (e.g.: the virtual host or route level). + diff --git a/docs/root/intro/arch_overview/other_features/other_features.rst b/docs/root/intro/arch_overview/other_features/other_features.rst index 2e2a2c054b74..be43344957d6 100644 --- a/docs/root/intro/arch_overview/other_features/other_features.rst +++ b/docs/root/intro/arch_overview/other_features/other_features.rst @@ -6,6 +6,7 @@ Other features local_rate_limiting global_rate_limiting + bandwidth_limiting scripting ip_transparency compression/libraries diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index cc29eecdeb2d..9d57fe3f19e5 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -62,7 +62,8 @@ Removed Config or Runtime New Features ------------ -* crash support: restore crash context when continuing to processing requests or responses as a result of an asynchronous callback that invokes a filter directly. This is unlike the call stacks that go through the various network layers, to eventually reach the filter. For a concrete example see: ``Envoy::Extensions::HttpFilters::Cache::CacheFilter::getHeaders`` which posts a callback on the dispatcher that will invoke the filter directly. +* bandwidth_limit: added new :ref:`HTTP bandwidth limit filter `. +* crash support: restore crash context when continuing to processing requests or responses as a result of an asynchronous callback that invokes a filter directly. This is unlike the call stacks that go through the various network layers, to eventually reach the filter. For a concrete example see: ``Envoy::Extensions::HttpFilters::Cache::CacheFilter::getHeaders`` which posts a callback on the dispatcher that will invoke the filter directly. * http: a new field `is_optional` is added to `extensions.filters.network.http_connection_manager.v3.HttpFilter`. When value is `true`, the unsupported http filter will be ignored by envoy. This is also same with unsupported http filter in the typed per filter config. For more information, please reference diff --git a/generated_api_shadow/BUILD b/generated_api_shadow/BUILD index 136e248fbf15..e051250b8075 100644 --- a/generated_api_shadow/BUILD +++ b/generated_api_shadow/BUILD @@ -177,6 +177,7 @@ proto_library( "//envoy/extensions/filters/http/admission_control/v3alpha:pkg", "//envoy/extensions/filters/http/aws_lambda/v3:pkg", "//envoy/extensions/filters/http/aws_request_signing/v3:pkg", + "//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg", "//envoy/extensions/filters/http/buffer/v3:pkg", "//envoy/extensions/filters/http/cache/v3alpha:pkg", "//envoy/extensions/filters/http/cdn_loop/v3alpha:pkg", diff --git a/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD new file mode 100644 index 000000000000..1c1a6f6b4423 --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto new file mode 100644 index 000000000000..4cd5f8268b70 --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.proto @@ -0,0 +1,70 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.bandwidth_limit.v3alpha; + +import "envoy/config/core/v3/base.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.bandwidth_limit.v3alpha"; +option java_outer_classname = "BandwidthLimitProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Bandwidth limit] +// Bandwidth limit :ref:`configuration overview `. +// [#extension: envoy.filters.http.bandwidth_limit] + +// [#next-free-field: 6] +message BandwidthLimit { + // Defines the mode for the bandwidth limit filter. + // Values represent bitmask. + enum EnableMode { + // Filter is disabled. + DISABLED = 0; + + // Filter enabled only for incoming traffic. + REQUEST = 1; + + // Filter enabled only for outgoing traffic. + RESPONSE = 2; + + // Filter enabled for both incoming and outgoing traffic. + REQUEST_AND_RESPONSE = 3; + } + + // The human readable prefix to use when emitting stats. + string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; + + // The enable mode for the bandwidth limit filter. + // Default is Disabled. + EnableMode enable_mode = 2 [(validate.rules).enum = {defined_only: true}]; + + // The limit supplied in KiB/s. + // + // .. note:: + // It's fine for the limit to be unset for the global configuration since the bandwidth limit + // can be applied at a the virtual host or route level. Thus, the limit must be set for the + // per route configuration otherwise the config will be rejected. + // + // .. note:: + // When using per route configuration, the limit becomes unique to that route. + // + google.protobuf.UInt64Value limit_kbps = 3 [(validate.rules).uint64 = {gte: 1}]; + + // Optional Fill interval in milliseconds for the token refills. Defaults to 50ms. + // It must be at least 20ms to avoid too aggressive refills. + google.protobuf.Duration fill_interval = 4 [(validate.rules).duration = { + lte {seconds: 1} + gte {nanos: 20000000} + }]; + + // Runtime flag that controls whether the filter is enabled or not. If not specified, defaults + // to enabled. + config.core.v3.RuntimeFeatureFlag runtime_enabled = 5; +} diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 6f2b772c4660..d64a97928582 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -67,6 +67,7 @@ EXTENSIONS = { "envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:config", "envoy.filters.http.aws_lambda": "//source/extensions/filters/http/aws_lambda:config", "envoy.filters.http.aws_request_signing": "//source/extensions/filters/http/aws_request_signing:config", + "envoy.filters.http.bandwidth_limit": "//source/extensions/filters/http/bandwidth_limit:config", "envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config", "envoy.filters.http.cache": "//source/extensions/filters/http/cache:config", "envoy.filters.http.cdn_loop": "//source/extensions/filters/http/cdn_loop:config", diff --git a/source/extensions/filters/http/bandwidth_limit/BUILD b/source/extensions/filters/http/bandwidth_limit/BUILD new file mode 100644 index 000000000000..437a2c679747 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/BUILD @@ -0,0 +1,48 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +# Local Bandwidthlimit HTTP L7 filter +# Public docs: docs/root/configuration/http_filters/bandwidth_limit_filter.rst + +envoy_extension_package() + +envoy_cc_library( + name = "bandwidth_limit_lib", + srcs = ["bandwidth_limit.cc"], + hdrs = ["bandwidth_limit.h"], + deps = [ + "//include/envoy/http:codes_interface", + "//include/envoy/server:filter_config_interface", + "//include/envoy/stats:stats_macros", + "//source/common/common:shared_token_bucket_impl_lib", + "//source/common/common:utility_lib", + "//source/common/http:header_utility_lib", + "//source/common/http:headers_lib", + "//source/common/router:header_parser_lib", + "//source/common/runtime:runtime_lib", + "//source/common/stats:timespan_lib", + "//source/extensions/filters/http/common:stream_rate_limiter_lib", + "@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + category = "envoy.filters.http", + security_posture = "unknown", + deps = [ + ":bandwidth_limit_lib", + "//include/envoy/http:filter_interface", + "//source/common/protobuf:utility_lib", + "//source/extensions/filters/http/common:factory_base_lib", + "@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc new file mode 100644 index 000000000000..23735fe9b166 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.cc @@ -0,0 +1,202 @@ +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +#include +#include + +#include "envoy/http/codes.h" + +#include "common/http/utility.h" +#include "common/stats/timespan_impl.h" + +using envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit; +using Envoy::Extensions::HttpFilters::Common::StreamRateLimiter; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +FilterConfig::FilterConfig(const BandwidthLimit& config, Stats::Scope& scope, + Runtime::Loader& runtime, TimeSource& time_source, bool per_route) + : runtime_(runtime), time_source_(time_source), enable_mode_(config.enable_mode()), + limit_kbps_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, limit_kbps, 0)), + fill_interval_(std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT( + config, fill_interval, StreamRateLimiter::DefaultFillInterval.count()))), + enabled_(config.runtime_enabled(), runtime), + stats_(generateStats(config.stat_prefix(), scope)) { + if (per_route && !config.has_limit_kbps()) { + throw EnvoyException("bandwidthlimitfilter: limit must be set for per route filter config"); + } + + // The token bucket is configured with a max token count of the number of + // bytes per second, and refills at the same rate, so that we have a per + // second limit which refills gradually in 1/fill_interval increments. + token_bucket_ = std::make_shared( + StreamRateLimiter::kiloBytesToBytes(limit_kbps_), time_source, + StreamRateLimiter::kiloBytesToBytes(limit_kbps_)); +} + +BandwidthLimitStats FilterConfig::generateStats(const std::string& prefix, Stats::Scope& scope) { + const std::string final_prefix = prefix + ".http_bandwidth_limit"; + return {ALL_BANDWIDTH_LIMIT_STATS(POOL_COUNTER_PREFIX(scope, final_prefix), + POOL_GAUGE_PREFIX(scope, final_prefix), + POOL_HISTOGRAM_PREFIX(scope, final_prefix))}; +} + +// BandwidthLimiter members + +Http::FilterHeadersStatus BandwidthLimiter::decodeHeaders(Http::RequestHeaderMap&, bool) { + const auto& config = getConfig(); + + if (config.enabled() && (config.enableMode() & BandwidthLimit::REQUEST)) { + config.stats().request_enabled_.inc(); + request_limiter_ = std::make_unique( + config.limit(), decoder_callbacks_->decoderBufferLimit(), + [this] { decoder_callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); }, + [this] { decoder_callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); }, + [this](Buffer::Instance& data, bool end_stream) { + if (end_stream) { + updateStatsOnDecodeFinish(); + } + decoder_callbacks_->injectDecodedDataToFilterChain(data, end_stream); + }, + [this] { + updateStatsOnDecodeFinish(); + decoder_callbacks_->continueDecoding(); + }, + [config](uint64_t len) { config.stats().request_allowed_size_.set(len); }, + const_cast(&config)->timeSource(), decoder_callbacks_->dispatcher(), + decoder_callbacks_->scope(), config.tokenBucket(), config.fillInterval()); + } + + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus BandwidthLimiter::decodeData(Buffer::Instance& data, bool end_stream) { + if (request_limiter_ != nullptr) { + const auto& config = getConfig(); + + if (!request_latency_) { + request_latency_ = std::make_unique( + config.stats().request_transfer_duration_, + const_cast(&config)->timeSource()); + config.stats().request_pending_.inc(); + } + config.stats().request_incoming_size_.set(data.length()); + + request_limiter_->writeData(data, end_stream); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + ENVOY_LOG(debug, "BandwidthLimiter : request_limiter not set."); + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus BandwidthLimiter::decodeTrailers(Http::RequestTrailerMap&) { + if (request_limiter_ != nullptr) { + if (request_limiter_->onTrailers()) { + return Http::FilterTrailersStatus::StopIteration; + } else { + updateStatsOnDecodeFinish(); + return Http::FilterTrailersStatus::Continue; + } + } + return Http::FilterTrailersStatus::Continue; +} + +Http::FilterHeadersStatus BandwidthLimiter::encodeHeaders(Http::ResponseHeaderMap&, bool) { + auto& config = getConfig(); + + if (config.enabled() && (config.enableMode() & BandwidthLimit::RESPONSE)) { + config.stats().response_enabled_.inc(); + + response_limiter_ = std::make_unique( + config.limit(), encoder_callbacks_->encoderBufferLimit(), + [this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); }, + [this] { encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); }, + [this](Buffer::Instance& data, bool end_stream) { + if (end_stream) { + updateStatsOnEncodeFinish(); + } + encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream); + }, + [this] { + updateStatsOnEncodeFinish(); + encoder_callbacks_->continueEncoding(); + }, + [config](uint64_t len) { config.stats().response_allowed_size_.set(len); }, + const_cast(&config)->timeSource(), encoder_callbacks_->dispatcher(), + encoder_callbacks_->scope(), config.tokenBucket(), config.fillInterval()); + } + + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus BandwidthLimiter::encodeData(Buffer::Instance& data, bool end_stream) { + if (response_limiter_ != nullptr) { + const auto& config = getConfig(); + + if (!response_latency_) { + response_latency_ = std::make_unique( + config.stats().response_transfer_duration_, + const_cast(&config)->timeSource()); + config.stats().response_pending_.inc(); + } + config.stats().response_incoming_size_.set(data.length()); + + response_limiter_->writeData(data, end_stream); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + ENVOY_LOG(debug, "BandwidthLimiter : response_limiter not set"); + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus BandwidthLimiter::encodeTrailers(Http::ResponseTrailerMap&) { + if (response_limiter_ != nullptr) { + if (response_limiter_->onTrailers()) { + return Http::FilterTrailersStatus::StopIteration; + } else { + updateStatsOnEncodeFinish(); + return Http::FilterTrailersStatus::Continue; + } + } + return Http::FilterTrailersStatus::Continue; +} + +void BandwidthLimiter::updateStatsOnDecodeFinish() { + if (request_latency_) { + request_latency_->complete(); + request_latency_.reset(); + getConfig().stats().request_pending_.dec(); + } +} + +void BandwidthLimiter::updateStatsOnEncodeFinish() { + if (response_latency_) { + response_latency_->complete(); + response_latency_.reset(); + getConfig().stats().response_pending_.dec(); + } +} + +const FilterConfig& BandwidthLimiter::getConfig() const { + const auto* config = Http::Utility::resolveMostSpecificPerFilterConfig( + "envoy.filters.http.bandwidth_limit", decoder_callbacks_->route()); + if (config) { + return *config; + } + return *config_; +} + +void BandwidthLimiter::onDestroy() { + if (request_limiter_ != nullptr) { + request_limiter_->destroy(); + } + if (response_limiter_ != nullptr) { + response_limiter_->destroy(); + } +} + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h new file mode 100644 index 000000000000..d3b44e75380a --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/bandwidth_limit.h @@ -0,0 +1,151 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" +#include "envoy/http/filter.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats_macros.h" +#include "envoy/stats/timespan.h" + +#include "common/common/assert.h" +#include "common/common/shared_token_bucket_impl.h" +#include "common/http/header_map_impl.h" +#include "common/router/header_parser.h" +#include "common/runtime/runtime_protos.h" + +#include "extensions/filters/http/common/stream_rate_limiter.h" + +#include "absl/synchronization/mutex.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +/** + * All bandwidth limit stats. @see stats_macros.h + */ +#define ALL_BANDWIDTH_LIMIT_STATS(COUNTER, GAUGE, HISTOGRAM) \ + COUNTER(request_enabled) \ + COUNTER(response_enabled) \ + GAUGE(request_pending, Accumulate) \ + GAUGE(response_pending, Accumulate) \ + GAUGE(request_incoming_size, Accumulate) \ + GAUGE(response_incoming_size, Accumulate) \ + GAUGE(request_allowed_size, Accumulate) \ + GAUGE(response_allowed_size, Accumulate) \ + HISTOGRAM(request_transfer_duration, Milliseconds) \ + HISTOGRAM(response_transfer_duration, Milliseconds) + +/** + * Struct definition for all bandwidth limit stats. @see stats_macros.h + */ +struct BandwidthLimitStats { + ALL_BANDWIDTH_LIMIT_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, + GENERATE_HISTOGRAM_STRUCT) +}; + +/** + * Configuration for the HTTP bandwidth limit filter. + */ +class FilterConfig : public ::Envoy::Router::RouteSpecificFilterConfig { +public: + using EnableMode = + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit_EnableMode; + + FilterConfig( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& config, + Stats::Scope& scope, Runtime::Loader& runtime, TimeSource& time_source, + bool per_route = false); + ~FilterConfig() override = default; + Runtime::Loader& runtime() { return runtime_; } + BandwidthLimitStats& stats() const { return stats_; } + TimeSource& timeSource() { return time_source_; } + // Must call enabled() before calling limit(). + uint64_t limit() const { return limit_kbps_; } + bool enabled() const { return enabled_.enabled(); } + EnableMode enableMode() const { return enable_mode_; }; + const std::shared_ptr tokenBucket() const { return token_bucket_; } + std::chrono::milliseconds fillInterval() const { return fill_interval_; } + +private: + friend class FilterTest; + + static BandwidthLimitStats generateStats(const std::string& prefix, Stats::Scope& scope); + + Runtime::Loader& runtime_; + TimeSource& time_source_; + const EnableMode enable_mode_; + const uint64_t limit_kbps_; + const std::chrono::milliseconds fill_interval_; + const Runtime::FeatureFlag enabled_; + mutable BandwidthLimitStats stats_; + // Filter chain's shared token bucket + std::shared_ptr token_bucket_; +}; + +using FilterConfigSharedPtr = std::shared_ptr; + +/** + * HTTP bandwidth limit filter. Depending on the route configuration, this + * filter calls consults with local token bucket before allowing further filter + * iteration. + */ +class BandwidthLimiter : public Http::StreamFilter, Logger::Loggable { +public: + BandwidthLimiter(FilterConfigSharedPtr config) : config_(config) {} + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override; + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; + + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { + decoder_callbacks_ = &callbacks; + } + + // Http::StreamEncoderFilter + Http::FilterHeadersStatus encode100ContinueHeaders(Http::ResponseHeaderMap&) override { + return Http::FilterHeadersStatus::Continue; + } + + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap&, bool) override; + Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override; + + Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap&) override { + return Http::FilterMetadataStatus::Continue; + } + + void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override { + encoder_callbacks_ = &callbacks; + } + + // Http::StreamFilterBase + void onDestroy() override; + +private: + friend class FilterTest; + const FilterConfig& getConfig() const; + + void updateStatsOnDecodeFinish(); + void updateStatsOnEncodeFinish(); + + Http::StreamDecoderFilterCallbacks* decoder_callbacks_{}; + Http::StreamEncoderFilterCallbacks* encoder_callbacks_{}; + FilterConfigSharedPtr config_; + std::unique_ptr request_limiter_; + std::unique_ptr response_limiter_; + Stats::TimespanPtr request_latency_; + Stats::TimespanPtr response_latency_; +}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/config.cc b/source/extensions/filters/http/bandwidth_limit/config.cc new file mode 100644 index 000000000000..2ed2118fea45 --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/config.cc @@ -0,0 +1,43 @@ +#include "extensions/filters/http/bandwidth_limit/config.h" + +#include + +#include "envoy/registry/registry.h" + +#include "common/protobuf/utility.h" + +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +Http::FilterFactoryCb BandwidthLimitFilterConfig::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& proto_config, + const std::string&, Server::Configuration::FactoryContext& context) { + FilterConfigSharedPtr filter_config = std::make_shared( + proto_config, context.scope(), context.runtime(), context.timeSource()); + return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared(filter_config)); + }; +} + +Router::RouteSpecificFilterConfigConstSharedPtr +BandwidthLimitFilterConfig::createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& proto_config, + Server::Configuration::ServerFactoryContext& context, ProtobufMessage::ValidationVisitor&) { + return std::make_shared(proto_config, context.scope(), context.runtime(), + context.timeSource(), true); +} + +/** + * Static registration for the bandwidth limit filter. @see RegisterFactory. + */ +REGISTER_FACTORY(BandwidthLimitFilterConfig, + Server::Configuration::NamedHttpFilterConfigFactory){"envoy.bandwidth_limit"}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/bandwidth_limit/config.h b/source/extensions/filters/http/bandwidth_limit/config.h new file mode 100644 index 000000000000..e9945bbf745d --- /dev/null +++ b/source/extensions/filters/http/bandwidth_limit/config.h @@ -0,0 +1,37 @@ +#pragma once + +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.validate.h" + +#include "extensions/filters/http/common/factory_base.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +/** + * Config registration for the bandwidth limit filter. @see NamedHttpFilterConfigFactory. + */ +class BandwidthLimitFilterConfig + : public Common::FactoryBase< + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit> { +public: + BandwidthLimitFilterConfig() : FactoryBase("envoy.filters.http.bandwidth_limit") {} + +private: + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& + proto_config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; + + Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit& + proto_config, + Server::Configuration::ServerFactoryContext&, ProtobufMessage::ValidationVisitor&) override; +}; + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/well_known_names.h b/source/extensions/filters/http/well_known_names.h index d1904f868673..609e13f5bb49 100644 --- a/source/extensions/filters/http/well_known_names.h +++ b/source/extensions/filters/http/well_known_names.h @@ -14,6 +14,8 @@ class HttpFilterNameValues { public: // Buffer filter const std::string Buffer = "envoy.filters.http.buffer"; + // Bandwidth limit filter + const std::string BandwidthLimit = "envoy.filters.http.bandwidth_limit"; // Cache filter const std::string Cache = "envoy.filters.http.cache"; // CDN Loop filter diff --git a/test/extensions/filters/http/bandwidth_limit/BUILD b/test/extensions/filters/http/bandwidth_limit/BUILD new file mode 100644 index 000000000000..ec46a1a91780 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/BUILD @@ -0,0 +1,38 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "filter_test", + srcs = ["filter_test.cc"], + extension_name = "envoy.filters.http.bandwidth_limit", + deps = [ + "//source/common/common:utility_lib", + "//source/common/http:header_utility_lib", + "//source/common/http:headers_lib", + "//source/common/router:header_parser_lib", + "//source/common/runtime:runtime_lib", + "//source/extensions/filters/http/bandwidth_limit:bandwidth_limit_lib", + "//test/mocks/server:server_mocks", + "@envoy_api//envoy/extensions/filters/http/bandwidth_limit/v3alpha:pkg_cc_proto", + ], +) + +envoy_extension_cc_test( + name = "config_test", + srcs = ["config_test.cc"], + extension_name = "envoy.filters.http.bandwidth_limit", + deps = [ + "//source/extensions/filters/http/bandwidth_limit:config", + "//test/mocks/server:server_mocks", + ], +) diff --git a/test/extensions/filters/http/bandwidth_limit/config_test.cc b/test/extensions/filters/http/bandwidth_limit/config_test.cc new file mode 100644 index 000000000000..29675d090fa9 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/config_test.cc @@ -0,0 +1,119 @@ +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" +#include "extensions/filters/http/bandwidth_limit/config.h" + +#include "test/mocks/server/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +using EnableMode = + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit_EnableMode; + +TEST(Factory, GlobalEmptyConfig) { + const std::string yaml = R"( + stat_prefix: test + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + auto callback = factory.createFilterFactoryFromProto(*proto_config, "stats", context); + Http::MockFilterChainFactoryCallbacks filter_callback; + EXPECT_CALL(filter_callback, addStreamFilter(_)); + callback(filter_callback); +} + +TEST(Factory, RouteSpecificFilterConfig) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 10 + fill_interval: 0.1s + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fillInterval().count(), 100); + EXPECT_EQ(config->enableMode(), EnableMode::BandwidthLimit_EnableMode_REQUEST_AND_RESPONSE); + EXPECT_FALSE(config->tokenBucket() == nullptr); +} + +TEST(Factory, RouteSpecificFilterConfigDisabledByDefault) { + const std::string config_yaml = R"( + stat_prefix: test + limit_kbps: 10 + fill_interval: 0.1s + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->enableMode(), EnableMode::BandwidthLimit_EnableMode_DISABLED); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fillInterval().count(), 100); +} + +TEST(Factory, RouteSpecificFilterConfigDefaultFillInterval) { + const std::string config_yaml = R"( + stat_prefix: test + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 10 + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_CALL(context.dispatcher_, createTimer_(_)).Times(0); + const auto route_config = factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor()); + const auto* config = dynamic_cast(route_config.get()); + EXPECT_EQ(config->limit(), 10); + EXPECT_EQ(config->fillInterval().count(), 50); +} + +TEST(Factory, PerRouteConfigNoLimits) { + const std::string config_yaml = R"( + stat_prefix: test + )"; + + BandwidthLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + EXPECT_THROW(factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException); +} +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/bandwidth_limit/filter_test.cc b/test/extensions/filters/http/bandwidth_limit/filter_test.cc new file mode 100644 index 000000000000..497a9472f8d3 --- /dev/null +++ b/test/extensions/filters/http/bandwidth_limit/filter_test.cc @@ -0,0 +1,558 @@ +#include "envoy/extensions/filters/http/bandwidth_limit/v3alpha/bandwidth_limit.pb.h" + +#include "extensions/filters/http/bandwidth_limit/bandwidth_limit.h" + +#include "test/mocks/http/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::AnyNumber; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace BandwidthLimitFilter { + +class FilterTest : public testing::Test { +public: + FilterTest() = default; + + void setup(const std::string& yaml) { + envoy::extensions::filters::http::bandwidth_limit::v3alpha::BandwidthLimit config; + TestUtility::loadFromYaml(yaml, config); + config_ = std::make_shared(config, stats_, runtime_, time_system_, true); + filter_ = std::make_shared(config_); + filter_->setDecoderFilterCallbacks(decoder_filter_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_filter_callbacks_); + + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(encoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(encoder_filter_callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); + } + + uint64_t findCounter(const std::string& name) { + const auto counter = TestUtility::findCounter(stats_, name); + return counter != nullptr ? counter->value() : 0; + } + + uint64_t findGauge(const std::string& name) { + const auto gauge = TestUtility::findGauge(stats_, name); + return gauge != nullptr ? gauge->value() : 0; + } + + NiceMock stats_; + NiceMock decoder_filter_callbacks_; + NiceMock encoder_filter_callbacks_; + NiceMock runtime_; + std::shared_ptr config_; + std::shared_ptr filter_; + Http::TestRequestHeaderMapImpl request_headers_; + Http::TestRequestTrailerMapImpl request_trailers_; + Http::TestResponseHeaderMapImpl response_headers_; + Http::TestResponseTrailerMapImpl response_trailers_; + Buffer::OwnedImpl data_; + Event::SimulatedTimeSystem time_system_; +}; + +TEST_F(FilterTest, Disabled) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: false + runtime_key: foo_key + enable_mode: DISABLED + limit_kbps: 10 + fill_interval: 1s + )"; + setup(fmt::format(config_yaml, "1")); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data_, false)); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(0U, findCounter("test.http_bandwidth_limit.request_enabled")); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(0U, findCounter("test.http_bandwidth_limit.response_enabled")); +} + +TEST_F(FilterTest, LimitOnDecode) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: true + runtime_key: foo_key + enable_mode: REQUEST + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* token_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + EXPECT_EQ(1U, findCounter("test.http_bandwidth_limit.request_enabled")); + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(50UL, config_->fillInterval().count()); + + // Send a small amount of data which should be within limit. + Buffer::OwnedImpl data1("hello"); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data1, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(5, findGauge("test.http_bandwidth_limit.request_incoming_size")); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual("hello"), false)); + token_timer->invokeCallback(); + EXPECT_EQ(5, findGauge("test.http_bandwidth_limit.request_allowed_size")); + + // Advance time by 1s which should refill all tokens. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Send 1126 bytes of data which is 1s + 2 refill cycles of data. + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data2(std::string(1126, 'a')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data2, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(1126, findGauge("test.http_bandwidth_limit.request_incoming_size")); + + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.request_allowed_size")); + EXPECT_EQ(1126, findGauge("test.http_bandwidth_limit.request_incoming_size")); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.request_allowed_size")); + EXPECT_EQ(1126, findGauge("test.http_bandwidth_limit.request_incoming_size")); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(51, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data3, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.request_incoming_size")); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.request_allowed_size")); + + // Fire timer, also advance time. No timer enable because there is nothing + // buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'b')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.request_allowed_size")); + + // Advance time by 1s for a full refill. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in one shot with end_stream true which should go through and + // end the stream. + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data4(std::string(1024, 'c')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(data4, true)); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.request_incoming_size")); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'c')), true)); + token_timer->invokeCallback(); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.request_allowed_size")); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.request_pending")); + + filter_->onDestroy(); +} + +TEST_F(FilterTest, LimitOnEncode) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: true + runtime_key: foo_key + enable_mode: RESPONSE + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* token_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(50UL, config_->fillInterval().count()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(1U, findCounter("test.http_bandwidth_limit.response_enabled")); + + // Send a small amount of data which should be within limit. + Buffer::OwnedImpl data1("hello"); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data1, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_EQ(5, findGauge("test.http_bandwidth_limit.response_incoming_size")); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual("hello"), false)); + token_timer->invokeCallback(); + EXPECT_EQ(5, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + // Advance time by 1s which should refill all tokens. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Send 1126 bytes of data which is 1s + 2 refill cycles of data. + EXPECT_CALL(encoder_filter_callbacks_, onEncoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data2(std::string(1126, 'a')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data2, false)); + EXPECT_EQ(1126, findGauge("test.http_bandwidth_limit.response_incoming_size")); + + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, onEncoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(1024, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_EQ(1126, findGauge("test.http_bandwidth_limit.response_incoming_size")); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(51, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data3, false)); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'a')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + // Fire timer, also advance time. No time enable because there is nothing + // buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'b')), false)); + token_timer->invokeCallback(); + EXPECT_EQ(51, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + // Advance time by 1s for a full refill. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in one shot with end_stream true which should go through and + // end the stream. + EXPECT_CALL(*token_timer, enableTimer(std::chrono::milliseconds(0), _)); + Buffer::OwnedImpl data4(std::string(1024, 'c')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data4, true)); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.response_incoming_size")); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(1024, 'c')), true)); + token_timer->invokeCallback(); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_EQ(1024, findGauge("test.http_bandwidth_limit.response_allowed_size")); + + filter_->onDestroy(); +} + +TEST_F(FilterTest, LimitOnDecodeAndEncode) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: true + runtime_key: foo_key + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1050)); + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* request_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + Event::MockTimer* response_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(50UL, config_->fillInterval().count()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + // Send small amount of data from both sides which should be within initial + // bucket limit. + Buffer::OwnedImpl dec_data1("hello"); + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data1, false)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual("hello"), false)); + request_timer->invokeCallback(); + + Buffer::OwnedImpl enc_data1("world!"); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data1, false)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual("world!"), false)); + response_timer->invokeCallback(); + + // Advance time by 1s which should refill all tokens. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + // Send 1075 bytes of data on request path which is 1s + 1 refill cycle of + // data. Send 102 bytes of data on response path which is 2 refill cycles of + // data. + Buffer::OwnedImpl dec_data2(std::string(1075, 'd')); + Buffer::OwnedImpl enc_data2(std::string(102, 'e')); + + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterAboveWriteBufferHighWatermark()); + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data2, false)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data2, false)); + + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, onDecoderFilterBelowWriteBufferLowWatermark()); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'd')), false)); + request_timer->invokeCallback(); + + // Encoder will not be able to write any bytes due to insufficient tokens. + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string("")), false)); + response_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), false)); + request_timer->invokeCallback(); + // Encoder will not be able to write any bytes due to insufficient tokens. + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string("")), false)); + response_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'e')), false)); + response_timer->invokeCallback(); + + // Get new data with current data buffered, not end_stream. + Buffer::OwnedImpl data3(std::string(51, 'b')); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(data3, false)); + + // Fire timer, also advance time. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'e')), false)); + response_timer->invokeCallback(); + + // Fire timer, also advance time. No time enable because there is nothing + // buffered. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'b')), false)); + response_timer->invokeCallback(); + + // Advance time by 1s for a full refill. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Now send 1024 in total with end_stream true which should go through and end + // the streams. + Buffer::OwnedImpl enc_data4(std::string(960, 'e')); + Buffer::OwnedImpl dec_data4(std::string(51, 'd')); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data4, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data4, true)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), true)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(960, 'e')), true)); + response_timer->invokeCallback(); + request_timer->invokeCallback(); + + filter_->onDestroy(); +} + +TEST_F(FilterTest, WithTrailers) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: true + runtime_key: foo_key + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1050)); + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* request_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + Event::MockTimer* response_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(50UL, config_->fillInterval().count()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl dec_data(std::string(102, 'd')); + Buffer::OwnedImpl enc_data(std::string(56, 'e')); + + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data, false)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), false)); + EXPECT_EQ(Http::FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_trailers_)); + request_timer->invokeCallback(); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), false)); + request_timer->invokeCallback(); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.request_pending")); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'e')), false)); + response_timer->invokeCallback(); + EXPECT_EQ(Http::FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(5, 'e')), false)); + response_timer->invokeCallback(); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.response_pending")); +} + +TEST_F(FilterTest, WithTrailersNoEndStream) { + const std::string config_yaml = R"( + stat_prefix: test + runtime_enabled: + default_value: true + runtime_key: foo_key + enable_mode: REQUEST_AND_RESPONSE + limit_kbps: 1 + )"; + setup(fmt::format(config_yaml, "1")); + + ON_CALL(decoder_filter_callbacks_, decoderBufferLimit()).WillByDefault(Return(1050)); + ON_CALL(encoder_filter_callbacks_, encoderBufferLimit()).WillByDefault(Return(1100)); + Event::MockTimer* request_timer = + new NiceMock(&decoder_filter_callbacks_.dispatcher_); + Event::MockTimer* response_timer = + new NiceMock(&encoder_filter_callbacks_.dispatcher_); + + EXPECT_EQ(1UL, config_->limit()); + EXPECT_EQ(50UL, config_->fillInterval().count()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, + filter_->encode100ContinueHeaders(response_headers_)); + Http::MetadataMap metadata_map; + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->decodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(Http::FilterMetadataStatus::Continue, filter_->encodeMetadata(metadata_map)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl dec_data(std::string(102, 'd')); + Buffer::OwnedImpl enc_data(std::string(56, 'e')); + + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(0), _)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(dec_data, false)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(enc_data, false)); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + + EXPECT_CALL(*request_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), false)); + request_timer->invokeCallback(); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(decoder_filter_callbacks_, + injectDecodedDataToFilterChain(BufferStringEqual(std::string(51, 'd')), false)); + request_timer->invokeCallback(); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.request_pending")); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.request_pending")); + + // Fire timer, also advance time by 1 unit. + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(*response_timer, enableTimer(std::chrono::milliseconds(50), _)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(51, 'e')), false)); + response_timer->invokeCallback(); + + time_system_.advanceTimeWait(std::chrono::milliseconds(50)); + EXPECT_CALL(encoder_filter_callbacks_, + injectEncodedDataToFilterChain(BufferStringEqual(std::string(5, 'e')), false)); + response_timer->invokeCallback(); + EXPECT_EQ(1, findGauge("test.http_bandwidth_limit.response_pending")); + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(0, findGauge("test.http_bandwidth_limit.response_pending")); +} + +} // namespace BandwidthLimitFilter +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/common/stream_rate_limiter_test.cc b/test/extensions/filters/http/common/stream_rate_limiter_test.cc index d2e57de2d4f9..3048add95643 100644 --- a/test/extensions/filters/http/common/stream_rate_limiter_test.cc +++ b/test/extensions/filters/http/common/stream_rate_limiter_test.cc @@ -142,6 +142,9 @@ TEST_F(StreamRateLimiterTest, RateLimitOnSingleStream) { EXPECT_CALL(decoder_callbacks_, injectDecodedDataToFilterChain(BufferStringEqual(std::string(1024, 'c')), true)); token_timer->invokeCallback(); + + limiter_->destroy(); + EXPECT_EQ(limiter_->destroyed(), true); } } // namespace Common diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 6034a70da15a..93f6286e29c6 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -444,6 +444,8 @@ benchmarked bidi bignum bitfield +bitmask +bitmasks bitset bitwise blackhole