Skip to content

Commit

Permalink
kafka: redefine fetch latency metric
Browse files Browse the repository at this point in the history
The `kafka_latency_fetch_latency` metric originally measured the time
it'd take to complete one fetch poll. A fetch poll would create a fetch
plan then execute it in parallel on every shard. On a given shard
`fetch_ntps_in_parallel` would account for the majority of the execution time
of the plan.

Since fetches are no longer implemented by polling there isn't an
exactly equivalent measurement that can be assigned to the metric.

This commit instead records the duration of the first call to
`fetch_ntps_in_parallel` on every shard to the metric. This first call takes
as long as it would during a fetch poll. Hence the resulting measurement
should be close to the duration of a fetch poll.
  • Loading branch information
ballard26 committed Apr 10, 2024
1 parent dd4676a commit 44bde8d
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ class fetch_worker {
std::vector<read_result> read_results;
// The total amount of bytes read across all results in `read_results`.
size_t total_size;
// The time it took for the first `fetch_ntps_in_parallel` to complete
std::chrono::microseconds first_run_latency_result;
};

ss::future<worker_result> run() {
Expand Down Expand Up @@ -796,6 +798,7 @@ class fetch_worker {

ss::future<worker_result> do_run() {
bool first_run{true};
std::chrono::microseconds first_run_latency_result{0};
// A map of indexes in `requests` to their corresponding index in
// `_ctx.requests`.
std::vector<size_t> requests_map;
Expand All @@ -822,13 +825,21 @@ class fetch_worker {
_completed_waiter_count.current());
}

std::optional<op_context::latency_point> start_time;
if (first_run) {
start_time = op_context::latency_clock::now();
}

auto q_results = co_await query_requests(std::move(requests));
if (first_run) {
results = std::move(q_results.results);
total_size = q_results.total_size;

_last_visible_indexes = std::move(
q_results.last_visible_indexes);
first_run_latency_result
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - *start_time);
} else {
// Override the older results of the partitions with the newly
// queried results.
Expand All @@ -850,6 +861,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand All @@ -871,6 +883,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand All @@ -880,6 +893,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand Down Expand Up @@ -1072,6 +1086,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
fill_fetch_responses(
octx, std::move(results.read_results), fetch.responses);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);

_last_result_size[fetch.shard] = results.total_size;
_completed_shard_fetches.push_back(std::move(fetch));
_has_completed_shard_fetches.signal();
Expand Down

0 comments on commit 44bde8d

Please sign in to comment.