diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc
index 319414807a80f..6c3be6901cf1a 100644
--- a/src/v/kafka/server/handlers/fetch.cc
+++ b/src/v/kafka/server/handlers/fetch.cc
@@ -424,7 +424,8 @@ static void fill_fetch_responses(
   op_context& octx,
   std::vector<read_result> results,
   const std::vector<op_context::response_placeholder_ptr>& responses,
-  op_context::latency_point start_time) {
+  op_context::latency_point start_time,
+  bool record_latency = true) {
     auto range = boost::irange<size_t>(0, results.size());
     if (unlikely(results.size() != responses.size())) {
         // soft assert & recovery attempt
@@ -520,10 +521,13 @@ static void fill_fetch_responses(
         }
         resp_it->set(std::move(resp));
-        std::chrono::microseconds fetch_latency
-          = std::chrono::duration_cast<std::chrono::microseconds>(
-            op_context::latency_clock::now() - start_time);
-        octx.rctx.probe().record_fetch_latency(fetch_latency);
+
+        if (record_latency) {
+            std::chrono::microseconds fetch_latency
+              = std::chrono::duration_cast<std::chrono::microseconds>(
+                op_context::latency_clock::now() - start_time);
+            octx.rctx.probe().record_fetch_latency(fetch_latency);
+        }
     }
 }
@@ -708,6 +712,8 @@ class fetch_worker {
         std::vector<read_result> read_results;
         // The total amount of bytes read across all results in `read_results`.
         size_t total_size;
+        // The time it took for the first `fetch_ntps_in_parallel` to complete
+        std::chrono::microseconds first_run_latency_result;
     };
 
     ss::future<worker_result> run() {
@@ -869,6 +875,7 @@ class fetch_worker {
 
     ss::future<worker_result> do_run() {
         bool first_run{true};
+        std::chrono::microseconds first_run_latency_result{0};
         // A map of indexes in `requests` to their corresponding index in
         // `_ctx.requests`.
         std::vector<size_t> requests_map;
@@ -895,6 +902,11 @@ class fetch_worker {
                   _completed_waiter_count.current());
             }
 
+            std::optional<op_context::latency_point> start_time;
+            if (first_run) {
+                start_time = op_context::latency_clock::now();
+            }
+
             auto q_results = co_await query_requests(std::move(requests));
             if (first_run) {
                 results = std::move(q_results.results);
@@ -902,6 +914,9 @@ class fetch_worker {
                 _last_visible_indexes = std::move(
                   q_results.last_visible_indexes);
+                first_run_latency_result
+                  = std::chrono::duration_cast<std::chrono::microseconds>(
+                    op_context::latency_clock::now() - *start_time);
             } else {
                 // Override the older results of the partitions with the newly
                 // queried results.
@@ -923,6 +938,7 @@ class fetch_worker {
             co_return worker_result{
               .read_results = std::move(results),
               .total_size = total_size,
+              .first_run_latency_result = first_run_latency_result,
             };
         }
 
@@ -944,6 +960,7 @@ class fetch_worker {
             co_return worker_result{
              .read_results = std::move(results),
              .total_size = total_size,
+             .first_run_latency_result = first_run_latency_result,
             };
         }
 
@@ -953,6 +970,7 @@ class fetch_worker {
         co_return worker_result{
           .read_results = std::move(results),
           .total_size = total_size,
+          .first_run_latency_result = first_run_latency_result,
         };
     }
@@ -1146,7 +1164,11 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
               octx,
               std::move(results.read_results),
              fetch.responses,
-             fetch.start_time);
+             fetch.start_time,
+             false);
+
+            octx.rctx.probe().record_fetch_latency(
+              results.first_run_latency_result);
 
             _last_result_size[fetch.shard] = results.total_size;
             _completed_shard_fetches.push_back(std::move(fetch));
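
For reference, here is a minimal standalone sketch (not Redpanda code; `probe` and the loop are stand-ins) of the timing pattern the diff introduces: the latency of only the first query iteration is captured, and it is recorded once per fetch rather than once per partition response.

```cpp
#include <chrono>
#include <cstdio>
#include <optional>
#include <thread>

// Stand-in for the fetch probe's latency histogram.
struct probe {
    void record_fetch_latency(std::chrono::microseconds us) {
        std::printf("fetch latency: %lld us\n",
                    static_cast<long long>(us.count()));
    }
};

int main() {
    using clock = std::chrono::steady_clock;
    probe p;
    std::chrono::microseconds first_run_latency{0};
    bool first_run = true;

    // Several debounce/re-query iterations may run per fetch; only the
    // first one contributes to the latency metric.
    for (int iteration = 0; iteration < 3; ++iteration) {
        std::optional<clock::time_point> start;
        if (first_run) {
            start = clock::now();
        }

        // Stand-in for the first query of the partitions.
        std::this_thread::sleep_for(std::chrono::milliseconds(5));

        if (first_run) {
            first_run_latency
              = std::chrono::duration_cast<std::chrono::microseconds>(
                clock::now() - *start);
            first_run = false;
        }
    }

    // Recorded once per completed fetch, mirroring how the executor reports
    // `first_run_latency_result` while passing record_latency = false to the
    // response-filling path.
    p.record_fetch_latency(first_run_latency);
}
```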