From 39e296d8e66c19de0b4ea880c172d6526f13a3e5 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 9 Apr 2024 20:45:45 -0400 Subject: [PATCH 1/2] kafka: redefine fetch latency metric The `kafka_latency_fetch_latency` metric originally measured the time it'd take to complete one fetch poll. A fetch poll would create a fetch plan then execute it in parallel on every shard. On a given shard `fetch_ntps_in_parallel` would account for the majority of the execution time of the plan. Since fetches are no longer implemented by polling there isn't an exactly equivalent measurement that can be assigned to the metric. This commit instead records the duration of the first call to `fetch_ntps_in_parallel` on every shard to the metric. This first call takes as long as it would during a fetch poll. Hence the resulting measurement should be close to the duration of a fetch poll. (cherry picked from commit 44bde8d839cac08b05b8e59fab65140d577ee209) --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index b555943d9af6..4327ad2c351e 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -707,6 +707,8 @@ class fetch_worker { std::vector read_results; // The total amount of bytes read across all results in `read_results`. size_t total_size; + // The time it took for the first `fetch_ntps_in_parallel` to complete + std::chrono::microseconds first_run_latency_result; }; ss::future run() { @@ -868,6 +870,7 @@ class fetch_worker { ss::future do_run() { bool first_run{true}; + std::chrono::microseconds first_run_latency_result{0}; // A map of indexes in `requests` to their corresponding index in // `_ctx.requests`. std::vector requests_map; @@ -894,6 +897,11 @@ class fetch_worker { _completed_waiter_count.current()); } + std::optional start_time; + if (first_run) { + start_time = op_context::latency_clock::now(); + } + auto q_results = co_await query_requests(std::move(requests)); if (first_run) { results = std::move(q_results.results); @@ -901,6 +909,9 @@ class fetch_worker { _last_visible_indexes = std::move( q_results.last_visible_indexes); + first_run_latency_result + = std::chrono::duration_cast( + op_context::latency_clock::now() - *start_time); } else { // Override the older results of the partitions with the newly // queried results. @@ -922,6 +933,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -943,6 +955,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -952,6 +965,7 @@ class fetch_worker { co_return worker_result{ .read_results = std::move(results), .total_size = total_size, + .first_run_latency_result = first_run_latency_result, }; } @@ -1147,6 +1161,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { fetch.responses, fetch.start_time); + octx.rctx.probe().record_fetch_latency( + results.first_run_latency_result); + _last_result_size[fetch.shard] = results.total_size; _completed_shard_fetches.push_back(std::move(fetch)); _has_completed_shard_fetches.signal(); From 7c5f71daec5e72c4cc6bc46b9e31bc322af533a0 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Thu, 25 Apr 2024 11:19:29 -0400 Subject: [PATCH 2/2] kafka: add option to record latency in fill_fetch_responses --- src/v/kafka/server/handlers/fetch.cc | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 4327ad2c351e..40cbbe96d495 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -423,7 +423,8 @@ static void fill_fetch_responses( op_context& octx, std::vector results, const std::vector& responses, - op_context::latency_point start_time) { + op_context::latency_point start_time, + bool record_latency = true) { auto range = boost::irange(0, results.size()); if (unlikely(results.size() != responses.size())) { // soft assert & recovery attempt @@ -519,10 +520,13 @@ static void fill_fetch_responses( } resp_it->set(std::move(resp)); - std::chrono::microseconds fetch_latency - = std::chrono::duration_cast( - op_context::latency_clock::now() - start_time); - octx.rctx.probe().record_fetch_latency(fetch_latency); + + if (record_latency) { + std::chrono::microseconds fetch_latency + = std::chrono::duration_cast( + op_context::latency_clock::now() - start_time); + octx.rctx.probe().record_fetch_latency(fetch_latency); + } } } @@ -1159,7 +1163,8 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { octx, std::move(results.read_results), fetch.responses, - fetch.start_time); + fetch.start_time, + false); octx.rctx.probe().record_fetch_latency( results.first_run_latency_result);