Skip to content

Commit

Permalink
Merge pull request #19864 from ztlpn/flex-assignment-balancing
Browse files Browse the repository at this point in the history
Node-local core assignment: rebalancing
  • Loading branch information
ztlpn authored Jun 26, 2024
2 parents 6ccd89f + 928723d commit 1c40c32
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 38 deletions.
7 changes: 6 additions & 1 deletion src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,13 @@ ss::future<> controller::start(
return _shard_balancer.start_single(
std::ref(_shard_placement),
std::ref(_feature_table),
std::ref(_storage),
std::ref(_tp_state),
std::ref(_backend));
std::ref(_backend),
config::shard_local_cfg()
.core_balancing_on_core_count_change.bind(),
config::shard_local_cfg().core_balancing_continuous.bind(),
config::shard_local_cfg().core_balancing_debounce_timeout.bind());
})
.then(
[this] { return _drain_manager.invoke_on_all(&drain_manager::start); })
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
cfg.audit_enabled || cfg.cloud_storage_enabled
|| cfg.partition_autobalancing_mode
== model::partition_autobalancing_mode::continuous
|| has_gssapi() || has_oidc() || has_schma_id_validation()
|| has_non_default_roles) {
|| cfg.core_balancing_continuous() || has_gssapi() || has_oidc()
|| has_schma_id_validation() || has_non_default_roles) {
const auto& license = _feature_table.local().get_license();
if (!license || license->is_expired()) {
vlog(
Expand Down
181 changes: 175 additions & 6 deletions src/v/cluster/shard_balancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,55 @@

namespace cluster {

namespace {

const bytes& state_kvstore_key() {
static thread_local bytes key = []() {
iobuf buf;
serde::write(buf, shard_placement_kvstore_key_type::balancer_state);
return iobuf_to_bytes(buf);
}();
return key;
}

struct persisted_state
: serde::
envelope<persisted_state, serde::version<0>, serde::compat_version<0>> {
uint32_t last_rebalance_core_count = 0;

bool operator==(const persisted_state&) const = default;
auto serde_fields() { return std::tie(last_rebalance_core_count); }
};

} // namespace

shard_balancer::shard_balancer(
ss::sharded<shard_placement_table>& spt,
ss::sharded<features::feature_table>& features,
ss::sharded<storage::api>& storage,
ss::sharded<topic_table>& topics,
ss::sharded<controller_backend>& cb)
ss::sharded<controller_backend>& cb,
config::binding<bool> balancing_on_core_count_change,
config::binding<bool> balancing_continuous,
config::binding<std::chrono::milliseconds> debounce_timeout)
: _shard_placement(spt.local())
, _features(features.local())
, _kvstore(storage.local().kvs())
, _topics(topics)
, _controller_backend(cb)
, _self(*config::node().node_id())
, _balancing_on_core_count_change(std::move(balancing_on_core_count_change))
, _balancing_continuous(std::move(balancing_continuous))
, _debounce_timeout(std::move(debounce_timeout))
, _debounce_jitter(_debounce_timeout())
, _balance_timer([this] { balance_timer_callback(); })
, _total_counts(ss::smp::count, 0) {
_total_counts.at(0) += 1; // controller partition

_debounce_timeout.watch([this] {
_debounce_jitter = simple_time_jitter<ss::lowres_clock>(
_debounce_timeout());
});
}

ss::future<> shard_balancer::start() {
Expand Down Expand Up @@ -125,8 +162,15 @@ ss::future<> shard_balancer::start() {
_to_assign.insert(ntp);
}
});

co_await do_assign_ntps(lock);

if (
_balancing_on_core_count_change()
&& _features.is_active(features::feature::node_local_core_assignment)) {
co_await balance_on_core_count_change(lock);
}

vassert(
tt_version == _topics.local().topics_map_revision(),
"topic_table unexpectedly changed");
Expand Down Expand Up @@ -161,6 +205,7 @@ ss::future<> shard_balancer::stop() {
shard_id);

_topics.local().unregister_delta_notification(_topic_table_notify_handle);
_balance_timer.cancel();
_wakeup_event.set();
return _gate.close();
}
Expand Down Expand Up @@ -218,6 +263,20 @@ shard_balancer::reassign_shard(model::ntp ntp, ss::shard_id shard) {
co_return errc::success;
}

errc shard_balancer::trigger_rebalance() {
if (_gate.is_closed()) {
return errc::shutting_down;
}

if (!_features.is_active(features::feature::node_local_core_assignment)) {
return errc::feature_disabled;
}

vlog(clusterlog.info, "triggering manual rebalancing");
_balance_timer.rearm(ss::lowres_clock::now());
return errc::success;
}

ss::future<> shard_balancer::assign_fiber() {
if (_gate.is_closed()) {
co_return;
Expand All @@ -243,7 +302,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) {
auto to_assign = std::exchange(_to_assign, {});
co_await ssx::async_for_each(
to_assign.begin(), to_assign.end(), [&](const model::ntp& ntp) {
maybe_assign(ntp, new_targets);
maybe_assign(ntp, /*can_reassign=*/false, new_targets);
});

co_await ss::max_concurrent_for_each(
Expand All @@ -260,7 +319,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) {
}

void shard_balancer::maybe_assign(
const model::ntp& ntp, ntp2target_t& new_targets) {
const model::ntp& ntp, bool can_reassign, ntp2target_t& new_targets) {
std::optional<shard_placement_target> prev_target
= _shard_placement.get_target(ntp);

Expand All @@ -286,20 +345,44 @@ void shard_balancer::maybe_assign(
_shard_placement.is_persistence_enabled(),
"expected persistence to be enabled");

std::optional<ss::shard_id> prev_shard;
if (prev_target && prev_target->log_revision == log_revision) {
prev_shard = prev_target->shard;
}

if (prev_shard && !can_reassign) {
// partition already assigned, keep current shard.
return;
}

auto new_shard = choose_shard(ntp, topic_data, prev_shard);
if (new_shard == prev_shard) {
return;
}

target.emplace(
replicas_view->assignment.group,
log_revision.value(),
choose_shard(ntp, topic_data, std::nullopt));
replicas_view->assignment.group, log_revision.value(), new_shard);
} else {
// node-local shard placement not enabled yet, get target from
// topic_table.
target = placement_target_on_node(replicas_view.value(), _self);
}
} else {
// partition is removed from this node, this will likely disrupt the
// counts balance, so we set up the balancing timer.

if (
_features.is_active(features::feature::node_local_core_assignment)
&& _balancing_continuous() && !_balance_timer.armed()) {
// Add jitter so that different nodes don't move replicas of the
// same partition in unison.
auto debounce_interval = _debounce_jitter.next_duration();
vlog(
clusterlog.info,
"scheduling balancing in {}s.",
debounce_interval / 1s);
_balance_timer.arm(debounce_interval);
}
}

vlog(
Expand All @@ -315,6 +398,92 @@ void shard_balancer::maybe_assign(
new_targets.emplace(ntp, target);
}

ss::future<> shard_balancer::balance_on_core_count_change(mutex::units& lock) {
uint32_t last_rebalance_core_count = 0;
auto state_buf = _kvstore.get(
storage::kvstore::key_space::shard_placement, state_kvstore_key());
if (state_buf) {
last_rebalance_core_count = serde::from_iobuf<persisted_state>(
std::move(*state_buf))
.last_rebalance_core_count;
}

// If there is no state in kvstore, this means that we are restarting with
// shard balancing enabled for the first time, and this is a good time to
// rebalance as well.

if (last_rebalance_core_count == ss::smp::count) {
co_return;
}

vlog(
clusterlog.info, "detected core count change, triggering rebalance...");
co_await do_balance(lock);
}

void shard_balancer::balance_timer_callback() {
ssx::spawn_with_gate(_gate, [this] {
return _mtx.get_units()
.then([this](mutex::units lock) {
return ss::do_with(std::move(lock), [this](mutex::units& lock) {
return do_balance(lock);
});
})
.handle_exception([this](const std::exception_ptr& e) {
if (ssx::is_shutdown_exception(e)) {
return;
}

// Retry balancing after some time.
if (!_balance_timer.armed()) {
_balance_timer.arm(_debounce_jitter.next_duration());
}
vlog(
clusterlog.warn,
"failed to balance: {}, retrying after {}s.",
e,
(_balance_timer.get_timeout() - ss::lowres_clock::now()) / 1s);
});
});
}

ss::future<> shard_balancer::do_balance(mutex::units& lock) {
// Go over all node-local ntps in random order and try to find a more
// optimal core for them.
chunked_vector<model::ntp> ntps;
co_await _shard_placement.for_each_ntp(
[&](const model::ntp& ntp, const shard_placement_target&) {
ntps.push_back(ntp);
});
std::shuffle(ntps.begin(), ntps.end(), random_generators::internal::gen);

ntp2target_t new_targets;
co_await ssx::async_for_each(
ntps.begin(), ntps.end(), [&](const model::ntp& ntp) {
maybe_assign(ntp, /*can_reassign=*/true, new_targets);
});

vlog(
clusterlog.info,
"after balancing {} ntps were reassigned",
new_targets.size());

co_await ss::max_concurrent_for_each(
new_targets,
128,
[this, &lock](const decltype(new_targets)::value_type& kv) {
const auto& [ntp, target] = kv;
return set_target(ntp, target, lock);
});

co_await _kvstore.put(
storage::kvstore::key_space::shard_placement,
state_kvstore_key(),
serde::to_iobuf(persisted_state{
.last_rebalance_core_count = ss::smp::count,
}));
}

ss::future<> shard_balancer::set_target(
const model::ntp& ntp,
const std::optional<shard_placement_target>& target,
Expand Down
23 changes: 22 additions & 1 deletion src/v/cluster/shard_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/controller_backend.h"
#include "cluster/shard_placement_table.h"
#include "container/chunked_hash_map.h"
#include "random/simple_time_jitter.h"
#include "ssx/event.h"
#include "utils/mutex.h"

Expand All @@ -36,8 +37,12 @@ class shard_balancer {
shard_balancer(
ss::sharded<shard_placement_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<storage::api>&,
ss::sharded<topic_table>&,
ss::sharded<controller_backend>&);
ss::sharded<controller_backend>&,
config::binding<bool> balancing_on_core_count_change,
config::binding<bool> balancing_continuous,
config::binding<std::chrono::milliseconds> debounce_timeout);

ss::future<> start();
ss::future<> stop();
Expand All @@ -50,14 +55,23 @@ class shard_balancer {
/// Manually set shard placement for an ntp that has a replica on this node.
ss::future<errc> reassign_shard(model::ntp, ss::shard_id);

/// Manually trigger shard placement rebalancing for partitions in this
/// node.
errc trigger_rebalance();

private:
void process_delta(const topic_table::delta&);

ss::future<> assign_fiber();
ss::future<> do_assign_ntps(mutex::units& lock);

ss::future<> balance_on_core_count_change(mutex::units& lock);
void balance_timer_callback();
ss::future<> do_balance(mutex::units& lock);

void maybe_assign(
const model::ntp&,
bool can_reassign,
chunked_hash_map<model::ntp, std::optional<shard_placement_target>>&);

ss::future<> set_target(
Expand Down Expand Up @@ -88,11 +102,18 @@ class shard_balancer {
private:
shard_placement_table& _shard_placement;
features::feature_table& _features;
storage::kvstore& _kvstore;
ss::sharded<topic_table>& _topics;
ss::sharded<controller_backend>& _controller_backend;
model::node_id _self;

config::binding<bool> _balancing_on_core_count_change;
config::binding<bool> _balancing_continuous;
config::binding<std::chrono::milliseconds> _debounce_timeout;
simple_time_jitter<ss::lowres_clock> _debounce_jitter;

cluster::notification_id_type _topic_table_notify_handle;
ss::timer<ss::lowres_clock> _balance_timer;
ssx::event _wakeup_event{"shard_balancer"};
mutex _mtx{"shard_balancer"};
ss::gate _gate;
Expand Down
27 changes: 21 additions & 6 deletions src/v/cluster/shard_placement_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,7 @@ namespace {
static constexpr auto kvstore_key_space
= storage::kvstore::key_space::shard_placement;

// enum type is irrelevant, serde will serialize to 32 bit anyway
enum class kvstore_key_type {
persistence_enabled = 0,
assignment = 1,
current_state = 2,
};
using kvstore_key_type = shard_placement_kvstore_key_type;

bytes persistence_enabled_kvstore_key() {
iobuf buf;
Expand Down Expand Up @@ -831,6 +826,26 @@ shard_placement_table::get_target(const model::ntp& ntp) const {
return std::nullopt;
}

ss::future<> shard_placement_table::for_each_ntp(
ss::noncopyable_function<
void(const model::ntp&, const shard_placement_target&)> func) const {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
return ssx::async_for_each(
_ntp2entry.begin(),
_ntp2entry.end(),
[&func](const decltype(_ntp2entry)::value_type& kv) {
const auto& [ntp, entry] = kv;
vassert(
entry && entry->target && entry->mtx.ready(),
"[{}]: unexpected concurrent set_target()",
ntp);
func(ntp, *entry->target);
});
}

ss::future<std::error_code> shard_placement_table::prepare_create(
const model::ntp& ntp, model::revision_id expected_log_rev) {
// ensure that there is no concurrent enable_persistence() call
Expand Down
Loading

0 comments on commit 1c40c32

Please sign in to comment.