Skip to content

Commit

Permalink
kafka: Add fetch plan and execute latency metric
Browse files Browse the repository at this point in the history
Adds a histogram metric to measure the time it takes to create the fetch
plan and execute it - aka a single fetch poll.

It's an approximation for the time it takes to process the data in a
fetch request once it is available.

I have separated two series one which is tracking empty fetches and one
that isn't.

Further the count of the histogram can be used to calculate the ratio of
fetch requests to polls like so:

```
sum(irate(vectorized_kafka_handler_requests_completed_total{...,
handler="fetch"}[$__rate_interval])) by ($aggr_criteria) /
sum(irate(vectorized_fetch_stats_plan_and_execute_latency_us_count{...}[$__rate_interval])) by
($aggr_criteria)
```

Looking at some scenarios we get the following values:

 - 500MB/s, 4P/4C, 288P, ~110k batch, 1ms debounce: ~0.37
 - 500MB/s, 4P/4C, 288P, ~110k batch, 10ms debounce: ~0.66
 - 125MB/s, 8kP/8kC, 40k partitions, 1ms debounce: ~0.012
 - 125MB/s, 8kP/8kC, 40k partitions, 10ms debounce: ~0.035
 - 125MB/s, 8kP/8kC, 40k partitions, 100ms debounce: ~0.24

(cherry picked from commit 220b11e)
  • Loading branch information
StephanDollberg committed Nov 24, 2023
1 parent 49b298e commit 7bf1cbd
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <seastar/core/metrics.hh>

#include <chrono>

namespace kafka {
class latency_probe {
public:
Expand All @@ -36,6 +38,7 @@ class latency_probe {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

std::vector<sm::label_instance> labels{
sm::label("latency_metric")("microseconds")};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
Expand All @@ -55,6 +58,29 @@ class latency_probe {
labels,
[this] { return _produce_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels)});

auto add_plan_and_execute_metric = [this, &labels, &aggregate_labels](
const std::string& fetch_label,
hist_t& hist) {
auto fetch_labels = labels;
auto fetch_result_label = sm::label("fetch_result")(fetch_label);
fetch_labels.push_back(fetch_result_label);
_metrics.add_group(
prometheus_sanitize::metrics_name("fetch_stats"),
{// Measures latency off creating the fetch plan and
// subsequently executing it - aka "one poll loop"
sm::make_histogram(
"plan_and_execute_latency_us",
sm::description("Latency of fetch planning and excution"),
fetch_labels,
[&hist] { return hist.internal_histogram_logform(); })
.aggregate(aggregate_labels)});
};

add_plan_and_execute_metric(
"non-empty", _fetch_plan_and_execute_latency);
add_plan_and_execute_metric(
"empty", _fetch_plan_and_execute_latency_empty);
}

void setup_public_metrics() {
Expand Down Expand Up @@ -89,9 +115,20 @@ class latency_probe {
_fetch_latency.record(micros.count());
}

void record_fetch_plan_and_execute_measurement(
std::chrono::microseconds micros, bool empty) {
if (empty) {
_fetch_plan_and_execute_latency_empty.record(micros.count());
} else {
_fetch_plan_and_execute_latency.record(micros.count());
}
}

private:
hist_t _produce_latency;
hist_t _fetch_latency;
hist_t _fetch_plan_and_execute_latency;
hist_t _fetch_plan_and_execute_latency_empty;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics{
ssx::metrics::public_metrics_handle};
Expand Down
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "config/configuration.h"
#include "kafka/latency_probe.h"
#include "kafka/protocol/batch_consumer.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
Expand Down Expand Up @@ -843,6 +844,9 @@ class simple_fetch_planner final : public fetch_planner::impl {
*/

static ss::future<> fetch_topic_partitions(op_context& octx) {
auto bytes_left_before = octx.bytes_left;

auto start_time = latency_probe::hist_t::clock_type::now();
auto planner = make_fetch_planner<simple_fetch_planner>();

auto fetch_plan = planner.create_plan(octx);
Expand All @@ -851,6 +855,14 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {
= make_fetch_plan_executor<parallel_fetch_plan_executor>();
co_await executor.execute_plan(octx, std::move(fetch_plan));

auto end_time = latency_probe::hist_t::clock_type::now();

auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time);

octx.rctx.probe().record_fetch_plan_and_execute_measurement(
latency, bytes_left_before == octx.bytes_left);

if (octx.should_stop_fetch()) {
co_return;
}
Expand Down

0 comments on commit 7bf1cbd

Please sign in to comment.