Merge pull request #18283 from ztlpn/flex-assignment-persistence
Shard placement table persistence
ztlpn authored May 20, 2024
2 parents a7cd2b0 + faa76fd commit 9005b9d
Showing 17 changed files with 1,257 additions and 222 deletions.
7 changes: 7 additions & 0 deletions src/v/bytes/include/bytes/bytes.h
@@ -104,6 +104,13 @@ inline iobuf bytes_to_iobuf(const bytes& in) {
return out;
}

inline iobuf bytes_to_iobuf(bytes_view in) {
iobuf out;
// NOLINTNEXTLINE
out.append(reinterpret_cast<const char*>(in.data()), in.size());
return out;
}

// NOLINTNEXTLINE(cert-dcl58-cpp): hash<> specialization
namespace std {
template<>
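For context, the new `bytes_view` overload mirrors the existing `bytes` overload but accepts a non-owning view, so callers can build an iobuf without first materializing an owning `bytes`. A minimal usage sketch (the `encode_key` wrapper is hypothetical and not part of this PR):

```cpp
#include "bytes/bytes.h"
#include "bytes/iobuf.h"

// Hypothetical caller: copy a non-owning view of key bytes into an iobuf
// without allocating an intermediate owning bytes object.
iobuf encode_key(bytes_view key) {
    return bytes_to_iobuf(key);
}
```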
9 changes: 7 additions & 2 deletions src/v/cluster/cluster_utils.cc
@@ -237,12 +237,16 @@ std::optional<shard_placement_target> placement_target_on_node(
// expected shard is determined by the resulting assignment
// (including cancellation effects).
return shard_placement_target{
log_revision, resulting_shard_on_node.value()};
replicas_view.assignment.group,
log_revision,
resulting_shard_on_node.value()};
} else {
// partition is moved away from this node, but we keep the original
// replica until update is finished.
return shard_placement_target{
log_revision, orig_shard_on_node.value()};
replicas_view.assignment.group,
log_revision,
orig_shard_on_node.value()};
}
} else if (replicas_view.update) {
// if partition appears on the node as a result of the update, create
@@ -252,6 +256,7 @@
replicas_view.update->get_target_replicas(), node);
if (updated_shard_on_node) {
return shard_placement_target{
replicas_view.assignment.group,
replicas_view.update->get_update_revision(),
updated_shard_on_node.value()};
}
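The call sites above now pass the partition's Raft group id as the first constructor argument of `shard_placement_target`. The struct's actual definition lives in `shard_placement_table.h` and is not part of this excerpt; a rough sketch of the shape these call sites imply (field names beyond those visible in the diff are guesses):

```cpp
// Inferred from the constructor calls above; not copied from the source tree.
struct shard_placement_target {
    raft::group_id group;            // Raft group id, newly required by this PR
    model::revision_id log_revision; // log revision of the replica on this node
    ss::shard_id shard;              // shard the replica should be placed on
};
```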
8 changes: 6 additions & 2 deletions src/v/cluster/controller.cc
@@ -187,7 +187,10 @@ ss::future<> controller::wire_up() {
std::ref(_partition_allocator),
std::ref(_node_status_table));
})
.then([this] { return _shard_placement.start(); })
.then([this] {
return _shard_placement.start(ss::sharded_parameter(
[this] { return std::ref(_storage.local().kvs()); }));
})
.then([this] { _probe.start(); });
}

@@ -438,8 +441,9 @@ })
})
.then([this] {
return _shard_balancer.start_single(
std::ref(_tp_state),
std::ref(_shard_placement),
std::ref(_feature_table),
std::ref(_tp_state),
std::ref(_backend));
})
.then(
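The `controller::wire_up` change uses Seastar's `ss::sharded_parameter` so that each shard's `shard_placement_table` instance is constructed with a reference to that shard's local kvstore. A condensed sketch of the pattern with simplified types (not the actual Redpanda signatures):

```cpp
#include <seastar/core/sharded.hh>

#include <functional>

namespace ss = seastar;

// Sketch: the callable wrapped in ss::sharded_parameter runs once per shard
// when start() constructs that shard's instance, so storage.local() resolves
// to the per-shard storage object.
ss::future<> wire_up(
  ss::sharded<storage::api>& storage,
  ss::sharded<shard_placement_table>& placement) {
    return placement.start(ss::sharded_parameter(
      [&storage] { return std::ref(storage.local().kvs()); }));
}
```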
92 changes: 81 additions & 11 deletions src/v/cluster/shard_balancer.cc
@@ -14,15 +14,18 @@
#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "ssx/async_algorithm.h"

namespace cluster {

shard_balancer::shard_balancer(
ss::sharded<topic_table>& topics,
ss::sharded<shard_placement_table>& spt,
ss::sharded<features::feature_table>& features,
ss::sharded<topic_table>& topics,
ss::sharded<controller_backend>& cb)
: _topics(topics)
, _shard_placement(spt)
: _shard_placement(spt.local())
, _features(features.local())
, _topics(topics)
, _controller_backend(cb)
, _self(*config::node().node_id()) {}

@@ -32,19 +35,81 @@ ss::future<> shard_balancer::start() {
"method can only be invoked on shard {}",
shard_id);

// We expect topic_table to remain unchanged throughout the method
// invocation because it is supposed to be called after local controller
// replay is finished but before we start getting new controller updates
// from the leader.
auto tt_version = _topics.local().topics_map_revision();

co_await _shard_placement.invoke_on_all([this](shard_placement_table& spt) {
return spt.initialize(_topics.local(), _self);
});
if (_shard_placement.is_persistence_enabled()) {
// 1. collect the set of node-local ntps from topic_table

chunked_hash_map<raft::group_id, model::ntp> local_group2ntp;
chunked_hash_map<model::ntp, model::revision_id> local_ntp2log_revision;
const auto& topics = _topics.local();
ssx::async_counter counter;
for (const auto& [ns_tp, md_item] : topics.all_topics_metadata()) {
vassert(
tt_version == topics.topics_map_revision(),
"topic_table unexpectedly changed");

co_await ssx::async_for_each_counter(
counter,
md_item.get_assignments().begin(),
md_item.get_assignments().end(),
[&](const partition_assignment& p_as) {
vassert(
tt_version == topics.topics_map_revision(),
"topic_table unexpectedly changed");

model::ntp ntp{ns_tp.ns, ns_tp.tp, p_as.id};
auto replicas_view = topics.get_replicas_view(
ntp, md_item, p_as);
auto log_rev = log_revision_on_node(replicas_view, _self);
if (log_rev) {
local_group2ntp.emplace(
replicas_view.assignment.group, ntp);
local_ntp2log_revision.emplace(ntp, *log_rev);
}
});
}

// 2. restore shard_placement_table from the kvstore

co_await _shard_placement.initialize_from_kvstore(local_group2ntp);

// 3. assign non-assigned ntps that have to be assigned

co_await ssx::async_for_each_counter(
counter,
local_ntp2log_revision.begin(),
local_ntp2log_revision.end(),
[&](const std::pair<const model::ntp&, model::revision_id> kv) {
const auto& [ntp, log_revision] = kv;
auto existing_target = _shard_placement.get_target(ntp);
if (
!existing_target
|| existing_target->log_revision != log_revision) {
_to_assign.insert(ntp);
}
});
co_await do_assign_ntps();
} else {
co_await _shard_placement.initialize_from_topic_table(_topics, _self);

if (_features.is_active(
features::feature::shard_placement_persistence)) {
co_await _shard_placement.enable_persistence();
}
}

// we shouldn't be receiving any controller updates at this point, so no
// risk of missing a notification between initializing shard_placement_table
// and subscribing.
vassert(
tt_version == _topics.local().topics_map_revision(),
"topic_table unexpectedly changed");

// we shouldn't be receiving any controller updates at this point, so no
// risk of missing a notification between initializing shard_placement_table
// and subscribing.
_topic_table_notify_handle = _topics.local().register_delta_notification(
[this](topic_table::delta_range_t deltas_range) {
for (const auto& delta : deltas_range) {
@@ -88,6 +153,12 @@ ss::future<> shard_balancer::assign_fiber() {
co_return;
}

if (
_features.is_active(features::feature::shard_placement_persistence)
&& !_shard_placement.is_persistence_enabled()) {
co_await _shard_placement.enable_persistence();
}

co_await do_assign_ntps();
}
}
@@ -118,8 +189,7 @@ ss::future<> shard_balancer::assign_ntp(const model::ntp& ntp) {
target);

try {
co_await _shard_placement.local().set_target(
ntp, target, shard_callback);
co_await _shard_placement.set_target(ntp, target, shard_callback);
} catch (...) {
auto ex = std::current_exception();
if (!ssx::is_shutdown_exception(ex)) {
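One detail worth noting: the new initialization path walks the whole topic table, so it iterates with `ssx::async_counter` and `ssx::async_for_each_counter`, yielding to the reactor periodically instead of holding the shard through one long loop. A self-contained sketch of the underlying idea using plain Seastar primitives rather than the ssx helper itself:

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <cstddef>

// Illustration only: apply fn to every element of a large range, yielding to
// the scheduler every few hundred elements so other tasks on this shard are
// not starved. The ssx helper used in the diff automates this bookkeeping and
// carries the counter across multiple loops.
template<typename Range, typename Fn>
seastar::future<> async_for_each(Range& range, Fn fn) {
    std::size_t counter = 0;
    for (auto& elem : range) {
        fn(elem);
        if (++counter % 512 == 0) {
            co_await seastar::coroutine::maybe_yield();
        }
    }
}
```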
6 changes: 4 additions & 2 deletions src/v/cluster/shard_balancer.h
@@ -31,8 +31,9 @@ class shard_balancer {
static constexpr ss::shard_id shard_id = 0;

shard_balancer(
ss::sharded<topic_table>&,
ss::sharded<shard_placement_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<topic_table>&,
ss::sharded<controller_backend>&);

ss::future<> start();
@@ -46,8 +47,9 @@
ss::future<> assign_ntp(const model::ntp&);

private:
shard_placement_table& _shard_placement;
features::feature_table& _features;
ss::sharded<topic_table>& _topics;
ss::sharded<shard_placement_table>& _shard_placement;
ss::sharded<controller_backend>& _controller_backend;
model::node_id _self;

(The remaining 12 changed files in this commit are not shown in this excerpt.)
