Skip to content

Commit

Permalink
kafka: add produce_tombstones flag to produce_consume_utils
Browse files Browse the repository at this point in the history
For ease of adding tombstone records to a partition in fixture tests.
  • Loading branch information
WillemKauf committed Sep 17, 2024
1 parent 503a5e0 commit 973d376
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/read_replica_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ FIXTURE_TEST(test_read_replica_basic_sync, read_replica_e2e_fixture) {
topic_name, model::partition_id(0), next)
.get();
BOOST_REQUIRE(!consumed_records.empty());
for (const auto& [k, v] : consumed_records) {
BOOST_REQUIRE_EQUAL(k, ssx::sformat("key{}", next()));
for (const auto& kv : consumed_records) {
BOOST_REQUIRE_EQUAL(kv.key, ssx::sformat("key{}", next()));
next += model::offset(1);
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/v/kafka/server/tests/produce_consume_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,15 @@ kafka_produce_transport::produce_partition_requests(
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));
kafka::produce_request::partition partition;
for (auto& [k, v] : records) {
for (auto& kv : records) {
const auto& k = kv.key;
const auto& v = kv.val;
iobuf key_buf;
key_buf.append(k.data(), k.size());
iobuf val_buf;
val_buf.append(v.data(), v.size());
std::optional<iobuf> val_buf;
if (!kv.is_tombstone) {
val_buf = iobuf::from({v.data(), v.size()});
}
builder.add_raw_kv(std::move(key_buf), std::move(val_buf));
}
if (ts.has_value()) {
Expand Down
24 changes: 18 additions & 6 deletions src/v/kafka/server/tests/produce_consume_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@ namespace tests {
struct kv_t {
ss::sstring key;
ss::sstring val;

bool is_tombstone;
friend std::ostream& operator<<(std::ostream& o, const kv_t& kv);

kv_t(ss::sstring k, ss::sstring v)
: key(std::move(k))
, val(std::move(v)) {}
, val(std::move(v))
, is_tombstone(false) {}

kv_t(ss::sstring k)
: key(std::move(k))
, is_tombstone(true) {}

friend bool operator==(const kv_t& l, const kv_t& r) {
return std::tie(l.key, l.val) == std::tie(r.key, r.val);
return std::tie(l.key, l.val, l.is_tombstone)
== std::tie(r.key, r.val, r.is_tombstone);
}

static std::vector<kv_t> sequence(
size_t start,
size_t num_records,
std::optional<size_t> val_start = std::nullopt,
size_t key_cardinality = 0) {
size_t key_cardinality = 0,
bool produce_tombstones = false) {
size_t vstart = val_start.value_or(start);
std::vector<kv_t> records;
records.reserve(num_records);
Expand All @@ -45,8 +52,13 @@ struct kv_t {
if (key_cardinality > 0) {
key = key % key_cardinality;
}
records.emplace_back(
ssx::sformat("key{}", key), ssx::sformat("val{}", vstart + i));
auto key_str = ssx::sformat("key{}", key);
if (produce_tombstones) {
records.emplace_back(std::move(key_str));
} else {
records.emplace_back(
std::move(key_str), ssx::sformat("val{}", vstart + i));
}
}
return records;
}
Expand Down

0 comments on commit 973d376

Please sign in to comment.