Skip to content

Commit

Permalink
k/fetch: use a constructor to build fetch_session_partition
Browse files Browse the repository at this point in the history
Avoids having to copy & paste initialization.

(cherry picked from commit a98bfcd)
  • Loading branch information
nvartolomei authored and vbotbuildovich committed Apr 10, 2024
1 parent 0f442be commit d35add9
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 29 deletions.
10 changes: 10 additions & 0 deletions src/v/kafka/server/fetch_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/ktp.h"
Expand All @@ -31,6 +32,15 @@ struct fetch_session_partition {
model::offset high_watermark;
model::offset last_stable_offset;
kafka::leader_epoch current_leader_epoch = invalid_leader_epoch;

fetch_session_partition(
const model::topic& tp, const fetch_request::partition& p)
: topic_partition(tp, p.partition_index)
, max_bytes(p.max_bytes)
, fetch_offset(p.fetch_offset)
, high_watermark(model::offset(-1))
, last_stable_offset(model::offset(-1))
, current_leader_epoch(p.current_leader_epoch) {}
};
/**
* Map of partitions that is kept by fetch session. This map is using intrusive
Expand Down
15 changes: 2 additions & 13 deletions src/v/kafka/server/fetch_session_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "config/configuration.h"
#include "kafka/protocol/fetch.h"
#include "kafka/server/fetch_session.h"
#include "kafka/server/logger.h"
#include "kafka/types.h"
#include "model/fundamental.h"
Expand All @@ -14,18 +15,6 @@

namespace kafka {

static fetch_session_partition make_fetch_partition(
const model::topic& tp, const fetch_request::partition& p) {
return fetch_session_partition{
.topic_partition = {tp, p.partition_index},
.max_bytes = p.max_bytes,
.fetch_offset = p.fetch_offset,
.high_watermark = model::offset(-1),
.last_stable_offset = model::offset(-1),
.current_leader_epoch = p.current_leader_epoch,
};
}

void update_fetch_session(fetch_session& session, const fetch_request& req) {
for (auto it = req.cbegin(); it != req.cend(); ++it) {
auto& topic = *it->topic;
Expand All @@ -40,7 +29,7 @@ void update_fetch_session(fetch_session& session, const fetch_request& req) {
= partition.current_leader_epoch;
} else {
session.partitions().emplace(
make_fetch_partition(topic.name, partition));
fetch_session_partition(topic.name, partition));
}
}

Expand Down
8 changes: 1 addition & 7 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -700,13 +700,7 @@ void op_context::for_each_fetch_partition(Func&& f) const {
request.cend(),
[f = std::forward<Func>(f)](
const fetch_request::const_iterator::value_type& p) {
auto& part = *p.partition;
f(fetch_session_partition{
.topic_partition = {p.topic->name, part.partition_index},
.max_bytes = part.max_bytes,
.fetch_offset = part.fetch_offset,
.current_leader_epoch = part.current_leader_epoch,
});
f(fetch_session_partition(p.topic->name, *p.partition));
});
} else {
std::for_each(
Expand Down
9 changes: 0 additions & 9 deletions src/v/kafka/server/tests/fetch_plan_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ static ss::logger fpt_logger("fpt_test");

using namespace std::chrono_literals; // NOLINT
struct fixture {
static kafka::fetch_session_partition make_fetch_partition(
model::topic topic, model::partition_id p_id, model::offset offset) {
return kafka::fetch_session_partition{
.topic_partition = {std::move(topic), p_id},
.max_bytes = 1_MiB,
.fetch_offset = offset,
.high_watermark = offset};
}

static kafka::fetch_request::topic
make_fetch_request_topic(model::topic tp, int partitions_count) {
kafka::fetch_request::topic fetch_topic{
Expand Down

0 comments on commit d35add9

Please sign in to comment.