Skip to content

Commit

Permalink
Revert "kafka: remove existing fetch latency measurement"
Browse files Browse the repository at this point in the history
This reverts commit dd4676a.

Will be replaced with 23.3 commit
  • Loading branch information
StephanDollberg committed Apr 26, 2024
1 parent 03a9f27 commit 7262ff8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
12 changes: 10 additions & 2 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ read_result::memory_units_t reserve_memory_units(
static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
const std::vector<op_context::response_placeholder_ptr>& responses) {
const std::vector<op_context::response_placeholder_ptr>& responses,
op_context::latency_point start_time) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(results.size() != responses.size())) {
// soft assert & recovery attempt
Expand Down Expand Up @@ -519,6 +520,10 @@ static void fill_fetch_responses(
}

resp_it->set(std::move(resp));
std::chrono::microseconds fetch_latency
= std::chrono::duration_cast<std::chrono::microseconds>(
op_context::latency_clock::now() - start_time);
octx.rctx.probe().record_fetch_latency(fetch_latency);
}
}

Expand Down Expand Up @@ -1152,7 +1157,10 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl {
});

fill_fetch_responses(
octx, std::move(results.read_results), fetch.responses);
octx,
std::move(results.read_results),
fetch.responses,
fetch.start_time);

octx.rctx.probe().record_fetch_latency(
results.first_run_latency_result);
Expand Down
10 changes: 8 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
explicit shard_fetch(op_context::latency_point start_time)
: start_time{start_time} {}

void push_back(
ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) {
requests.push_back(std::move(config));
Expand All @@ -346,6 +349,7 @@ struct shard_fetch {
ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
op_context::latency_point start_time;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
fmt::print(o, "{}", sf.requests);
Expand All @@ -354,8 +358,10 @@ struct shard_fetch {
};

struct fetch_plan {
explicit fetch_plan(size_t shards)
: fetches_per_shard(shards, shard_fetch()) {
explicit fetch_plan(
size_t shards,
op_context::latency_point start_time = op_context::latency_clock::now())
: fetches_per_shard(shards, shard_fetch(start_time)) {
for (size_t i = 0; i < fetches_per_shard.size(); i++) {
fetches_per_shard[i].shard = i;
}
Expand Down

0 comments on commit 7262ff8

Please sign in to comment.