From 076a78ac7dddff179ce19c57a80078dfb7481ea4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 11 Apr 2024 10:39:55 +0000 Subject: [PATCH] k/replicated_partition: fixed querying end offset of an empty log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When querying offset translator we are relying on the property of the end offset or high watermark being exclusive. Changed the behavior of log end offset to increment offset in Raft offset space and then translate to achieve correct behavior when log is ended with a batch subjected to offset translation. Fixes: #16612 Signed-off-by: Michał Maślanka (cherry picked from commit fd04fd72c8f0d37e1e480c6f3a32b6ad53ae270f) --- src/v/kafka/server/replicated_partition.h | 4 +- src/v/kafka/server/tests/CMakeLists.txt | 3 +- .../server/tests/replicated_partition_test.cc | 78 +++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 src/v/kafka/server/tests/replicated_partition_test.cc diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 82b0fe9d1235f..3916e6992ffab 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -108,8 +108,8 @@ class replicated_partition final : public kafka::partition_proxy::impl { /** * By default we return a dirty_offset + 1 */ - return model::next_offset( - _translator->from_log_offset(_partition->dirty_offset())); + return _translator->from_log_offset( + model::next_offset(_partition->dirty_offset())); } model::offset leader_high_watermark() const { diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 82fc3666d61e0..e7cbfc65fbaf9 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -56,7 +56,8 @@ set(srcs alter_config_test.cc produce_consume_test.cc group_metadata_serialization_test.cc - partition_reassignments_test.cc) + partition_reassignments_test.cc + replicated_partition_test.cc) rp_test( FIXTURE_TEST diff --git a/src/v/kafka/server/tests/replicated_partition_test.cc b/src/v/kafka/server/tests/replicated_partition_test.cc new file mode 100644 index 0000000000000..0b4ff46d7f823 --- /dev/null +++ b/src/v/kafka/server/tests/replicated_partition_test.cc @@ -0,0 +1,78 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/fwd.h" +#include "cluster/types.h" +#include "kafka/server/partition_proxy.h" +#include "kafka/server/replicated_partition.h" +#include "model/fundamental.h" +#include "model/metadata.h" +#include "model/namespace.h" +#include "model/record_batch_reader.h" +#include "model/record_batch_types.h" +#include "model/timeout_clock.h" +#include "raft/replicate.h" +#include "redpanda/tests/fixture.h" +#include "storage/record_batch_builder.h" +#include "test_utils/async.h" + +FIXTURE_TEST(test_replicated_partition_end_offset, redpanda_thread_fixture) { + wait_for_controller_leadership().get(); + + model::topic_namespace tp_ns( + model::kafka_namespace, model::topic("test-topic")); + + add_topic(tp_ns).get(); + model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0)); + auto shard = app.shard_table.local().shard_for(ntp); + + tests::cooperative_spin_wait_with_timeout(10s, [this, shard, &ntp] { + return app.partition_manager.invoke_on( + *shard, [&ntp](cluster::partition_manager& pm) { + auto p = pm.get(ntp); + return p->get_leader_id().has_value(); + }); + }).get(); + + app.partition_manager + .invoke_on( + *shard, + [&ntp](cluster::partition_manager& pm) { + auto p = pm.get(ntp); + kafka::replicated_partition rp(p); + auto p_info = rp.get_partition_info(); + /** + * Since log is empty from Kafka client perspective (no data + * batches), the end offset which is exclusive must be equal to 0 + */ + BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0}); + BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0}); + + storage::record_batch_builder builder( + model::record_batch_type::version_fence, model::offset(0)); + builder.add_raw_kv(iobuf{}, iobuf{}); + builder.add_raw_kv(iobuf{}, iobuf{}); + builder.add_raw_kv(iobuf{}, iobuf{}); + + // replicate a batch that is subjected to offset translation + return p + ->replicate( + model::make_memory_record_batch_reader( + {std::move(builder).build()}), + raft::replicate_options(raft::consistency_level::quorum_ack)) + .then([p, rp](result rr) { + BOOST_REQUIRE(rr.has_value()); + BOOST_REQUIRE_GT(p->dirty_offset(), model::offset{0}); + + BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0}); + BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0}); + }); + }) + .get(); +}