Skip to content

Commit

Permalink
Merge pull request redpanda-data#17977 from vbotbuildovich/backport-p…
Browse files Browse the repository at this point in the history
…r-17720-v23.3.x-868

[v23.3.x] Change the` kafka_latency_fetch_latency` metric
  • Loading branch information
piyushredpanda authored Apr 25, 2024
2 parents df8cf52 + 7c5f71d commit f445662
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time) {
op_context::latency_point start_time,
bool record_latency = true) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(results.size() != responses.size())) {
// soft assert & recovery attempt
Expand Down Expand Up @@ -520,10 +521,13 @@ static void fill_fetch_responses(
}

resp_it->set(std::move(resp));
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);

if (record_latency) {
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);
}
}
}

Expand Down Expand Up @@ -708,6 +712,8 @@ class fetch_worker {
std::vector<read_result> read_results;
// The total amount of bytes read across all results in `read_results`.
size_t total_size;
// The time it took for the first `fetch_ntps_in_parallel` to complete
std::chrono::microseconds first_run_latency_result;
};

ss::future<worker_result> run() {
Expand Down Expand Up @@ -869,6 +875,7 @@ class fetch_worker {

ss::future<worker_result> do_run() {
bool first_run{true};
std::chrono::microseconds first_run_latency_result{0};
// A map of indexes in `requests` to their corresponding index in
// `_ctx.requests`.
std::vector<size_t> requests_map;
Expand All @@ -895,13 +902,21 @@ class fetch_worker {
_completed_waiter_count.current());
}

std::optional<op_context::latency_point> start_time;
if (first_run) {
start_time = op_context::latency_clock::now();
}

auto q_results = co_await query_requests(std::move(requests));
if (first_run) {
results = std::move(q_results.results);
total_size = q_results.total_size;

_last_visible_indexes = std::move(
q_results.last_visible_indexes);
first_run_latency_result
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - *start_time);
} else {
// Override the older results of the partitions with the newly
// queried results.
Expand All @@ -923,6 +938,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand All @@ -944,6 +960,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand All @@ -953,6 +970,7 @@ class fetch_worker {
co_return worker_result{
.read_results = std::move(results),
.total_size = total_size,
.first_run_latency_result = first_run_latency_result,
};
}

Expand Down Expand Up @@ -1146,7 +1164,11 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
octx,
std::move(results.read_results),
fetch.responses,
fetch.start_time);
fetch.start_time,
false);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);

_last_result_size[fetch.shard] = results.total_size;
_completed_shard_fetches.push_back(std::move(fetch));
Expand Down

0 comments on commit f445662

Please sign in to comment.