Skip to content

Commit

Permalink
Merge pull request redpanda-data#24109 from dotnwat/bazel-test-backfill
Browse files Browse the repository at this point in the history
kafka: backfill kafka client fixture tests
  • Loading branch information
dotnwat authored Nov 15, 2024
2 parents 9d4782e + ebf9cd7 commit 303a7a2
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 12 deletions.
148 changes: 148 additions & 0 deletions src/v/kafka/client/test/BUILD
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
load("//bazel:test.bzl", "redpanda_cc_btest", "redpanda_test_cc_library")

redpanda_test_cc_library(
name = "fixture",
hdrs = [
"fixture.h",
],
implementation_deps = [
],
include_prefix = "kafka/client/test",
deps = [
"//src/v/kafka/client",
"//src/v/redpanda/tests:fixture",
],
)

redpanda_test_cc_library(
name = "utils",
hdrs = [
Expand Down Expand Up @@ -103,3 +117,137 @@ redpanda_cc_btest(
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "consumer_group",
timeout = "short",
srcs = [
"consumer_group.cc",
],
tags = ["exclusive"],
deps = [
"//src/v/base",
"//src/v/bytes",
"//src/v/kafka/client",
"//src/v/kafka/client/test:fixture",
"//src/v/kafka/client/test:utils",
"//src/v/kafka/protocol",
"//src/v/kafka/protocol:describe_groups",
"//src/v/kafka/protocol:fetch",
"//src/v/kafka/protocol:find_coordinator",
"//src/v/kafka/protocol:heartbeat",
"//src/v/kafka/protocol:join_group",
"//src/v/kafka/protocol:leave_group",
"//src/v/kafka/protocol:list_groups",
"//src/v/kafka/protocol:list_offset",
"//src/v/kafka/protocol:metadata",
"//src/v/kafka/protocol:offset_fetch",
"//src/v/kafka/protocol:sync_group",
"//src/v/kafka/protocol/schemata:join_group_request",
"//src/v/kafka/protocol/schemata:join_group_response",
"//src/v/kafka/protocol/schemata:offset_fetch_response",
"//src/v/kafka/server",
"//src/v/model",
"//src/v/redpanda/tests:fixture",
"//src/v/ssx:future_util",
"//src/v/test_utils:seastar_boost",
"//src/v/utils:unresolved_address",
"@abseil-cpp//absl/container:flat_hash_map",
"@boost//:test",
"@seastar",
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "fetch",
timeout = "short",
srcs = [
"fetch.cc",
],
tags = ["exclusive"],
deps = [
"//src/v/cluster",
"//src/v/kafka/client",
"//src/v/kafka/client/test:fixture",
"//src/v/kafka/client/test:utils",
"//src/v/kafka/protocol",
"//src/v/kafka/protocol:fetch",
"//src/v/kafka/protocol:metadata",
"//src/v/model",
"//src/v/redpanda/tests:fixture",
"//src/v/test_utils:seastar_boost",
"//src/v/utils:unresolved_address",
"@boost//:test",
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "produce",
timeout = "short",
srcs = [
"produce.cc",
],
tags = ["exclusive"],
deps = [
"//src/v/kafka/client",
"//src/v/kafka/client/test:fixture",
"//src/v/kafka/client/test:utils",
"//src/v/kafka/protocol",
"//src/v/kafka/protocol:metadata",
"//src/v/kafka/protocol:produce",
"//src/v/model",
"//src/v/redpanda/tests:fixture",
"//src/v/test_utils:seastar_boost",
"//src/v/utils:unresolved_address",
"@boost//:test",
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "reconnect",
timeout = "short",
srcs = [
"reconnect.cc",
],
tags = ["exclusive"],
deps = [
"//src/v/http",
"//src/v/kafka/client",
"//src/v/kafka/client/test:fixture",
"//src/v/kafka/client/test:utils",
"//src/v/kafka/protocol",
"//src/v/kafka/protocol:metadata",
"//src/v/kafka/protocol:produce",
"//src/v/model",
"//src/v/pandaproxy/test:utils",
"//src/v/redpanda/tests:fixture",
"//src/v/test_utils:seastar_boost",
"//src/v/utils:unresolved_address",
"@boost//:test",
"@seastar",
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "retry",
timeout = "short",
srcs = [
"retry.cc",
],
tags = ["exclusive"],
deps = [
"//src/v/kafka/client",
"//src/v/kafka/client/test:fixture",
"//src/v/kafka/client/test:utils",
"//src/v/kafka/protocol",
"//src/v/redpanda/tests:fixture",
"//src/v/test_utils:seastar_boost",
"@boost//:test",
"@seastar",
"@seastar//:testing",
],
)
18 changes: 9 additions & 9 deletions src/v/kafka/client/test/consumer_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
auto remove_consumers = ss::defer([&client, &group_id, &members]() {
for (const auto& m_id : members) {
client.remove_consumer(group_id, m_id)
.handle_exception([](std::exception_ptr e) {})
.handle_exception([](std::exception_ptr) {})
.get();
}
});
Expand Down Expand Up @@ -223,7 +223,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
}

// Check topic subscriptions - one each expected
for (int i = 0; i < members.size(); ++i) {
for (size_t i = 0; i < members.size(); ++i) {
auto consumer_topics
= client.consumer_topics(group_id, members[i]).get();
BOOST_REQUIRE_EQUAL(consumer_topics.size(), 1);
Expand All @@ -245,7 +245,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
info("list res: {}", list_res);

// Check topic subscriptions - one each expected
for (int i = 0; i < members.size(); ++i) {
for (size_t i = 0; i < members.size(); ++i) {
auto consumer_topics
= client.consumer_topics(group_id, members[i]).get();
BOOST_REQUIRE_EQUAL(consumer_topics.size(), 1);
Expand All @@ -256,7 +256,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
// range_assignment is allocated according to sorted member ids
auto sorted_members = members;
std::sort(sorted_members.begin(), sorted_members.end());
for (int i = 0; i < sorted_members.size(); ++i) {
for (size_t i = 0; i < sorted_members.size(); ++i) {
auto m_id = sorted_members[i];
auto assignment = client.consumer_assignment(group_id, m_id).get();
BOOST_REQUIRE_EQUAL(assignment.size(), 3);
Expand Down Expand Up @@ -302,7 +302,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
.get();

// Commit 5 offsets, with metadata of the member id.
for (int i = 0; i < sorted_members.size(); ++i) {
for (size_t i = 0; i < sorted_members.size(); ++i) {
auto m_id = sorted_members[i];
auto t = std::vector<kafka::offset_commit_request_topic>{};
t.reserve(3);
Expand All @@ -314,7 +314,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {
return kafka::offset_commit_request_topic{
.name = topic,
.partitions = {
{.partition_index = model::partition_id{i},
{.partition_index = model::partition_id{static_cast<int>(i)},
.committed_offset = model::offset{5},
.committed_metadata{m_id()}}}};
});
Expand All @@ -331,7 +331,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {

// Check member assignment and offsets
// range_assignment is allocated according to sorted member ids
for (int i = 0; i < sorted_members.size(); ++i) {
for (size_t i = 0; i < sorted_members.size(); ++i) {
auto m_id = sorted_members[i];
auto assignment = client.consumer_assignment(group_id, m_id).get();
BOOST_REQUIRE_EQUAL(assignment.size(), 3);
Expand Down Expand Up @@ -359,7 +359,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {

// Commit all offsets
// empty list means commit all offsets
for (int i = 0; i < sorted_members.size(); ++i) {
for (size_t i = 0; i < sorted_members.size(); ++i) {
auto m_id = sorted_members[i];
auto t = std::vector<kafka::offset_commit_request_topic>{};
auto res
Expand All @@ -375,7 +375,7 @@ FIXTURE_TEST(consumer_group, kafka_client_fixture) {

// Check comimtted offsets match the fetched offsets
// range_assignment is allocated according to sorted member ids
for (int i = 0; i < sorted_members.size(); ++i) {
for (size_t i = 0; i < sorted_members.size(); ++i) {
auto m_id = sorted_members[i];
auto assignment = client.consumer_assignment(group_id, m_id).get();

Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/test/retry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ FIXTURE_TEST(test_retry_create_topic, kafka_client_create_topic_fixture) {
};

size_t num_topics = 20;
for (int i = 0; i < num_topics; ++i) {
for (size_t i = 0; i < num_topics; ++i) {
auto creatable_topic = make_topic(fmt::format("topic-{}", i));
try {
auto res = client.create_topic(creatable_topic).get();
Expand Down
16 changes: 15 additions & 1 deletion src/v/pandaproxy/test/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
load("//bazel:test.bzl", "redpanda_cc_btest")
load("//bazel:test.bzl", "redpanda_cc_btest", "redpanda_test_cc_library")

redpanda_test_cc_library(
name = "utils",
hdrs = [
"utils.h",
],
implementation_deps = [
],
include_prefix = "pandaproxy/test",
visibility = ["//visibility:public"],
deps = [
"//src/v/pandaproxy",
],
)

redpanda_cc_btest(
name = "errors_test",
Expand Down
18 changes: 18 additions & 0 deletions src/v/redpanda/tests/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("//bazel:test.bzl", "redpanda_test_cc_library")

redpanda_test_cc_library(
name = "fixture",
hdrs = [
"fixture.h",
],
implementation_deps = [
],
include_prefix = "redpanda/tests",
visibility = ["//visibility:public"],
deps = [
"//src/v/cloud_storage/tests:s3_imposter",
"//src/v/redpanda:application",
"//src/v/storage/tests:disk_log_builder",
"//src/v/test_utils:seastar_boost",
],
)
2 changes: 1 addition & 1 deletion src/v/redpanda/tests/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ class redpanda_thread_fixture {
auto ctrl_p = p_mgr.get(model::controller_ntp);
vassert(ctrl_p, "Controller partition must exists");
return ctrl_p->linearizable_barrier().then(
[ctrl_p](result<model::offset> o) {
[ctrl_p](result<model::offset>) {
return ctrl_p->committed_offset();
});
});
Expand Down

0 comments on commit 303a7a2

Please sign in to comment.