Skip to content

Commit

Permalink
Merge pull request #15129 from redpanda-data/stephan/v232-backport-fe…
Browse files Browse the repository at this point in the history
…tch-plan-execute-metric

[v23.2.x] kafka: Add fetch plan and execute latency metric
  • Loading branch information
piyushredpanda authored Nov 27, 2023
2 parents 06d914d + 7bf1cbd commit af939c3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <seastar/core/metrics.hh>

#include <chrono>

namespace kafka {
class latency_probe {
public:
Expand All @@ -36,6 +38,7 @@ class latency_probe {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

std::vector<sm::label_instance> labels{
sm::label("latency_metric")("microseconds")};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
Expand All @@ -55,6 +58,29 @@ class latency_probe {
labels,
[this] { return _produce_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels)});

// Registers a "plan_and_execute_latency_us" histogram in the "fetch_stats"
// metric group. The histogram carries the common `labels` plus a
// fetch_result=<fetch_label> label and reports the contents of `hist`.
auto add_plan_and_execute_metric = [this, &labels, &aggregate_labels](
                                     const std::string& fetch_label,
                                     hist_t& hist) {
    // Work on a copy so the shared label set is not modified between calls.
    auto fetch_labels = labels;
    auto fetch_result_label = sm::label("fetch_result")(fetch_label);
    fetch_labels.push_back(fetch_result_label);
    _metrics.add_group(
      prometheus_sanitize::metrics_name("fetch_stats"),
      {// Measures latency of creating the fetch plan and
       // subsequently executing it - aka "one poll loop"
       sm::make_histogram(
         "plan_and_execute_latency_us",
         sm::description("Latency of fetch planning and execution"),
         fetch_labels,
         // `hist` is captured by reference, so it has to outlive the
         // registered metric; the calls below pass members of this probe.
         [&hist] { return hist.internal_histogram_logform(); })
         .aggregate(aggregate_labels)});
};

// One histogram per fetch outcome, told apart by the fetch_result label.
add_plan_and_execute_metric("non-empty", _fetch_plan_and_execute_latency);
add_plan_and_execute_metric("empty", _fetch_plan_and_execute_latency_empty);
}

void setup_public_metrics() {
Expand Down Expand Up @@ -89,9 +115,20 @@ class latency_probe {
_fetch_latency.record(micros.count());
}

/// Records one plan-and-execute latency sample.
///
/// @param micros latency of the measured fetch plan + execute pass
/// @param empty  selects the target histogram: true records into
///               _fetch_plan_and_execute_latency_empty, false into
///               _fetch_plan_and_execute_latency
void record_fetch_plan_and_execute_measurement(
  std::chrono::microseconds micros, bool empty) {
    // Choose the histogram first so there is a single record() call.
    auto& target_hist = empty ? _fetch_plan_and_execute_latency_empty
                              : _fetch_plan_and_execute_latency;
    target_hist.record(micros.count());
}

private:
hist_t _produce_latency;
hist_t _fetch_latency;
hist_t _fetch_plan_and_execute_latency;
hist_t _fetch_plan_and_execute_latency_empty;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics{
ssx::metrics::public_metrics_handle};
Expand Down
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "config/configuration.h"
#include "kafka/latency_probe.h"
#include "kafka/protocol/batch_consumer.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
Expand Down Expand Up @@ -843,6 +844,9 @@ class simple_fetch_planner final : public fetch_planner::impl {
*/

static ss::future<> fetch_topic_partitions(op_context& octx) {
auto bytes_left_before = octx.bytes_left;

auto start_time = latency_probe::hist_t::clock_type::now();
auto planner = make_fetch_planner<simple_fetch_planner>();

auto fetch_plan = planner.create_plan(octx);
Expand All @@ -851,6 +855,14 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {
= make_fetch_plan_executor<parallel_fetch_plan_executor>();
co_await executor.execute_plan(octx, std::move(fetch_plan));

auto end_time = latency_probe::hist_t::clock_type::now();

auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time);

octx.rctx.probe().record_fetch_plan_and_execute_measurement(
latency, bytes_left_before == octx.bytes_left);

if (octx.should_stop_fetch()) {
co_return;
}
Expand Down

0 comments on commit af939c3

Please sign in to comment.