diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index f11ecc7473c2f..12770a470ed7c 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -422,7 +422,8 @@ read_result::memory_units_t reserve_memory_units( static void fill_fetch_responses( op_context& octx, std::vector results, - const std::vector& responses) { + const std::vector& responses, + op_context::latency_point start_time) { auto range = boost::irange(0, results.size()); if (unlikely(results.size() != responses.size())) { // soft assert & recovery attempt @@ -518,6 +519,10 @@ static void fill_fetch_responses( } resp_it->set(std::move(resp)); + std::chrono::microseconds fetch_latency + = std::chrono::duration_cast( + op_context::latency_clock::now() - start_time); + octx.rctx.probe().record_fetch_latency(fetch_latency); } } @@ -1151,7 +1156,10 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { }); fill_fetch_responses( - octx, std::move(results.read_results), fetch.responses); + octx, + std::move(results.read_results), + fetch.responses, + fetch.start_time); octx.rctx.probe().record_fetch_latency( results.first_run_latency_result); diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 2858867825417..dda2aef56ed13 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -331,6 +331,9 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { + explicit shard_fetch(op_context::latency_point start_time) + : start_time{start_time} {} + void push_back( ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) { requests.push_back(std::move(config)); @@ -346,6 +349,7 @@ struct shard_fetch { ss::shard_id shard; std::vector requests; std::vector responses; + op_context::latency_point start_time; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { fmt::print(o, "{}", sf.requests); @@ -354,8 +358,10 @@ struct shard_fetch { }; struct fetch_plan { - explicit fetch_plan(size_t shards) - : fetches_per_shard(shards, shard_fetch()) { + explicit fetch_plan( + size_t shards, + op_context::latency_point start_time = op_context::latency_clock::now()) + : fetches_per_shard(shards, shard_fetch(start_time)) { for (size_t i = 0; i < fetches_per_shard.size(); i++) { fetches_per_shard[i].shard = i; }