diff --git a/src/v/kafka/latency_probe.h b/src/v/kafka/latency_probe.h index c4d3649e3706..c6cc1eec8918 100644 --- a/src/v/kafka/latency_probe.h +++ b/src/v/kafka/latency_probe.h @@ -18,6 +18,8 @@ #include +#include + namespace kafka { class latency_probe { public: @@ -36,6 +38,7 @@ class latency_probe { if (config::shard_local_cfg().disable_metrics()) { return; } + std::vector labels{ sm::label("latency_metric")("microseconds")}; auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() @@ -55,6 +58,29 @@ class latency_probe { labels, [this] { return _produce_latency.internal_histogram_logform(); }) .aggregate(aggregate_labels)}); + + auto add_plan_and_execute_metric = [this, &labels, &aggregate_labels]( + const std::string& fetch_label, + hist_t& hist) { + auto fetch_labels = labels; + auto fetch_result_label = sm::label("fetch_result")(fetch_label); + fetch_labels.push_back(fetch_result_label); + _metrics.add_group( + prometheus_sanitize::metrics_name("fetch_stats"), + {// Measures latency off creating the fetch plan and + // subsequently executing it - aka "one poll loop" + sm::make_histogram( + "plan_and_execute_latency_us", + sm::description("Latency of fetch planning and excution"), + fetch_labels, + [&hist] { return hist.internal_histogram_logform(); }) + .aggregate(aggregate_labels)}); + }; + + add_plan_and_execute_metric( + "non-empty", _fetch_plan_and_execute_latency); + add_plan_and_execute_metric( + "empty", _fetch_plan_and_execute_latency_empty); } void setup_public_metrics() { @@ -89,9 +115,20 @@ class latency_probe { _fetch_latency.record(micros.count()); } + void record_fetch_plan_and_execute_measurement( + std::chrono::microseconds micros, bool empty) { + if (empty) { + _fetch_plan_and_execute_latency_empty.record(micros.count()); + } else { + _fetch_plan_and_execute_latency.record(micros.count()); + } + } + private: hist_t _produce_latency; hist_t _fetch_latency; + hist_t _fetch_plan_and_execute_latency; + hist_t _fetch_plan_and_execute_latency_empty; ss::metrics::metric_groups _metrics; ss::metrics::metric_groups _public_metrics{ ssx::metrics::public_metrics_handle}; diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 7880c5016daf..78963094ae2d 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -13,6 +13,7 @@ #include "cluster/partition_manager.h" #include "cluster/shard_table.h" #include "config/configuration.h" +#include "kafka/latency_probe.h" #include "kafka/protocol/batch_consumer.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/fetch.h" @@ -843,6 +844,9 @@ class simple_fetch_planner final : public fetch_planner::impl { */ static ss::future<> fetch_topic_partitions(op_context& octx) { + auto bytes_left_before = octx.bytes_left; + + auto start_time = latency_probe::hist_t::clock_type::now(); auto planner = make_fetch_planner(); auto fetch_plan = planner.create_plan(octx); @@ -851,6 +855,14 @@ static ss::future<> fetch_topic_partitions(op_context& octx) { = make_fetch_plan_executor(); co_await executor.execute_plan(octx, std::move(fetch_plan)); + auto end_time = latency_probe::hist_t::clock_type::now(); + + auto latency = std::chrono::duration_cast( + end_time - start_time); + + octx.rctx.probe().record_fetch_plan_and_execute_measurement( + latency, bytes_left_before == octx.bytes_left); + if (octx.should_stop_fetch()) { co_return; }