diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc
index 5da0f9a893bfb..272b857b01b8c 100644
--- a/src/v/kafka/server/handlers/fetch.cc
+++ b/src/v/kafka/server/handlers/fetch.cc
@@ -635,6 +635,8 @@ class fetch_worker {
         std::vector<read_result> read_results;
         // The total amount of bytes read across all results in `read_results`.
         size_t total_size;
+        // The time it took for the first `fetch_ntps_in_parallel` to complete
+        std::chrono::microseconds first_run_latency_result;
     };

     ss::future<worker_result> run() {
@@ -796,6 +798,7 @@ class fetch_worker {
     ss::future<worker_result> do_run() {
         bool first_run{true};
+        std::chrono::microseconds first_run_latency_result{0};
         // A map of indexes in `requests` to their corresponding index in
         // `_ctx.requests`.
         std::vector<size_t> requests_map;
@@ -822,6 +825,11 @@ class fetch_worker {
                   _completed_waiter_count.current());
             }

+            std::optional<op_context::latency_clock::time_point> start_time;
+            if (first_run) {
+                start_time = op_context::latency_clock::now();
+            }
+
             auto q_results = co_await query_requests(std::move(requests));
             if (first_run) {
                 results = std::move(q_results.results);
@@ -829,6 +837,9 @@
                 _last_visible_indexes = std::move(
                   q_results.last_visible_indexes);
+                first_run_latency_result
+                  = std::chrono::duration_cast<std::chrono::microseconds>(
+                    op_context::latency_clock::now() - *start_time);
             } else {
                 // Override the older results of the partitions with the newly
                 // queried results.
@@ -850,6 +861,7 @@
                 co_return worker_result{
                   .read_results = std::move(results),
                   .total_size = total_size,
+                  .first_run_latency_result = first_run_latency_result,
                 };
             }
@@ -871,6 +883,7 @@
                 co_return worker_result{
                   .read_results = std::move(results),
                   .total_size = total_size,
+                  .first_run_latency_result = first_run_latency_result,
                 };
             }
@@ -880,6 +893,7 @@
             co_return worker_result{
               .read_results = std::move(results),
               .total_size = total_size,
+              .first_run_latency_result = first_run_latency_result,
             };
         }
@@ -1072,6 +1086,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
         fill_fetch_responses(
           octx, std::move(results.read_results), fetch.responses);

+        octx.rctx.probe().record_fetch_latency(
+          results.first_run_latency_result);
+
         _last_result_size[fetch.shard] = results.total_size;
         _completed_shard_fetches.push_back(std::move(fetch));
         _has_completed_shard_fetches.signal();
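
Note on the change above: only the first `query_requests` pass (the initial `fetch_ntps_in_parallel` run) is timed. The `first_run` guard sets an optional start time before the query, converts the elapsed time to microseconds afterwards, and carries the value through `worker_result` so the plan executor can hand it to `record_fetch_latency` on the probe. The sketch below is a minimal, standalone illustration of that timing pattern, not Redpanda code: it uses `std::chrono::steady_clock` in place of `op_context::latency_clock`, and the names `run_with_first_run_latency` and `query` are made up for the example.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <optional>
#include <thread>

using latency_clock = std::chrono::steady_clock;

struct result {
    std::size_t total_size{0};
    // Latency of the first query only; later re-polls are excluded.
    std::chrono::microseconds first_run_latency{0};
};

// Runs `query` up to `max_runs` times and records how long the first run took.
result run_with_first_run_latency(
  const std::function<std::size_t()>& query, int max_runs) {
    result r;
    bool first_run = true;
    for (int i = 0; i < max_runs; ++i) {
        // Only start a timer on the first pass, mirroring the patch's
        // `if (first_run) { start_time = now(); }` guard.
        std::optional<latency_clock::time_point> start_time;
        if (first_run) {
            start_time = latency_clock::now();
        }

        r.total_size += query();

        if (first_run) {
            r.first_run_latency
              = std::chrono::duration_cast<std::chrono::microseconds>(
                latency_clock::now() - *start_time);
            first_run = false;
        }
    }
    return r;
}

int main() {
    auto res = run_with_first_run_latency(
      [] {
          std::this_thread::sleep_for(std::chrono::milliseconds(2));
          return std::size_t{512};
      },
      3);
    // The patch hands this value to the fetch probe; here we just print it.
    std::printf(
      "first run took %lld us, read %zu bytes\n",
      static_cast<long long>(res.first_run_latency.count()),
      res.total_size);
    return 0;
}
```

As in the patch, re-polls after the first pass still accumulate into the result but do not affect the recorded latency.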