From cd9b9518bdf395e695fbedba52bca74e6b523b0e Mon Sep 17 00:00:00 2001 From: Alexey Biryukov Date: Mon, 15 May 2023 02:30:55 -0400 Subject: [PATCH] k/quotas: use "kafka_throughput_control" for tput exemptions connection_context now has all quota related stuff for the connection stored in `_snc_quota_context`. This object is supposed to be created once per connection lifetime by `snc_quota_manager`, but it will be recreated each time a client_id changes on the connection. When the quota context is created (lazily on the connection context), the `kafka_throughput_control` rules are used to select the matching throughput control group. If any group is matched, the context saves it as a flag to exempt the connection from any snc_quota_manager control. This will change into a full association with the control group. Currently the exempt flag simply tells the quota manager to skip any messages in that context. --- src/v/kafka/server/connection_context.cc | 50 +++++++++++++-- src/v/kafka/server/connection_context.h | 19 ++---- src/v/kafka/server/fwd.h | 1 + src/v/kafka/server/snc_quota_manager.cc | 78 +++++++++++++++++++++--- src/v/kafka/server/snc_quota_manager.h | 55 ++++++++++++++--- 5 files changed, 166 insertions(+), 37 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 76e779716eb7..8dda174d91d9 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -24,6 +24,7 @@ #include "kafka/server/response.h" #include "kafka/server/server.h" #include "kafka/server/snc_quota_manager.h" +#include "likely.h" #include "net/exceptions.h" #include "security/exceptions.h" #include "units.h" @@ -43,6 +44,29 @@ using namespace std::chrono_literals; namespace kafka { +connection_context::connection_context( + class server& s, + ss::lw_shared_ptr conn, + std::optional sasl, + bool enable_authorizer, + std::optional mtls_state, + config::binding max_request_size, + config::conversion_binding, std::vector> + kafka_throughput_controlled_api_keys) noexcept + : _server(s) + , conn(conn) + , _sasl(std::move(sasl)) + // tests may build a context without a live connection + , _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{}) + , _enable_authorizer(enable_authorizer) + , _authlog(_client_addr, client_port()) + , _mtls_state(std::move(mtls_state)) + , _max_request_size(std::move(max_request_size)) + , _kafka_throughput_controlled_api_keys( + std::move(kafka_throughput_controlled_api_keys)) {} + +connection_context::~connection_context() noexcept = default; + ss::future<> connection_context::process() { while (true) { if (is_finished_parsing()) { @@ -215,9 +239,12 @@ connection_context::record_tp_and_calculate_throttle( // Throttle on shard wide quotas snc_quota_manager::delays_t shard_delays; if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { - _server.snc_quota_mgr().record_request_receive(request_size, now); + _server.snc_quota_mgr().get_or_create_quota_context( + _snc_quota_context, hdr.client_id); + _server.snc_quota_mgr().record_request_receive( + *_snc_quota_context, request_size, now); shard_delays = _server.snc_quota_mgr().get_shard_delays( - _throttled_until, now); + *_snc_quota_context, now); } // Sum up @@ -335,7 +362,18 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return ss::make_ready_future<>(); } if (_kafka_throughput_controlled_api_keys().at(hdr.key)) { - _server.snc_quota_mgr().record_request_intake(size); + // Normally we can only get here after a prior call to + // snc_quota_mgr().get_or_create_quota_context() in + // record_tp_and_calculate_throttle(), but there is possibility + // that the changing configuration could still take us into this + // branch with unmatching (and even null) _snc_quota_context. + // Simply an unmatching _snc_quota_context is no big deal because + // it is a one off event, but we need protection from it being + // nullptr + if (likely(_snc_quota_context)) { + _server.snc_quota_mgr().record_request_intake( + *_snc_quota_context, size); + } } auto sres = ss::make_lw_shared(std::move(sres_in)); @@ -526,7 +564,11 @@ ss::future<> connection_context::maybe_process_responses() { auto response_size = msg.size(); auto request_key = resp_and_res.resources->request_data.request_key; if (_kafka_throughput_controlled_api_keys().at(request_key)) { - _server.snc_quota_mgr().record_response(response_size); + // see the comment in dispatch_method_once() + if (likely(_snc_quota_context)) { + _server.snc_quota_mgr().record_response( + *_snc_quota_context, response_size); + } } _server.handler_probe(request_key).add_bytes_sent(response_size); try { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 0d55c9778761..c7d3fd508e22 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -117,20 +117,9 @@ class connection_context final std::optional mtls_state, config::binding max_request_size, config::conversion_binding, std::vector> - kafka_throughput_controlled_api_keys) noexcept - : _server(s) - , conn(conn) - , _sasl(std::move(sasl)) - // tests may build a context without a live connection - , _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{}) - , _enable_authorizer(enable_authorizer) - , _authlog(_client_addr, client_port()) - , _mtls_state(std::move(mtls_state)) - , _max_request_size(std::move(max_request_size)) - , _kafka_throughput_controlled_api_keys( - std::move(kafka_throughput_controlled_api_keys)) {} - - ~connection_context() noexcept = default; + kafka_throughput_controlled_api_keys) noexcept; + ~connection_context() noexcept; + connection_context(const connection_context&) = delete; connection_context(connection_context&&) = delete; connection_context& operator=(const connection_context&) = delete; @@ -352,9 +341,9 @@ class connection_context final ctx_log _authlog; std::optional _mtls_state; config::binding _max_request_size; - ss::lowres_clock::time_point _throttled_until; config::conversion_binding, std::vector> _kafka_throughput_controlled_api_keys; + std::unique_ptr _snc_quota_context; }; } // namespace kafka diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index f5eff27ceb81..a3a06ce0a605 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -25,5 +25,6 @@ class request_context; class rm_group_frontend; class rm_group_proxy_impl; class usage_manager; +class snc_quota_context; } // namespace kafka diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 6a61c1a55638..1280a7f9aa18 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -19,6 +19,8 @@ #include #include +#include +#include #include using namespace std::chrono_literals; @@ -248,14 +250,61 @@ snc_quota_manager::calc_node_quota_default() const { return default_quota; } +void snc_quota_manager::get_or_create_quota_context( + std::unique_ptr& ctx, + std::optional client_id) { + if (likely(ctx)) { + // NB: comparing string_view to sstring might be suboptimal + if (likely(ctx->_client_id == client_id)) { + // the context is the right one + return; + } + + // either of the context indexing propeties have changed on the client + // within the same connection. This is an unexpected path, quotas may + // misbehave if we ever get here. The design is based on assumption that + // this should not happen. If it does happen with a supported client, we + // probably should start supporting multiple quota contexts per + // connection + vlog( + klog.warn, + "qm - client_id has changed on the connection. Quotas are reset now. " + "Old client_id: {}, new client_id: {}", + ctx->_client_id, + client_id); + } + + ctx = std::make_unique(client_id); + const auto tcgroup_it = config::find_throughput_control_group( + _kafka_throughput_control().cbegin(), + _kafka_throughput_control().cend(), + client_id); + if (tcgroup_it == _kafka_throughput_control().cend()) { + ctx->_exempt = false; + vlog(klog.debug, "qm - No throughput control group assigned"); + } else { + ctx->_exempt = true; + if (tcgroup_it->is_noname()) { + vlog( + klog.debug, + "qm - Assigned throughput control group #{}", + std::distance(_kafka_throughput_control().cbegin(), tcgroup_it)); + } else { + vlog( + klog.debug, + "qm - Assigned throughput control group: {}", + tcgroup_it->name); + } + } +} + snc_quota_manager::delays_t snc_quota_manager::get_shard_delays( - clock::time_point& connection_throttle_until, - const clock::time_point now) const { + snc_quota_context& ctx, const clock::time_point now) const { delays_t res; // force throttle whatever the client did not do on its side - if (now < connection_throttle_until) { - res.enforce = connection_throttle_until - now; + if (now < ctx._throttled_until) { + res.enforce = ctx._throttled_until - now; } // throttling delay the connection should be requested to throttle @@ -263,23 +312,36 @@ snc_quota_manager::delays_t snc_quota_manager::get_shard_delays( res.request = std::min( _max_kafka_throttle_delay(), std::max(eval_delay(_shard_quota.in), eval_delay(_shard_quota.eg))); - connection_throttle_until = now + res.request; + ctx._throttled_until = now + res.request; return res; } void snc_quota_manager::record_request_receive( - const size_t request_size, const clock::time_point now) noexcept { + snc_quota_context& ctx, + const size_t request_size, + const clock::time_point now) noexcept { + if (ctx._exempt) { + return; + } _shard_quota.in.use(request_size, now); } void snc_quota_manager::record_request_intake( - const size_t request_size) noexcept { + snc_quota_context& ctx, const size_t request_size) noexcept { + if (ctx._exempt) { + return; + } _probe.rec_traffic_in(request_size); } void snc_quota_manager::record_response( - const size_t request_size, const clock::time_point now) noexcept { + snc_quota_context& ctx, + const size_t request_size, + const clock::time_point now) noexcept { + if (ctx._exempt) { + return; + } _shard_quota.eg.use(request_size, now); } diff --git a/src/v/kafka/server/snc_quota_manager.h b/src/v/kafka/server/snc_quota_manager.h index ac2f9bb08e65..1f55478b4afc 100644 --- a/src/v/kafka/server/snc_quota_manager.h +++ b/src/v/kafka/server/snc_quota_manager.h @@ -19,10 +19,12 @@ #include #include #include +#include #include #include #include +#include namespace kafka { @@ -58,6 +60,29 @@ class snc_quotas_probe { size_t _traffic_in = 0; }; +class snc_quota_context { +public: + explicit snc_quota_context(std::optional client_id) + : _client_id(client_id) {} + +private: + friend class snc_quota_manager; + + // Indexing + std::optional _client_id; + + // Configuration + + /// Whether the connection belongs to an exempt tput control group + bool _exempt{false}; + + // Operating + + /// What time the client on this conection should throttle (be throttled) + /// until + ss::lowres_clock::time_point _throttled_until; +}; + /// Isolates \ref quota_manager functionality related to /// shard/node/cluster (SNC) wide quotas and limits class snc_quota_manager @@ -83,28 +108,38 @@ class snc_quota_manager clock::duration request{0}; }; + /// Depending on the other arguments, create or actualize or keep the + /// existing \p ctx. The context object is supposed to be stored + /// in the connection context, and created only once per connection + /// lifetime. However since the kafka API allows changing client_id of a + /// connection on the fly, we may need to replace the existing context with + /// a new one if that happens (actualize). + /// \post (bool)ctx == true + void get_or_create_quota_context( + std::unique_ptr& ctx, + std::optional client_id); + /// Determine throttling required by shard level TP quotas. - /// @param connection_throttle_until (in,out) until what time the client - /// on this conection should throttle until. If it does not, this throttling - /// will be enforced on the next call. In: value from the last call, out: - /// value saved until the next call. - delays_t get_shard_delays( - clock::time_point& connection_throttle_until, - clock::time_point now) const; + delays_t get_shard_delays(snc_quota_context&, clock::time_point now) const; /// Record the request size when it has arrived from the transport. /// This should be done before calling \ref get_shard_delays because the /// recorded request size is used to calculate throttling parameters. void record_request_receive( - size_t request_size, clock::time_point now = clock::now()) noexcept; + snc_quota_context&, + size_t request_size, + clock::time_point now = clock::now()) noexcept; /// Record the request size when the request data is about to be consumed. /// This data is used to represent throttled throughput. - void record_request_intake(size_t request_size) noexcept; + void + record_request_intake(snc_quota_context&, size_t request_size) noexcept; /// Record the response size for all purposes void record_response( - size_t request_size, clock::time_point now = clock::now()) noexcept; + snc_quota_context&, + size_t request_size, + clock::time_point now = clock::now()) noexcept; /// Metrics probe object const snc_quotas_probe& get_snc_quotas_probe() const noexcept {