Skip to content

Commit

Permalink
k/quotas: use "kafka_throughput_control" for tput exemptions
Browse files Browse the repository at this point in the history
connection_context now keeps all quota-related state for the connection
stored in `_snc_quota_context`. This object is supposed to be created
once per connection lifetime by `snc_quota_manager`, but it will be
recreated each time a client_id changes on the connection.

When the quota context is created (lazily on the connection context),
the `kafka_throughput_control` rules are used to select the matching
throughput control group. If any group is matched, the context saves it
as a flag to exempt the connection from any snc_quota_manager control.
This will change into a full association with the control group.
Currently the exempt flag simply tells the quota manager to skip any
messages in that context.
  • Loading branch information
dlex committed Jun 2, 2023
1 parent 2ded90f commit cd9b951
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 37 deletions.
50 changes: 46 additions & 4 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "kafka/server/response.h"
#include "kafka/server/server.h"
#include "kafka/server/snc_quota_manager.h"
#include "likely.h"
#include "net/exceptions.h"
#include "security/exceptions.h"
#include "units.h"
Expand All @@ -43,6 +44,29 @@ using namespace std::chrono_literals;

namespace kafka {

/// Builds the per-connection context by storing every argument in the
/// matching member: \p sasl, \p mtls_state and both config bindings are
/// moved in, \p conn (a shared pointer) is copied.
connection_context::connection_context(
class server& s,
ss::lw_shared_ptr<net::connection> conn,
std::optional<security::sasl_server> sasl,
bool enable_authorizer,
std::optional<security::tls::mtls_state> mtls_state,
config::binding<uint32_t> max_request_size,
config::conversion_binding<std::vector<bool>, std::vector<ss::sstring>>
kafka_throughput_controlled_api_keys) noexcept
: _server(s)
// inside the initializer expressions `conn` names the constructor parameter,
// which shadows the member of the same name
, conn(conn)
, _sasl(std::move(sasl))
// tests may build a context without a live connection
, _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{})
, _enable_authorizer(enable_authorizer)
// NOTE(review): reads _client_addr and calls client_port() during
// construction, so this depends on the member declaration order in the
// header -- confirm the members it uses are declared before _authlog
, _authlog(_client_addr, client_port())
, _mtls_state(std::move(mtls_state))
, _max_request_size(std::move(max_request_size))
, _kafka_throughput_controlled_api_keys(
std::move(kafka_throughput_controlled_api_keys)) {}

// Defaulted out of line, in the translation unit that includes
// snc_quota_manager.h. NOTE(review): presumably required because the class
// holds a std::unique_ptr<snc_quota_context> while the header only sees a
// forward declaration of snc_quota_context -- confirm.
connection_context::~connection_context() noexcept = default;

ss::future<> connection_context::process() {
while (true) {
if (is_finished_parsing()) {
Expand Down Expand Up @@ -215,9 +239,12 @@ connection_context::record_tp_and_calculate_throttle(
// Throttle on shard wide quotas
snc_quota_manager::delays_t shard_delays;
if (_kafka_throughput_controlled_api_keys().at(hdr.key)) {
_server.snc_quota_mgr().record_request_receive(request_size, now);
_server.snc_quota_mgr().get_or_create_quota_context(
_snc_quota_context, hdr.client_id);
_server.snc_quota_mgr().record_request_receive(
*_snc_quota_context, request_size, now);
shard_delays = _server.snc_quota_mgr().get_shard_delays(
_throttled_until, now);
*_snc_quota_context, now);
}

// Sum up
Expand Down Expand Up @@ -335,7 +362,18 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
return ss::make_ready_future<>();
}
if (_kafka_throughput_controlled_api_keys().at(hdr.key)) {
_server.snc_quota_mgr().record_request_intake(size);
// Normally we can only get here after a prior call to
// snc_quota_mgr().get_or_create_quota_context() in
// record_tp_and_calculate_throttle(), but there is a possibility
// that a configuration change could still take us into this
// branch with a non-matching (or even null) _snc_quota_context.
// A merely non-matching _snc_quota_context is no big deal because
// it is a one-off event, but we do need protection against it being
// nullptr.
if (likely(_snc_quota_context)) {
_server.snc_quota_mgr().record_request_intake(
*_snc_quota_context, size);
}
}

auto sres = ss::make_lw_shared(std::move(sres_in));
Expand Down Expand Up @@ -526,7 +564,11 @@ ss::future<> connection_context::maybe_process_responses() {
auto response_size = msg.size();
auto request_key = resp_and_res.resources->request_data.request_key;
if (_kafka_throughput_controlled_api_keys().at(request_key)) {
_server.snc_quota_mgr().record_response(response_size);
// see the comment in dispatch_method_once()
if (likely(_snc_quota_context)) {
_server.snc_quota_mgr().record_response(
*_snc_quota_context, response_size);
}
}
_server.handler_probe(request_key).add_bytes_sent(response_size);
try {
Expand Down
19 changes: 4 additions & 15 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,9 @@ class connection_context final
std::optional<security::tls::mtls_state> mtls_state,
config::binding<uint32_t> max_request_size,
config::conversion_binding<std::vector<bool>, std::vector<ss::sstring>>
kafka_throughput_controlled_api_keys) noexcept
: _server(s)
, conn(conn)
, _sasl(std::move(sasl))
// tests may build a context without a live connection
, _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{})
, _enable_authorizer(enable_authorizer)
, _authlog(_client_addr, client_port())
, _mtls_state(std::move(mtls_state))
, _max_request_size(std::move(max_request_size))
, _kafka_throughput_controlled_api_keys(
std::move(kafka_throughput_controlled_api_keys)) {}

~connection_context() noexcept = default;
kafka_throughput_controlled_api_keys) noexcept;
~connection_context() noexcept;

connection_context(const connection_context&) = delete;
connection_context(connection_context&&) = delete;
connection_context& operator=(const connection_context&) = delete;
Expand Down Expand Up @@ -352,9 +341,9 @@ class connection_context final
ctx_log _authlog;
std::optional<security::tls::mtls_state> _mtls_state;
config::binding<uint32_t> _max_request_size;
ss::lowres_clock::time_point _throttled_until;
config::conversion_binding<std::vector<bool>, std::vector<ss::sstring>>
_kafka_throughput_controlled_api_keys;
std::unique_ptr<snc_quota_context> _snc_quota_context;
};

} // namespace kafka
1 change: 1 addition & 0 deletions src/v/kafka/server/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ class request_context;
class rm_group_frontend;
class rm_group_proxy_impl;
class usage_manager;
class snc_quota_context;

} // namespace kafka
78 changes: 70 additions & 8 deletions src/v/kafka/server/snc_quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <fmt/core.h>
#include <fmt/ranges.h>

#include <iterator>
#include <memory>
#include <numeric>

using namespace std::chrono_literals;
Expand Down Expand Up @@ -248,38 +250,98 @@ snc_quota_manager::calc_node_quota_default() const {
return default_quota;
}

void snc_quota_manager::get_or_create_quota_context(
std::unique_ptr<snc_quota_context>& ctx,
std::optional<std::string_view> client_id) {
if (likely(ctx)) {
// NB: comparing string_view to sstring might be suboptimal
if (likely(ctx->_client_id == client_id)) {
// the context is the right one
return;
}

// either of the context indexing propeties have changed on the client
// within the same connection. This is an unexpected path, quotas may
// misbehave if we ever get here. The design is based on assumption that
// this should not happen. If it does happen with a supported client, we
// probably should start supporting multiple quota contexts per
// connection
vlog(
klog.warn,
"qm - client_id has changed on the connection. Quotas are reset now. "
"Old client_id: {}, new client_id: {}",
ctx->_client_id,
client_id);
}

ctx = std::make_unique<snc_quota_context>(client_id);
const auto tcgroup_it = config::find_throughput_control_group(
_kafka_throughput_control().cbegin(),
_kafka_throughput_control().cend(),
client_id);
if (tcgroup_it == _kafka_throughput_control().cend()) {
ctx->_exempt = false;
vlog(klog.debug, "qm - No throughput control group assigned");
} else {
ctx->_exempt = true;
if (tcgroup_it->is_noname()) {
vlog(
klog.debug,
"qm - Assigned throughput control group #{}",
std::distance(_kafka_throughput_control().cbegin(), tcgroup_it));
} else {
vlog(
klog.debug,
"qm - Assigned throughput control group: {}",
tcgroup_it->name);
}
}
}

/// Determine the throttling required by shard level TP quotas for the
/// connection that owns \p ctx.
///
/// \param ctx quota context of the connection. Its `_throttled_until` is
/// read to enforce the delay requested on the previous call and is then
/// overwritten with the deadline requested by this call.
/// \param now current time, used as the reference for both delays
/// \return `enforce`: how long the connection must still be throttled because
/// of the previously requested delay; `request`: the delay to request from
/// the client this time, capped by the max kafka throttle delay setting.
/// A default-constructed \ref delays_t is returned for exempt contexts.
snc_quota_manager::delays_t snc_quota_manager::get_shard_delays(
  snc_quota_context& ctx, const clock::time_point now) const {
    delays_t res;

    // Traffic of an exempt context is skipped by the record_*() methods, so
    // it must not be throttled on behalf of the quota consumed by other
    // connections of the shard either.
    if (ctx._exempt) {
        return res;
    }

    // force throttle whatever the client did not do on its side
    if (now < ctx._throttled_until) {
        res.enforce = ctx._throttled_until - now;
    }

    // throttling delay the connection should be requested to throttle
    // this time
    res.request = std::min(
      _max_kafka_throttle_delay(),
      std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg)));
    ctx._throttled_until = now + res.request;

    return res;
}

void snc_quota_manager::record_request_receive(
const size_t request_size, const clock::time_point now) noexcept {
snc_quota_context& ctx,
const size_t request_size,
const clock::time_point now) noexcept {
if (ctx._exempt) {
return;
}
_shard_quota.in.use(request_size, now);
}

void snc_quota_manager::record_request_intake(
const size_t request_size) noexcept {
snc_quota_context& ctx, const size_t request_size) noexcept {
if (ctx._exempt) {
return;
}
_probe.rec_traffic_in(request_size);
}

void snc_quota_manager::record_response(
const size_t request_size, const clock::time_point now) noexcept {
snc_quota_context& ctx,
const size_t request_size,
const clock::time_point now) noexcept {
if (ctx._exempt) {
return;
}
_shard_quota.eg.use(request_size, now);
}

Expand Down
55 changes: 45 additions & 10 deletions src/v/kafka/server/snc_quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>

#include <chrono>
#include <optional>
#include <string_view>

namespace kafka {

Expand Down Expand Up @@ -58,6 +60,29 @@ class snc_quotas_probe {
size_t _traffic_in = 0;
};

/// Per-connection state used by \ref snc_quota_manager. All members are
/// private; only the befriended snc_quota_manager reads and writes them.
class snc_quota_context {
public:
/// \param client_id client_id the context is created for; a copy is stored
/// so that a later change of client_id on the connection can be detected
explicit snc_quota_context(std::optional<std::string_view> client_id)
: _client_id(client_id) {}

private:
friend class snc_quota_manager;

// Indexing

/// client_id this context was created for
std::optional<ss::sstring> _client_id;

// Configuration

/// Whether the connection belongs to an exempt tput control group
bool _exempt{false};

// Operating

/// What time the client on this connection should throttle (be throttled)
/// until
ss::lowres_clock::time_point _throttled_until;
};

/// Isolates \ref quota_manager functionality related to
/// shard/node/cluster (SNC) wide quotas and limits
class snc_quota_manager
Expand All @@ -83,28 +108,38 @@ class snc_quota_manager
clock::duration request{0};
};

/// Depending on the other arguments, create or actualize or keep the
/// existing \p ctx. The context object is supposed to be stored
/// in the connection context, and created only once per connection
/// lifetime. However since the kafka API allows changing client_id of a
/// connection on the fly, we may need to replace the existing context with
/// a new one if that happens (actualize).
/// \post (bool)ctx == true
void get_or_create_quota_context(
std::unique_ptr<snc_quota_context>& ctx,
std::optional<std::string_view> client_id);

/// Determine throttling required by shard level TP quotas.
/// @param connection_throttle_until (in,out) until what time the client
/// on this conection should throttle until. If it does not, this throttling
/// will be enforced on the next call. In: value from the last call, out:
/// value saved until the next call.
delays_t get_shard_delays(
clock::time_point& connection_throttle_until,
clock::time_point now) const;
delays_t get_shard_delays(snc_quota_context&, clock::time_point now) const;

/// Record the request size when it has arrived from the transport.
/// This should be done before calling \ref get_shard_delays because the
/// recorded request size is used to calculate throttling parameters.
void record_request_receive(
size_t request_size, clock::time_point now = clock::now()) noexcept;
snc_quota_context&,
size_t request_size,
clock::time_point now = clock::now()) noexcept;

/// Record the request size when the request data is about to be consumed.
/// This data is used to represent throttled throughput.
void record_request_intake(size_t request_size) noexcept;
void
record_request_intake(snc_quota_context&, size_t request_size) noexcept;

/// Record the response size for all purposes
void record_response(
size_t request_size, clock::time_point now = clock::now()) noexcept;
snc_quota_context&,
size_t request_size,
clock::time_point now = clock::now()) noexcept;

/// Metrics probe object
const snc_quotas_probe& get_snc_quotas_probe() const noexcept {
Expand Down

0 comments on commit cd9b951

Please sign in to comment.