cluster: exclude internal topics from check_cluster_limits
Internal topics are excluded from the checks to prevent allocation failures
when creating them. This ensures that lazily allocated internal topics
(e.g. the transactions topic) can always be created.

This excludes them from the global `check_cluster_limits`. A fixture test
already existed to verify that internal topics are excluded from the limit
checks; however, it erroneously relied on shard0 reservations not being
considered in `check_cluster_limits` in order to pass. (See
`allocation_over_capacity` and the previous commit.)

This adds a new test to validate that internal topics can be created even
when their partitions exceed the global shard0 reservation.
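
For illustration, here is a minimal, self-contained sketch of the kind of
check `allocation_node::is_internal_topic` performs. This is an assumption
for exposition only, not the actual implementation; the function name and
the namespace literals are hypothetical stand-ins:

#include <algorithm>
#include <string>
#include <vector>

// Sketch only: decide whether a topic should bypass cluster limit checks.
// `internal_topics` stands in for the kafka_internal_topics cluster config;
// the namespace literals are illustrative, not Redpanda's actual values.
bool is_internal_topic_sketch(
  const std::vector<std::string>& internal_topics,
  const std::string& ns,
  const std::string& topic) {
    // Topics living in an internal namespace are always internal.
    if (ns == "redpanda" || ns == "kafka_internal") {
        return true;
    }
    // Otherwise, consult the runtime-configurable list of internal kafka
    // topics (e.g. the transactions topic mentioned above).
    return std::find(internal_topics.begin(), internal_topics.end(), topic)
           != internal_topics.end();
}

In `check_cluster_limits`, the guard added by this commit runs before any
capacity accounting, so a topic classified as internal never reaches the
limit calculations that follow.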
pgellert committed Dec 4, 2024
1 parent 4b4f6a2 commit 19bc4f2
Showing 3 changed files with 33 additions and 4 deletions.
11 changes: 8 additions & 3 deletions src/v/cluster/scheduling/partition_allocator.cc
@@ -108,11 +108,16 @@ allocation_constraints partition_allocator::default_constraints() {
  * with partitions that cannot be re-accommodated on smaller peers).
  */
 std::error_code partition_allocator::check_cluster_limits(
-  const uint64_t new_partitions_replicas_requested) const {
+  const uint64_t new_partitions_replicas_requested,
+  const model::topic_namespace& topic) const {
     if (_members.local().nodes().empty()) {
         // Empty members table, we're probably running in a unit test
         return errc::success;
     }
+    if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) {
+        return errc::success;
+    }
+
     // Calculate how many partition-replicas already exist, so that we can
     // check if the new topic would take us past any limits.
     uint64_t existent_partitions{0};
@@ -242,7 +247,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) {
     const uint64_t create_count
       = static_cast<uint64_t>(simple_req.additional_partitions)
       * static_cast<uint64_t>(simple_req.replication_factor);
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns);
     if (cluster_errc) {
         co_return cluster_errc;
     }
@@ -274,7 +279,7 @@ partition_allocator::allocate(allocation_request request) {
         }
     }
 
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, request._nt);
     if (cluster_errc) {
         co_return cluster_errc;
     }
4 changes: 3 additions & 1 deletion src/v/cluster/scheduling/partition_allocator.h
@@ -18,6 +18,7 @@
 #include "cluster/scheduling/types.h"
 #include "config/property.h"
 #include "features/fwd.h"
+#include "model/metadata.h"
 
 namespace cluster {
 
@@ -145,7 +146,8 @@ class partition_allocator {
     // new_partitions_replicas_requested represents the total number of
     // partitions requested by a request. i.e. partitions * replicas requested.
     std::error_code check_cluster_limits(
-      const uint64_t new_partitions_replicas_requested) const;
+      const uint64_t new_partitions_replicas_requested,
+      const model::topic_namespace& topic) const;
 
     ss::future<result<allocation_units::pointer>>
     do_allocate(allocation_request);
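
As a concrete example of that accounting (illustrative numbers, not from this
change): a request for 12 partitions at replication factor 3 asks for
12 * 3 = 36 partition-replicas, and 36 is the value passed as
new_partitions_replicas_requested.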
22 changes: 22 additions & 0 deletions src/v/cluster/tests/partition_allocator_tests.cc
@@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
       allocator().allocate(make_allocation_request(int_2, 1, 1)).get());
 }
 
+FIXTURE_TEST(
+  allocation_over_capacity_without_shard0, partition_allocator_fixture) {
+    // Disable shard0 reservations
+    partitions_reserve_shard0.update(0);
+
+    register_node(0, 6);
+    register_node(1, 6);
+    register_node(2, 6);
+
+    saturate_all_machines();
+    auto gr = allocator().state().last_group_id();
+    BOOST_REQUIRE(
+      allocator().allocate(make_allocation_request(1, 1)).get().has_error());
+    // group id hasn't changed
+    BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr);
+
+    // Make the topic internal and retry, should work.
+    kafka_internal_topics.update({tn.tp()});
+    BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get());
+    BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr);
+}
+
 FIXTURE_TEST(max_allocation, partition_allocator_fixture) {
     register_node(0, 2);
     register_node(1, 2);
