Skip to content

Commit

Permalink
Revert "kafka: remove existing fetch latency measurement"
Browse files Browse the repository at this point in the history
This reverts commit dd4676a.

Will be replaced with 23.3 commit

(cherry picked from commit 7262ff8)
  • Loading branch information
StephanDollberg authored and vbotbuildovich committed Apr 30, 2024
1 parent 568c78f commit 4629f30
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
12 changes: 10 additions & 2 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses) {
const std::vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(results.size() != responses.size())) {
// soft assert & recovery attempt
Expand Down Expand Up @@ -518,6 +519,10 @@ static void fill_fetch_responses(
}

resp_it->set(std::move(resp));
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);
}
}

Expand Down Expand Up @@ -1151,7 +1156,10 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
});

fill_fetch_responses(
octx, std::move(results.read_results), fetch.responses);
octx,
std::move(results.read_results),
fetch.responses,
fetch.start_time);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);
Expand Down
10 changes: 8 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
requests.push_back(std::move(config));
Expand All @@ -346,6 +349,7 @@ struct shard_fetch {
ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
fmt::print(o, "{}", sf.requests);
Expand All @@ -354,8 +358,10 @@ struct shard_fetch {
};

struct fetch_plan {
explicit fetch_plan(size_t shards)
: fetches_per_shard(shards, shard_fetch()) {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
}
Expand Down

0 comments on commit 4629f30

Please sign in to comment.