diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index f45df6f08fae0..8e501f702f16c 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -108,11 +108,16 @@ allocation_constraints partition_allocator::default_constraints() { * with partitions that cannot be re-accommodated on smaller peers). */ std::error_code partition_allocator::check_cluster_limits( - const uint64_t new_partitions_replicas_requested) const { + const uint64_t new_partitions_replicas_requested, + const model::topic_namespace& topic) const { if (_members.local().nodes().empty()) { // Empty members table, we're probably running in a unit test return errc::success; } + if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) { + return errc::success; + } + // Calculate how many partition-replicas already exist, so that we can // check if the new topic would take us past any limits. uint64_t existent_partitions{0}; @@ -242,7 +247,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) { const uint64_t create_count = static_cast(simple_req.additional_partitions) * static_cast(simple_req.replication_factor); - auto cluster_errc = check_cluster_limits(create_count); + auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns); if (cluster_errc) { co_return cluster_errc; } @@ -274,7 +279,7 @@ partition_allocator::allocate(allocation_request request) { } } - auto cluster_errc = check_cluster_limits(create_count); + auto cluster_errc = check_cluster_limits(create_count, request._nt); if (cluster_errc) { co_return cluster_errc; } diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index 14545c5951b4c..2574da6f30b78 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -18,6 +18,7 @@ #include "cluster/scheduling/types.h" #include "config/property.h" #include "features/fwd.h" +#include "model/metadata.h" namespace cluster { @@ -145,7 +146,8 @@ class partition_allocator { // new_partitions_replicas_requested represents the total number of // partitions requested by a request. i.e. partitions * replicas requested. std::error_code check_cluster_limits( - const uint64_t new_partitions_replicas_requested) const; + const uint64_t new_partitions_replicas_requested, + const model::topic_namespace& topic) const; ss::future> do_allocate(allocation_request); diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 80ac265f1b95c..8ef3382213671 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) { allocator().allocate(make_allocation_request(int_2, 1, 1)).get()); } +FIXTURE_TEST( + allocation_over_capacity_without_shard0, partition_allocator_fixture) { + // Disable shard0 reservations + partitions_reserve_shard0.update(0); + + register_node(0, 6); + register_node(1, 6); + register_node(2, 6); + + saturate_all_machines(); + auto gr = allocator().state().last_group_id(); + BOOST_REQUIRE( + allocator().allocate(make_allocation_request(1, 1)).get().has_error()); + // group id hasn't changed + BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr); + + // Make the topic internal and retry, should work. + kafka_internal_topics.update({tn.tp()}); + BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get()); + BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr); +} + FIXTURE_TEST(max_allocation, partition_allocator_fixture) { register_node(0, 2); register_node(1, 2);