Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: refactor out stream rate limiter to common #14828

Merged
merged 9 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions source/extensions/filters/http/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,18 @@ envoy_cc_library(
"//source/extensions/filters/http:well_known_names",
],
)

# Shared per-stream rate limiter: paces data transfer on an HTTP stream with a
# token bucket. Factored out of filter-specific code so multiple filters
# (e.g. the fault filter) can reuse the same pacing logic.
envoy_cc_library(
name = "stream_rate_limiter_lib",
srcs = ["stream_rate_limiter.cc"],
hdrs = ["stream_rate_limiter.h"],
deps = [
"//include/envoy/common:token_bucket_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/event:timer_interface",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:token_bucket_impl_lib",
],
)
117 changes: 117 additions & 0 deletions source/extensions/filters/http/common/stream_rate_limiter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include "extensions/filters/http/common/stream_rate_limiter.h"

#include <chrono>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"

#include "common/common/assert.h"
#include "common/common/token_bucket_impl.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace Common {

// Builds a per-stream limiter: sets up the watermark buffer, the drain timer,
// and — when the caller did not supply one — a private token bucket sized to
// max_kbps worth of bytes per second. A caller-supplied bucket may be shared
// by several streams so they draw from one aggregate limit.
StreamRateLimiter::StreamRateLimiter(
uint64_t max_kbps, uint64_t max_buffered_data, std::function<void()> pause_data_cb,
std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb, std::function<void()> continue_cb,
std::function<void(uint64_t)> write_stats_cb, TimeSource& time_source,
Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope,
std::shared_ptr<TokenBucket> token_bucket, std::chrono::milliseconds fill_interval)
: fill_interval_(std::move(fill_interval)), write_data_cb_(write_data_cb),
continue_cb_(continue_cb), write_stats_cb_(std::move(write_stats_cb)), scope_(scope),
token_bucket_(std::move(token_bucket)),
token_timer_(dispatcher.createTimer([this] { onTokenTimer(); })),
buffer_(resume_data_cb, pause_data_cb,
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }) {
ASSERT(max_buffered_data > 0);
// The fill interval must be positive and no longer than one second; the
// initial-token computation below assumes a 1000ms refill horizon.
ASSERT(fill_interval_.count() > 0);
ASSERT(fill_interval_.count() <= 1000);
// max_kbps is KiB/s; the bucket is denominated in bytes.
auto max_tokens = kiloBytesToBytes(max_kbps);
if (!token_bucket_) {
// Initialize a new token bucket if caller didn't provide one.
// The token bucket is configured with a max token count of the number of bytes per second,
// and refills at the same rate, so that we have a per second limit which refills gradually
// in one fill_interval_ at a time.
token_bucket_ = std::make_shared<TokenBucketImpl>(max_tokens, time_source, max_tokens);
}
// Reset the bucket to contain only one fill_interval worth of tokens.
// If the token bucket is shared, only first reset call will work.
auto initial_tokens = max_tokens * fill_interval_.count() / 1000;
token_bucket_->maybeReset(initial_tokens);
ENVOY_LOG(debug,
"StreamRateLimiter <Ctor>: fill_interval={}ms "
"initial_tokens={} max_tokens={}",
fill_interval_.count(), initial_tokens, max_tokens);
buffer_.setWatermarks(max_buffered_data);
}

// Drain-timer callback: consumes tokens for as many buffered bytes as the
// bucket allows, writes that slice downstream, and re-arms the timer while
// data remains buffered.
void StreamRateLimiter::onTokenTimer() {
Buffer::OwnedImpl data_to_write;

// Try to obtain as many tokens as there are bytes in the buffer, and then
// figure out how many bytes to write given the number of tokens we actually got.
const uint64_t tokens_obtained = token_bucket_->consume(buffer_.length(), true);
const uint64_t bytes_to_write = std::min(tokens_obtained, buffer_.length());
ENVOY_LOG(debug,
"StreamRateLimiter <onTokenTimer>: tokens_needed={} "
"tokens_obtained={} to_write={}",
buffer_.length(), tokens_obtained, bytes_to_write);

// Move the data to write into the output buffer with as little copying as possible.
// NOTE: This might be moving zero bytes, but that should work fine.
data_to_write.move(buffer_, bytes_to_write);
write_stats_cb_(bytes_to_write);

// If the buffer still contains data in it, we couldn't get enough tokens, so schedule the next
// token available time.
// In case of a shared token bucket, this algorithm will prioritize one stream at a time.
// TODO(nitgoy): add round-robin and other policies for rationing bandwidth.
if (buffer_.length() > 0) {
ENVOY_LOG(debug,
"StreamRateLimiter <onTokenTimer>: scheduling wakeup for {}ms, "
"buffered={}",
fill_interval_.count(), buffer_.length());
token_timer_->enableTimer(fill_interval_, &scope_);
}

// Write the data out, indicating end stream if we saw end stream, there is no further data to
// send, and there are no trailers.
// NOTE(review): the timer is re-armed above, before this callback runs, so a
// re-entrant writeData() from inside the callback sees the timer enabled.
write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_);

// If there is no more data to send and we saw trailers, we need to continue iteration to release
// the trailers to further filters.
if (buffer_.length() == 0 && saw_trailers_) {
continue_cb_();
}
}

// Takes ownership of incoming_buffer's data (draining it) and arms the drain
// timer if it is not already pending; the actual downstream write happens
// asynchronously in onTokenTimer().
void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream) {
auto len = incoming_buffer.length();
buffer_.move(incoming_buffer);
saw_end_stream_ = end_stream;
ENVOY_LOG(debug,
"StreamRateLimiter <writeData>: got new {} bytes of data. token "
"timer {} scheduled.",
len, !token_timer_->enabled() ? "now" : "already");
if (!token_timer_->enabled()) {
// TODO(mattklein123): In an optimal world we would be able to continue iteration with the data
// we want in the buffer, but have a way to clear end_stream in case we can't send it all.
// The filter API does not currently support that and it will not be a trivial change to add.
// Instead we cheat here by scheduling the token timer to run immediately after the stack is
// unwound, at which point we can directly call encode/decodeData.
token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_);
}
}

// Records that trailers (and therefore end of stream) have been seen.
// Returns true when buffered body data still needs to be drained before the
// trailers can be released.
bool StreamRateLimiter::onTrailers() {
saw_trailers_ = true;
saw_end_stream_ = true;
return buffer_.length() != 0;
}
} // namespace Common
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
97 changes: 97 additions & 0 deletions source/extensions/filters/http/common/stream_rate_limiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "envoy/common/token_bucket.h"
#include "envoy/event/timer.h"
#include "envoy/runtime/runtime.h"

#include "common/buffer/watermark_buffer.h"

namespace Envoy {

class ScopeTrackedObject;

namespace Event {
class Timer;
} // namespace Event

namespace Extensions {
namespace HttpFilters {
namespace Common {

/**
 * A generic HTTP stream rate limiter. It limits the rate of transfer for a stream to the specified
 * max rate. It calls appropriate callbacks when the buffered data crosses certain high and low
 * watermarks based on the max buffer size. It's used by the fault filter and bandwidth filter as
 * the core logic for their stream limit functionality.
 */
class StreamRateLimiter : Logger::Loggable<Logger::Id::filter> {
public:
// Default period of the drain timer when the caller does not override it.
static constexpr std::chrono::milliseconds DefaultFillInterval = std::chrono::milliseconds(50);

// Converts a KiB count to bytes.
static constexpr uint64_t kiloBytesToBytes(const uint64_t val) { return val * 1024; }

/**
 * @param max_kbps maximum rate in KiB/s.
 * @param max_buffered_data maximum data to buffer before invoking the pause callback.
 * @param pause_data_cb callback invoked when the limiter has buffered too much data.
 * @param resume_data_cb callback invoked when the limiter has gone under the buffer limit.
 * @param write_data_cb callback invoked to write data to the stream.
 * @param continue_cb callback invoked to continue the stream. This is only used to continue
 *                    trailers that have been paused during body flush.
 * @param write_stats_cb callback invoked with the number of bytes written on each drain tick.
 * @param time_source the time source to run the token bucket with.
 * @param dispatcher the stream's dispatcher to use for creating timers.
 * @param scope the stream's scope
 * @param token_bucket optional token bucket; when nullptr a per-stream bucket is created.
 *                     A shared bucket lets multiple streams draw from one aggregate limit.
 * @param fill_interval drain timer period; must be in the range (0ms, 1000ms].
 */
StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data,
std::function<void()> pause_data_cb, std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb,
std::function<void()> continue_cb, std::function<void(uint64_t)> write_stats_cb,
TimeSource& time_source, Event::Dispatcher& dispatcher,
const ScopeTrackedObject& scope,
std::shared_ptr<TokenBucket> token_bucket = nullptr,
std::chrono::milliseconds fill_interval = DefaultFillInterval);

/**
 * Called by the stream to write data. All data writes happen asynchronously, the stream should
 * be stopped after this call (all data will be drained from incoming_buffer).
 */
void writeData(Buffer::Instance& incoming_buffer, bool end_stream);

/**
 * Called if the stream receives trailers.
 * Returns true if the read buffer is not completely drained yet.
 */
bool onTrailers();

/**
 * Like the owning filter, we must handle inline destruction, so we have a destroy() method which
 * kills any callbacks.
 */
void destroy() { token_timer_.reset(); }
bool destroyed() { return token_timer_ == nullptr; }

private:
friend class StreamRateLimiterTest;
// NOTE(review): this alias appears unused; token_timer_ below is declared as
// Event::TimerPtr. Consider removing one of the two spellings.
using TimerPtr = std::unique_ptr<Event::Timer>;

// Drain-timer callback that writes buffered data subject to available tokens.
void onTokenTimer();

const std::chrono::milliseconds fill_interval_;
const std::function<void(Buffer::Instance&, bool)> write_data_cb_;
const std::function<void()> continue_cb_;
const std::function<void(uint64_t)> write_stats_cb_;
const ScopeTrackedObject& scope_;
std::shared_ptr<TokenBucket> token_bucket_;
Event::TimerPtr token_timer_;
bool saw_end_stream_{}; // latched by writeData(..., end_stream=true) or onTrailers().
bool saw_trailers_{};   // latched by onTrailers().
Buffer::WatermarkBuffer buffer_; // data awaiting tokens; watermarks drive pause/resume callbacks.
};
} // namespace Common
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/filters/http/fault/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ envoy_cc_library(
"//source/common/stats:utility_lib",
"//source/extensions/filters/common/fault:fault_config_lib",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:stream_rate_limiter_lib",
"@envoy_api//envoy/extensions/filters/http/fault/v3:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
Expand Down
104 changes: 8 additions & 96 deletions source/extensions/filters/http/fault/fault_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,18 @@ void FaultFilter::maybeSetupResponseRateLimit(const Http::RequestHeaderMap& requ

config_->stats().response_rl_injected_.inc();

response_limiter_ = std::make_unique<StreamRateLimiter>(
response_limiter_ = std::make_unique<Envoy::Extensions::HttpFilters::Common::StreamRateLimiter>(
rate_kbps.value(), encoder_callbacks_->encoderBufferLimit(),
[this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); },
[this] { encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); },
[this](Buffer::Instance& data, bool end_stream) {
encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream);
},
[this] { encoder_callbacks_->continueEncoding(); }, config_->timeSource(),
decoder_callbacks_->dispatcher(), decoder_callbacks_->scope());
[this] { encoder_callbacks_->continueEncoding(); },
[](uint64_t) {
// write stats callback.
},
config_->timeSource(), decoder_callbacks_->dispatcher(), decoder_callbacks_->scope());
}

bool FaultFilter::faultOverflow() {
Expand Down Expand Up @@ -516,104 +519,13 @@ Http::FilterDataStatus FaultFilter::encodeData(Buffer::Instance& data, bool end_

Http::FilterTrailersStatus FaultFilter::encodeTrailers(Http::ResponseTrailerMap&) {
if (response_limiter_ != nullptr) {
return response_limiter_->onTrailers();
return response_limiter_->onTrailers() ? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;
}

return Http::FilterTrailersStatus::Continue;
}

// Legacy fault-filter-local implementation (superseded in this change by the
// shared Extensions::HttpFilters::Common::StreamRateLimiter). Paces writes by
// granting whole "time slice" tokens, each worth bytes_per_time_slice_ bytes,
// SecondDivisor times per second.
StreamRateLimiter::StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data,
std::function<void()> pause_data_cb,
std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb,
std::function<void()> continue_cb, TimeSource& time_source,
Event::Dispatcher& dispatcher, const ScopeTrackedObject& scope)
: // bytes_per_time_slice is KiB converted to bytes divided by the number of ticks per second.
bytes_per_time_slice_((max_kbps * 1024) / SecondDivisor), write_data_cb_(write_data_cb),
continue_cb_(continue_cb), scope_(scope),
// The token bucket is configured with a max token count of the number of ticks per second,
// and refills at the same rate, so that we have a per second limit which refills gradually in
// ~63ms intervals.
token_bucket_(SecondDivisor, time_source, SecondDivisor),
token_timer_(dispatcher.createTimer([this] { onTokenTimer(); })),
buffer_(resume_data_cb, pause_data_cb,
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ }) {
// A very small max_kbps could make the integer division above yield zero.
ASSERT(bytes_per_time_slice_ > 0);
ASSERT(max_buffered_data > 0);
buffer_.setWatermarks(max_buffered_data);
}

// Legacy fault-filter-local drain-timer callback (superseded in this change by
// the shared common implementation). Obtains whole time-slice tokens and
// writes the corresponding number of bytes downstream.
void StreamRateLimiter::onTokenTimer() {
ENVOY_LOG(trace, "limiter: timer wakeup: buffered={}", buffer_.length());
Buffer::OwnedImpl data_to_write;

if (!saw_data_) {
// The first time we see any data on this stream (via writeData()), reset the number of tokens
// to 1. This will ensure that we start pacing the data at the desired rate (and don't send a
// full 1s of data right away which might not introduce enough delay for a stream that doesn't
// have enough data to span more than 1s of rate allowance). Once we reset, we will subsequently
// allow for bursting within the second to account for our data provider being bursty.
token_bucket_.maybeReset(1);
saw_data_ = true;
}

// Compute the number of tokens needed (rounded up), try to obtain that many tokens, and then
// figure out how many bytes to write given the number of tokens we actually got.
const uint64_t tokens_needed =
(buffer_.length() + bytes_per_time_slice_ - 1) / bytes_per_time_slice_;
const uint64_t tokens_obtained = token_bucket_.consume(tokens_needed, true);
const uint64_t bytes_to_write =
std::min(tokens_obtained * bytes_per_time_slice_, buffer_.length());
ENVOY_LOG(trace, "limiter: tokens_needed={} tokens_obtained={} to_write={}", tokens_needed,
tokens_obtained, bytes_to_write);

// Move the data to write into the output buffer with as little copying as possible.
// NOTE: This might be moving zero bytes, but that should work fine.
data_to_write.move(buffer_, bytes_to_write);

// If the buffer still contains data in it, we couldn't get enough tokens, so schedule the next
// token available time.
if (buffer_.length() > 0) {
const std::chrono::milliseconds ms = token_bucket_.nextTokenAvailable();
if (ms.count() > 0) {
ENVOY_LOG(trace, "limiter: scheduling wakeup for {}ms", ms.count());
token_timer_->enableTimer(ms, &scope_);
}
}

// Write the data out, indicating end stream if we saw end stream, there is no further data to
// send, and there are no trailers.
write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_);

// If there is no more data to send and we saw trailers, we need to continue iteration to release
// the trailers to further filters.
if (buffer_.length() == 0 && saw_trailers_) {
continue_cb_();
}
}

// Legacy fault-filter-local writeData (superseded in this change by the shared
// common implementation): buffers the incoming data and arms the timer for an
// immediate asynchronous drain.
void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream) {
ENVOY_LOG(trace, "limiter: incoming data length={} buffered={}", incoming_buffer.length(),
buffer_.length());
buffer_.move(incoming_buffer);
saw_end_stream_ = end_stream;
if (!token_timer_->enabled()) {
// TODO(mattklein123): In an optimal world we would be able to continue iteration with the data
// we want in the buffer, but have a way to clear end_stream in case we can't send it all.
// The filter API does not currently support that and it will not be a trivial change to add.
// Instead we cheat here by scheduling the token timer to run immediately after the stack is
// unwound, at which point we can directly call encode/decodeData.
token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_);
}
}

// Marks that trailers (and hence end of stream) have arrived. Iteration must
// stop while body data is still buffered so the trailers are not released
// ahead of the paced body bytes.
Http::FilterTrailersStatus StreamRateLimiter::onTrailers() {
saw_trailers_ = true;
saw_end_stream_ = true;
if (buffer_.length() > 0) {
return Http::FilterTrailersStatus::StopIteration;
}
return Http::FilterTrailersStatus::Continue;
}

} // namespace Fault
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Loading