Skip to content

Commit

Permalink
Merge pull request #19861 from pgellert/quotas/observability-metrics
Browse files Browse the repository at this point in the history
CORE-2715 Quotas: add metrics on client throttling
  • Loading branch information
pgellert authored Jun 20, 2024
2 parents 74b4cfe + c53cb02 commit f94b94a
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 136 deletions.
13 changes: 13 additions & 0 deletions src/v/kafka/server/client_quota_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ enum class client_quota_type {
partition_mutation_quota
};

static constexpr std::array all_client_quota_types = {
client_quota_type::produce_quota,
client_quota_type::fetch_quota,
client_quota_type::partition_mutation_quota};

std::ostream& operator<<(std::ostream&, client_quota_type);

struct client_quota_request_ctx {
Expand All @@ -85,6 +90,14 @@ enum class client_quota_rule {
kafka_client_id
};

static constexpr std::array all_client_quota_rules = {
client_quota_rule::not_applicable,
client_quota_rule::kafka_client_default,
client_quota_rule::cluster_client_default,
client_quota_rule::kafka_client_prefix,
client_quota_rule::cluster_client_prefix,
client_quota_rule::kafka_client_id};

std::ostream& operator<<(std::ostream&, client_quota_rule);

struct client_quota_value {
Expand Down
125 changes: 125 additions & 0 deletions src/v/kafka/server/quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@
#include "kafka/server/atomic_token_bucket.h"
#include "kafka/server/client_quota_translator.h"
#include "kafka/server/logger.h"
#include "metrics/metrics.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/future-util.h"
#include "utils/log_hist.h"

#include <seastar/core/future.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>

#include <fmt/chrono.h>

#include <chrono>
#include <memory>
#include <optional>
#include <string_view>
#include <variant>
Expand All @@ -35,6 +40,114 @@ using namespace std::chrono_literals;

namespace kafka {

template<typename clock>
class client_quotas_probe {
public:
client_quotas_probe() = default;
client_quotas_probe(const client_quotas_probe&) = delete;
client_quotas_probe& operator=(const client_quotas_probe&) = delete;
client_quotas_probe(client_quotas_probe&&) = delete;
client_quotas_probe& operator=(client_quotas_probe&&) = delete;
~client_quotas_probe() noexcept = default;

void setup_metrics() {
namespace sm = ss::metrics;

auto metric_defs = std::vector<ss::metrics::metric_definition>{};
metric_defs.reserve(
all_client_quota_types.size() * all_client_quota_rules.size() * 2);

auto rule_label = metrics::make_namespaced_label("quota_rule");
auto quota_type_label = metrics::make_namespaced_label("quota_type");

for (auto quota_type : all_client_quota_types) {
for (auto rule : all_client_quota_rules) {
metric_defs.emplace_back(
sm::make_histogram(
"client_quota_throttle_time",
[this, rule, quota_type] {
return get_throttle_time(rule, quota_type);
},
sm::description(
"Client quota throttling delay per rule and "
"quota type (in seconds)"),
{rule_label(rule), quota_type_label(quota_type)})
.aggregate({sm::shard_label}));
metric_defs.emplace_back(
sm::make_histogram(
"client_quota_throughput",
[this, rule, quota_type] {
return get_throughput(rule, quota_type);
},
sm::description(
"Client quota throughput per rule and quota type"),
{rule_label(rule), quota_type_label(quota_type)})
.aggregate({sm::shard_label}));
}
}

auto group_name = prometheus_sanitize::metrics_name("kafka:quotas");
if (!config::shard_local_cfg().disable_metrics()) {
_internal_metrics.add_group(group_name, metric_defs);
}
if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(group_name, metric_defs);
}
}

void record_throttle_time(
client_quota_rule rule, client_quota_type qt, clock::duration t) {
auto& gm = (*this)(rule, qt);
// The Kafka response field `ThrottleTimeMs` is in milliseconds, so
// round the metric to milliseconds as well.
auto t_ms = std::chrono::duration_cast<std::chrono::milliseconds>(t);
gm.throttle_time.record(t_ms);
}
void record_throughput(
client_quota_rule rule, client_quota_type qt, uint64_t count) {
auto& gm = (*this)(rule, qt);
gm.throughput.record(count);
}

private:
ss::metrics::histogram
get_throttle_time(client_quota_rule rule, client_quota_type qt) const {
auto& gm = (*this)(rule, qt);
return gm.throttle_time.client_quota_histogram_logform();
}
ss::metrics::histogram
get_throughput(client_quota_rule rule, client_quota_type qt) const {
auto& gm = (*this)(rule, qt);
return gm.throughput.internal_histogram_logform();
}

struct granular_metrics {
log_hist_client_quota throttle_time;
log_hist_internal throughput;
};

granular_metrics&
operator()(client_quota_rule rule, client_quota_type type) {
return _metrics[static_cast<size_t>(rule)][static_cast<size_t>(type)];
}

const granular_metrics&
operator()(client_quota_rule rule, client_quota_type type) const {
return _metrics[static_cast<size_t>(rule)][static_cast<size_t>(type)];
}

// Assume the enums values are in sequence: [0, all_*.size())
static_assert(static_cast<size_t>(all_client_quota_rules[0]) == 0);
static_assert(static_cast<size_t>(all_client_quota_types[0]) == 0);
using metrics_container_t = std::array<
std::array<granular_metrics, all_client_quota_types.size()>,
all_client_quota_rules.size()>;

metrics::internal_metric_groups _internal_metrics;
metrics::public_metric_groups _public_metrics;
metrics_container_t _metrics{};
};

using clock = quota_manager::clock;

quota_manager::quota_manager(
Expand All @@ -58,9 +171,12 @@ quota_manager::~quota_manager() { _gc_timer.cancel(); }
ss::future<> quota_manager::stop() {
_gc_timer.cancel();
co_await _gate.close();
_probe.reset();
}

ss::future<> quota_manager::start() {
_probe = std::make_unique<client_quotas_probe<clock>>();
_probe->setup_metrics();
if (ss::this_shard_id() == _client_quotas.shard_id()) {
co_await _client_quotas.reset(client_quotas_map_t{});
_gc_timer.arm_periodic(_gc_freq);
Expand Down Expand Up @@ -208,7 +324,9 @@ ss::future<std::chrono::milliseconds> quota_manager::record_partition_mutations(
.client_id = client_id,
};
auto [key, value] = _translator.find_quota(ctx);
_probe->record_throughput(value.rule, ctx.q_type, mutations);
if (!value.limit) {
_probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
vlog(
client_quota_log.trace,
"request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
Expand Down Expand Up @@ -236,6 +354,7 @@ ss::future<std::chrono::milliseconds> quota_manager::record_partition_mutations(
});

auto capped_delay = cap_to_max_delay(key, delay);
_probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
vlog(
client_quota_log.trace,
"request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
Expand Down Expand Up @@ -276,7 +395,9 @@ ss::future<clock::duration> quota_manager::record_produce_tp_and_throttle(
.client_id = client_id,
};
auto [key, value] = _translator.find_quota(ctx);
_probe->record_throughput(value.rule, ctx.q_type, bytes);
if (!value.limit) {
_probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
vlog(
client_quota_log.trace,
"request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
Expand All @@ -300,6 +421,7 @@ ss::future<clock::duration> quota_manager::record_produce_tp_and_throttle(
});

auto capped_delay = cap_to_max_delay(key, delay);
_probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
vlog(
client_quota_log.trace,
"request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
Expand All @@ -322,6 +444,7 @@ ss::future<> quota_manager::record_fetch_tp(
.client_id = client_id,
};
auto [key, value] = _translator.find_quota(ctx);
_probe->record_throughput(value.rule, ctx.q_type, bytes);
vlog(
client_quota_log.trace,
"record request: ctx:{}, key:{}, value:{}, bytes:{}",
Expand Down Expand Up @@ -351,6 +474,7 @@ ss::future<clock::duration> quota_manager::throttle_fetch_tp(
};
auto [key, value] = _translator.find_quota(ctx);
if (!value.limit) {
_probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
vlog(
client_quota_log.trace,
"throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
Expand All @@ -373,6 +497,7 @@ ss::future<clock::duration> quota_manager::throttle_fetch_tp(
});

auto capped_delay = cap_to_max_delay(key, delay);
_probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
vlog(
client_quota_log.trace,
"throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
Expand Down
5 changes: 5 additions & 0 deletions src/v/kafka/server/quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
#include <absl/container/node_hash_map.h>

#include <chrono>
#include <memory>
#include <optional>
#include <string_view>

namespace kafka {

template<typename clock>
class client_quotas_probe;

// quota_manager tracks quota usage
//
// TODO:
Expand Down Expand Up @@ -138,6 +142,7 @@ class quota_manager : public ss::peering_sharded_service<quota_manager> {

client_quotas_t& _client_quotas;
client_quota_translator _translator;
std::unique_ptr<client_quotas_probe<clock>> _probe;

ss::timer<> _gc_timer;
clock::duration _gc_freq;
Expand Down
14 changes: 14 additions & 0 deletions src/v/utils/log_hist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,23 @@ log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
return seastar_histogram_logform<read_distribution_config>();
}

template<
typename duration_t,
int number_of_buckets,
uint64_t first_bucket_upper_bound>
seastar::metrics::histogram
log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
client_quota_histogram_logform() const {
using client_quota_config = logform_config<1'000l, 1ul, 15>;

return seastar_histogram_logform<client_quota_config>();
}

// Explicit instantiation for log_hist_public
template class log_hist<std::chrono::microseconds, 18, 256ul>;
// Explicit instantiation for log_hist_internal
template class log_hist<std::chrono::microseconds, 26, 8ul>;
// Explicit instantiation for log_hist_read_dist
template class log_hist<std::chrono::minutes, 16, 4ul>;
// Explicit instantiation for log_hist_client_quota
template class log_hist<std::chrono::milliseconds, 15, 1ul>;
17 changes: 17 additions & 0 deletions src/v/utils/log_hist.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ class log_hist {
*/
seastar::metrics::histogram read_dist_histogram_logform() const;

/*
* Generates a Prometheus histogram with 15 buckets. The first bucket has an
* upper bound of 1 - 1 and subsequent buckets have an upper bound of 2
* times the upper bound of the previous bucket.
*
* This is the histogram type used for the Kafka client quota distribution.
*/
seastar::metrics::histogram client_quota_histogram_logform() const;

private:
friend measurement;

Expand Down Expand Up @@ -241,3 +250,11 @@ using log_hist_internal = log_hist<std::chrono::microseconds, 26, 8ul>;
* the first bucket to greater than 91 days in the last bucket.
*/
using log_hist_read_dist = log_hist<std::chrono::minutes, 16, 4ul>;

/*
* This histogram has units of milliseconds instead of microseconds, and is
* used for measuring the Kafka client quota delays on the scale of less than
* 1 milliseconds in the first bucket to greater than 32 seconds in the last
* bucket.
*/
using log_hist_client_quota = log_hist<std::chrono::milliseconds, 15, 1ul>;
Loading

0 comments on commit f94b94a

Please sign in to comment.