Skip to content

Commit

Permalink
k/replicated_partition: fixed querying end offset of an empty log
Browse files Browse the repository at this point in the history
When querying offset translator we are relying on the property of the
end offset or high watermark being exclusive. Changed the behavior of
log end offset to increment offset in Raft offset space and then
translate to achieve correct behavior when log is ended with a batch
subjected to offset translation.

Fixes: redpanda-data#16612

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Apr 11, 2024
1 parent a5a7355 commit fd04fd7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ model::offset replicated_partition::log_end_offset() const {
/**
* By default we return a dirty_offset + 1
*/
return model::next_offset(
_translator->from_log_offset(_partition->dirty_offset()));
return _translator->from_log_offset(
model::next_offset(_partition->dirty_offset()));
}

model::offset replicated_partition::leader_high_watermark() const {
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ set(srcs
alter_config_test.cc
produce_consume_test.cc
group_metadata_serialization_test.cc
partition_reassignments_test.cc)
partition_reassignments_test.cc
replicated_partition_test.cc)

rp_test(
FIXTURE_TEST
Expand Down
78 changes: 78 additions & 0 deletions src/v/kafka/server/tests/replicated_partition_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cluster/fwd.h"
#include "cluster/types.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/server/replicated_partition.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "model/record_batch_reader.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "raft/replicate.h"
#include "redpanda/tests/fixture.h"
#include "storage/record_batch_builder.h"
#include "test_utils/async.h"

FIXTURE_TEST(test_replicated_partition_end_offset, redpanda_thread_fixture) {
wait_for_controller_leadership().get();

model::topic_namespace tp_ns(
model::kafka_namespace, model::topic("test-topic"));

add_topic(tp_ns).get();
model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, &ntp] {
return app.partition_manager.invoke_on(
*shard, [&ntp](cluster::partition_manager& pm) {
auto p = pm.get(ntp);
return p->get_leader_id().has_value();
});
}).get();

app.partition_manager
.invoke_on(
*shard,
[&ntp](cluster::partition_manager& pm) {
auto p = pm.get(ntp);
kafka::replicated_partition rp(p);
auto p_info = rp.get_partition_info();
/**
* Since log is empty from Kafka client perspective (no data
* batches), the end offset which is exclusive must be equal to 0
*/
BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0});
BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0});

storage::record_batch_builder builder(
model::record_batch_type::version_fence, model::offset(0));
builder.add_raw_kv(iobuf{}, iobuf{});
builder.add_raw_kv(iobuf{}, iobuf{});
builder.add_raw_kv(iobuf{}, iobuf{});

// replicate a batch that is subjected to offset translation
return p
->replicate(
model::make_memory_record_batch_reader(
{std::move(builder).build()}),
raft::replicate_options(raft::consistency_level::quorum_ack))
.then([p, rp](result<cluster::kafka_result> rr) {
BOOST_REQUIRE(rr.has_value());
BOOST_REQUIRE_GT(p->dirty_offset(), model::offset{0});

BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0});
BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0});
});
})
.get();
}

0 comments on commit fd04fd7

Please sign in to comment.