Merge pull request #17727 from vbotbuildovich/backport-pr-17674-v23.2.x-954

[v23.2.x] k/fetch: handle initial_leader_epoch correctly
nvartolomei authored Apr 11, 2024
2 parents f5df263 + d35add9 commit b5c3804
Showing 6 changed files with 124 additions and 51 deletions.
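
Summary of the change: the fetch path was losing the client-supplied current_leader_epoch in places (the fetch handler built fetch_session_partition without it, and session updates never refreshed it), so stale clients could not be fenced. This backport threads the epoch from the fetch request into the cached fetch-session state and adds tests. For orientation, the check that this epoch ultimately feeds follows KIP-320 semantics, roughly as in the sketch below; this is a simplified stand-alone C++ model, not Redpanda's actual code, and check_leader_epoch is an illustrative name.

#include <cstdint>
#include <iostream>

// Simplified model of the fetch-time leader-epoch check (KIP-320 semantics).
// The error names follow the Kafka protocol; everything else is illustrative.
enum class error_code { none, fenced_leader_epoch, unknown_leader_epoch };

constexpr int32_t invalid_leader_epoch = -1;

// A request epoch of -1 opts out of the check. An epoch older than the
// partition's current one means the client has stale metadata and must
// refresh it (fenced_leader_epoch); a newer epoch means the broker itself
// is behind (unknown_leader_epoch).
error_code check_leader_epoch(int32_t requested, int32_t current) {
    if (requested == invalid_leader_epoch) {
        return error_code::none;
    }
    if (requested < current) {
        return error_code::fenced_leader_epoch;
    }
    if (requested > current) {
        return error_code::unknown_leader_epoch;
    }
    return error_code::none;
}

int main() {
    // Mirrors the new fetch_leader_epoch test below: after a leadership
    // change the partition's epoch is ahead of the client's epoch 1.
    std::cout << (check_leader_epoch(1, 2) == error_code::fenced_leader_epoch)
              << '\n'; // prints 1
}
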
10 changes: 10 additions & 0 deletions src/v/kafka/server/fetch_session.h
@@ -10,6 +10,7 @@
*/
#pragma once
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/ktp.h"
@@ -31,6 +32,15 @@ struct fetch_session_partition {
model::offset high_watermark;
model::offset last_stable_offset;
kafka::leader_epoch current_leader_epoch = invalid_leader_epoch;

fetch_session_partition(
const model::topic& tp, const fetch_request::partition& p)
: topic_partition(tp, p.partition_index)
, max_bytes(p.max_bytes)
, fetch_offset(p.fetch_offset)
, high_watermark(model::offset(-1))
, last_stable_offset(model::offset(-1))
, current_leader_epoch(p.current_leader_epoch) {}
};
/**
* Map of partitions that is kept by fetch session. This map is using intrusive
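
The constructor above gives fetch_session_partition a single construction path from a fetch_request::partition, so call sites can no longer leave current_leader_epoch at invalid_leader_epoch the way the aggregate-initialized code in handlers/fetch.cc did before this change. A minimal stand-alone sketch of the same pattern, using simplified stand-in types rather than the real Redpanda definitions:

#include <cstdint>
#include <string>
#include <utility>

// Illustrative stand-ins; the field names mirror the diff, the definitions do not.
struct request_partition {
    int32_t partition_index;
    int32_t current_leader_epoch;
    int64_t fetch_offset;
    int32_t max_bytes;
};

struct session_partition {
    std::string topic;
    int32_t partition;
    int32_t max_bytes;
    int64_t fetch_offset;
    int64_t high_watermark = -1;
    int64_t last_stable_offset = -1;
    int32_t current_leader_epoch = -1; // invalid_leader_epoch

    // One construction path from a request partition: the epoch is copied
    // here and cannot be forgotten at individual call sites.
    session_partition(std::string tp, const request_partition& p)
      : topic(std::move(tp))
      , partition(p.partition_index)
      , max_bytes(p.max_bytes)
      , fetch_offset(p.fetch_offset)
      , current_leader_epoch(p.current_leader_epoch) {}
};

int main() {
    request_partition p{
      .partition_index = 0,
      .current_leader_epoch = 3,
      .fetch_offset = 42,
      .max_bytes = 1 << 20};
    session_partition sp("foo", p);
    return sp.current_leader_epoch == 3 ? 0 : 1; // epoch carried over
}
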
17 changes: 4 additions & 13 deletions src/v/kafka/server/fetch_session_cache.cc
@@ -2,6 +2,7 @@

#include "config/configuration.h"
#include "kafka/protocol/fetch.h"
#include "kafka/server/fetch_session.h"
#include "kafka/server/logger.h"
#include "kafka/types.h"
#include "model/fundamental.h"
@@ -14,18 +15,6 @@

namespace kafka {

static fetch_session_partition make_fetch_partition(
const model::topic& tp, const fetch_request::partition& p) {
return fetch_session_partition{
.topic_partition = {tp, p.partition_index},
.max_bytes = p.max_bytes,
.fetch_offset = p.fetch_offset,
.high_watermark = model::offset(-1),
.last_stable_offset = model::offset(-1),
.current_leader_epoch = p.current_leader_epoch,
};
}

void update_fetch_session(fetch_session& session, const fetch_request& req) {
for (auto it = req.cbegin(); it != req.cend(); ++it) {
auto& topic = *it->topic;
@@ -36,9 +25,11 @@ void update_fetch_session(fetch_session& session, const fetch_request& req) {
s_it != session.partitions().end()) {
s_it->second->partition.max_bytes = partition.max_bytes;
s_it->second->partition.fetch_offset = partition.fetch_offset;
s_it->second->partition.current_leader_epoch
= partition.current_leader_epoch;
} else {
session.partitions().emplace(
make_fetch_partition(topic.name, partition));
fetch_session_partition(topic.name, partition));
}
}

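
The second part of the fix is in update_fetch_session above: for partitions already cached in an incremental fetch session, current_leader_epoch is now refreshed alongside max_bytes and fetch_offset, so a client that re-sends a partition after a metadata refresh gets its new epoch recorded rather than the one captured at insertion. A rough sketch of that update step over simplified types (illustrative only; the real session is keyed by topic-partition, not a plain string):

#include <cstdint>
#include <string>
#include <unordered_map>

// Illustrative stand-ins, not Redpanda's types.
struct partition_req {
    int32_t max_bytes;
    int64_t fetch_offset;
    int32_t current_leader_epoch;
};

struct cached_partition {
    int32_t max_bytes = 0;
    int64_t fetch_offset = 0;
    int32_t current_leader_epoch = -1;
};

using session_map = std::unordered_map<std::string, cached_partition>;

// Refresh the fields a client may change between incremental requests.
// Before this change the leader epoch was only captured on insertion and
// then went stale for as long as the partition stayed in the session.
void update_session(
  session_map& session, const std::string& tp, const partition_req& p) {
    if (auto it = session.find(tp); it != session.end()) {
        it->second.max_bytes = p.max_bytes;
        it->second.fetch_offset = p.fetch_offset;
        it->second.current_leader_epoch = p.current_leader_epoch;
    } else {
        session.emplace(
          tp,
          cached_partition{p.max_bytes, p.fetch_offset, p.current_leader_epoch});
    }
}

int main() {
    session_map session;
    update_session(session, "foo/0", {1 << 20, 0, 1});
    update_session(session, "foo/0", {1 << 20, 100, 2}); // epoch bumped to 2
    return session["foo/0"].current_leader_epoch == 2 ? 0 : 1;
}
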
7 changes: 1 addition & 6 deletions src/v/kafka/server/handlers/fetch.cc
@@ -700,12 +700,7 @@ void op_context::for_each_fetch_partition(Func&& f) const {
request.cend(),
[f = std::forward<Func>(f)](
const fetch_request::const_iterator::value_type& p) {
auto& part = *p.partition;
f(fetch_session_partition{
.topic_partition = {p.topic->name, part.partition_index},
.max_bytes = part.max_bytes,
.fetch_offset = part.fetch_offset,
});
f(fetch_session_partition(p.topic->name, *p.partition));
});
} else {
std::for_each(
9 changes: 0 additions & 9 deletions src/v/kafka/server/tests/fetch_plan_bench.cc
@@ -39,15 +39,6 @@ static ss::logger fpt_logger("fpt_test");

using namespace std::chrono_literals; // NOLINT
struct fixture {
static kafka::fetch_session_partition make_fetch_partition(
model::topic topic, model::partition_id p_id, model::offset offset) {
return kafka::fetch_session_partition{
.topic_partition = {std::move(topic), p_id},
.max_bytes = 1_MiB,
.fetch_offset = offset,
.high_watermark = offset};
}

static kafka::fetch_request::topic
make_fetch_request_topic(model::topic tp, int partitions_count) {
kafka::fetch_request::topic fetch_topic{
59 changes: 36 additions & 23 deletions src/v/kafka/server/tests/fetch_session_test.cc
@@ -9,6 +9,7 @@
* by the Apache License, Version 2.0
*/
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/schemata/fetch_request.h"
#include "kafka/server/fetch_session.h"
#include "kafka/server/fetch_session_cache.h"
#include "kafka/types.h"
@@ -27,13 +28,15 @@

using namespace std::chrono_literals; // NOLINT
struct fixture {
static kafka::fetch_session_partition make_fetch_partition(
model::topic topic, model::partition_id p_id, model::offset offset) {
return kafka::fetch_session_partition{
.topic_partition = {topic, p_id},
.max_bytes = 1_MiB,
.fetch_offset = offset,
.high_watermark = offset};
static kafka::fetch_partition
make_fetch_partition(const model::partition_id p_id) {
return {
.partition_index = p_id,
.current_leader_epoch = kafka::leader_epoch(
random_generators::get_int(100)),
.fetch_offset = model::offset(random_generators::get_int(10000)),
.max_bytes = random_generators::get_int(1024, 1024 * 1024),
};
}

static kafka::fetch_request::topic
@@ -45,11 +48,7 @@ struct fixture {

for (int i = 0; i < partitions_count; ++i) {
fetch_topic.fetch_partitions.push_back(
kafka::fetch_request::partition{
.partition_index = model::partition_id(i),
.fetch_offset = model::offset(i * 10),
.max_bytes = 100_KiB,
});
make_fetch_partition(model::partition_id(i)));
}
return fetch_topic;
}
@@ -79,16 +78,18 @@ FIXTURE_TEST(test_fetch_session_basic_operations, fixture) {
expected.reserve(20);

for (int i = 0; i < 20; ++i) {
auto req = make_fetch_request_topic(
model::topic(random_generators::gen_alphanum_string(5)), 1);
req.fetch_partitions[0].partition_index = model::partition_id(
random_generators::get_int(i * 10, ((i + 1) * 10) - 1));

expected.push_back(tpo{
model::ktp{
model::topic(random_generators::gen_alphanum_string(5)),
model::partition_id(
random_generators::get_int(i * 10, ((i + 1) * 10) - 1))},
model::offset(random_generators::get_int(10000))});

auto& t = expected.back();
session.partitions().emplace(fixture::make_fetch_partition(
t.ktp.get_topic(), t.ktp.get_partition(), t.offset));
model::topic(req.name),
model::partition_id(req.fetch_partitions[0].partition_index)},
model::offset(req.fetch_partitions[0].fetch_offset)});
session.partitions().emplace(
kafka::fetch_session_partition(req.name, req.fetch_partitions[0]));
}

BOOST_TEST_MESSAGE("test insertion order iteration");
@@ -156,6 +157,9 @@ FIXTURE_TEST(test_session_operations, fixture) {
BOOST_REQUIRE_EQUAL(
fp.topic_partition.get_partition(),
req.data.topics[0].fetch_partitions[i].partition_index);
BOOST_REQUIRE_EQUAL(
fp.current_leader_epoch,
req.data.topics[0].fetch_partitions[i].current_leader_epoch);
BOOST_REQUIRE_EQUAL(
fp.fetch_offset,
req.data.topics[0].fetch_partitions[i].fetch_offset);
@@ -170,13 +174,17 @@

BOOST_TEST_MESSAGE("test updating session");
{
// Remove and forget about the first partition.
req.data.topics[0].fetch_partitions.erase(
std::next(req.data.topics[0].fetch_partitions.begin()));
// add 2 partitons from new topic, forget one from the first topic
req.data.topics.push_back(
make_fetch_request_topic(model::topic("test-new"), 2));
req.data.forgotten.push_back(kafka::fetch_request::forgotten_topic{
.name = model::topic("test"), .forgotten_partition_indexes = {1}});
// Update the second partition.
req.data.topics[0].fetch_partitions[0] = make_fetch_partition(
req.data.topics[0].fetch_partitions[0].partition_index);
// Add 2 partitions from new topic.
req.data.topics.push_back(
make_fetch_request_topic(model::topic("test-new"), 2));

auto ctx = cache.maybe_get_session(req);

@@ -203,6 +211,11 @@
BOOST_REQUIRE_EQUAL(
fp.topic_partition.get_partition(),
req.data.topics[t_idx].fetch_partitions[p_idx].partition_index);
BOOST_REQUIRE_EQUAL(
fp.current_leader_epoch,
req.data.topics[t_idx]
.fetch_partitions[p_idx]
.current_leader_epoch);
BOOST_REQUIRE_EQUAL(
fp.fetch_offset,
req.data.topics[t_idx].fetch_partitions[p_idx].fetch_offset);
73 changes: 73 additions & 0 deletions src/v/kafka/server/tests/fetch_test.cc
@@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "kafka/protocol/batch_consumer.h"
#include "kafka/protocol/types.h"
#include "kafka/server/handlers/fetch.h"
#include "kafka/types.h"
#include "model/fundamental.h"
@@ -17,6 +18,7 @@

#include <seastar/core/smp.hh>

#include <boost/test/tools/old/interface.hpp>
#include <fmt/ostream.h>

#include <chrono>
@@ -374,6 +376,77 @@ FIXTURE_TEST(fetch_empty, redpanda_thread_fixture) {
BOOST_REQUIRE(resp_2.data.topics.empty());
}

FIXTURE_TEST(fetch_leader_epoch, redpanda_thread_fixture) {
// create a topic partition with some data
model::topic topic("foo");
model::partition_id pid(0);
auto ntp = make_default_ntp(topic, pid);
auto log_config = make_default_config();
wait_for_controller_leadership().get0();
add_topic(model::topic_namespace_view(ntp)).get();

wait_for_partition_offset(ntp, model::offset(0)).get0();

const auto shard = app.shard_table.local().shard_for(ntp);
app.partition_manager
.invoke_on(
*shard,
[ntp, this](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
{
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
partition->raft()
->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack))
.discard_result()
.get0();
}
partition->raft()->step_down("trigger epoch change").get0();
wait_for_leader(ntp).get0();
{
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
partition->raft()
->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack))
.discard_result()
.get0();
}
})
.get0();

kafka::fetch_request req;
req.data.max_bytes = std::numeric_limits<int32_t>::max();
req.data.min_bytes = 1;
req.data.max_wait_ms = std::chrono::milliseconds(1000);
req.data.topics = {
{.name = topic,
.fetch_partitions = {{
.partition_index = pid,
.current_leader_epoch = kafka::leader_epoch(1),
.fetch_offset = model::offset(6),
}}}};

auto client = make_kafka_client().get0();
client.connect().get();
auto resp = client.dispatch(req, kafka::api_version(9)).get0();
client.stop().then([&client] { client.shutdown(); }).get();

BOOST_REQUIRE_MESSAGE(
resp.data.topics[0].partitions[0].error_code
== kafka::error_code::fenced_leader_epoch,
fmt::format("error: {}", resp.data.topics[0].partitions[0].error_code));
}

FIXTURE_TEST(fetch_multi_partitions_debounce, redpanda_thread_fixture) {
// create a topic partition with some data
model::topic topic("foo");
