From d8fbcb76ddec78cba206ebaccc62ff01ba2b0234 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 9 Apr 2024 20:45:45 -0400 Subject: [PATCH] kafka: redefine fetch latency metric The `kafka_latency_fetch_latency` metric originally measured the time it'd take to complete one fetch poll. A fetch poll would create a fetch plan then execute it in parallel on every shard. On a given shard `fetch_ntps_in_parallel` would account for the majority of the execution time of the plan. Since fetches are no longer implemented by polling there isn't an exactly equivalent measurement that can be assigned to the metric. This commit instead records the duration of the first call to `fetch_ntps_in_parallel` on every shard to the metric. This first call takes as long as it would during a fetch poll. Hence the resulting measurement should be close to the duration of a fetch poll. (cherry picked from commit 44bde8d839cac08b05b8e59fab65140d577ee209) (cherry picked from commit afd1c9a8947ade4514bb0b8cf588c27a98483e70) --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 9b0ca9717d0c..12770a470ed7 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -707,6 +707,8 @@ class fetch_worker { std::vector<read_result> read_results; // The total amount of bytes read across all results in `read_results`. size_t total_size; + // The time it took for the first `fetch_ntps_in_parallel` to complete + std::chrono::microseconds first_run_latency_result; }; ss::future<worker_result> run() { @@ -868,6 +870,7 @@ class fetch_worker { ss::future<worker_result> do_run() { bool first_run{true}; + std::chrono::microseconds first_run_latency_result{0}; // A map of indexes in `requests` to their corresponding index in // `_ctx.requests`. 
std::vector<size_t> requests_map; @@ -894,6 +897,11 @@ class fetch_worker { _completed_waiter_count.current()); } + std::optional<op_context::latency_clock::time_point> start_time; + if (first_run) { + start_time = op_context::latency_clock::now(); + } + auto q_results = co_await query_requests(std::move(requests)); if (first_run) { results = std::move(q_results.results); @@ -901,6 +909,9 @@ class fetch_worker { _last_visible_indexes = std::move( q_results.last_visible_indexes); + first_run_latency_result + = std::chrono::duration_cast<std::chrono::microseconds>( + op_context::latency_clock::now() - *start_time); } else { // Override the older results of the partitions with the newly // queried results. @@ -922,6 +933,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -943,6 +955,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -952,6 +965,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -1147,6 +1161,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { fetch.responses, fetch.start_time); + octx.rctx.probe().record_fetch_latency( + results.first_run_latency_result); + _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); _has_completed_shard_fetches.signal();