From 44bde8d839cac08b05b8e59fab65140d577ee209 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 9 Apr 2024 20:45:45 -0400 Subject: [PATCH] kafka: redefine fetch latency metric The `kafka_latency_fetch_latency` metric originally measured the time it'd take to complete one fetch poll. A fetch poll would create a fetch plan then execute it in parallel on every shard. On a given shard `fetch_ntps_in_parallel` would account for the majority of the execution time of the plan. Since fetches are no longer implemented by polling there isn't an exactly equivalent measurement that can be assigned to the metric. This commit instead records the duration of the first call to `fetch_ntps_in_parallel` on every shard to the metric. This first call takes as long as it would during a fetch poll. Hence the resulting measurement should be close to the duration of a fetch poll. --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 5da0f9a893bfb..272b857b01b8c 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -635,6 +635,8 @@ class fetch_worker { std::vector read_results; // The total amount of bytes read across all results in `read_results`. size_t total_size; + // The time it took for the first `fetch_ntps_in_parallel` to complete + std::chrono::microseconds first_run_latency_result; }; ss::future run() { @@ -796,6 +798,7 @@ class fetch_worker { ss::future do_run() { bool first_run{true}; + std::chrono::microseconds first_run_latency_result{0}; // A map of indexes in `requests` to their corresponding index in // `_ctx.requests`. std::vector requests_map; @@ -822,6 +825,11 @@ class fetch_worker { _completed_waiter_count.current()); } + std::optional start_time; + if (first_run) { + start_time = op_context::latency_clock::now(); + } + auto q_results = co_await query_requests(std::move(requests)); if (first_run) { results = std::move(q_results.results); @@ -829,6 +837,9 @@ class fetch_worker { _last_visible_indexes = std::move( q_results.last_visible_indexes); + first_run_latency_result + = std::chrono::duration_cast( + op_context::latency_clock::now() - *start_time); } else { // Override the older results of the partitions with the newly // queried results. @@ -850,6 +861,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -871,6 +883,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -880,6 +893,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -1072,6 +1086,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { fill_fetch_responses( octx, std::move(results.read_results), fetch.responses); + octx.rctx.probe().record_fetch_latency( + results.first_run_latency_result); + _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); _has_completed_shard_fetches.signal();