Node-local core assignment: core count decrease #20312

Merged · 16 commits · Jun 28, 2024
Changes from 2 commits
6 changes: 4 additions & 2 deletions src/v/cluster/controller.cc
@@ -204,8 +204,10 @@ ss::future<> controller::wire_up() {
std::ref(_node_status_table));
})
.then([this] {
return _shard_placement.start(ss::sharded_parameter(
[this] { return std::ref(_storage.local().kvs()); }));
return _shard_placement.start(
ss::sharded_parameter([] { return ss::this_shard_id(); }),
ss::sharded_parameter(
[this] { return std::ref(_storage.local().kvs()); }));
})
.then([this] { _probe.start(); });
}
2 changes: 1 addition & 1 deletion src/v/cluster/controller_backend.cc
@@ -1725,7 +1725,7 @@ ss::future<std::error_code> controller_backend::transfer_partition(
log_revision);

auto maybe_dest = co_await _shard_placement.prepare_transfer(
ntp, log_revision);
ntp, log_revision, _shard_placement.container());
if (maybe_dest.has_error()) {
co_return maybe_dest.error();
}
2 changes: 1 addition & 1 deletion src/v/cluster/shard_balancer.cc
@@ -118,7 +118,7 @@ ss::future<> shard_balancer::start() {
// 2. restore shard_placement_table from the kvstore or from topic_table.

if (_shard_placement.is_persistence_enabled()) {
co_await _shard_placement.initialize_from_kvstore(local_group2ntp);
co_await _shard_placement.initialize_from_kvstore(local_group2ntp, {});
} else if (_features.is_active(
features::feature::node_local_core_assignment)) {
// joiner node? enable persistence without initializing
132 changes: 84 additions & 48 deletions src/v/cluster/shard_placement_table.cc
@@ -153,14 +153,13 @@ bytes current_state_kvstore_key(const raft::group_id group) {

} // namespace

shard_placement_table::shard_placement_table(storage::kvstore& kvstore)
: _kvstore(kvstore) {}
shard_placement_table::shard_placement_table(
ss::shard_id shard, storage::kvstore& kvstore)
: _shard(shard)
, _kvstore(kvstore) {}

bool shard_placement_table::is_persistence_enabled() const {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();

if (_persistence_enabled) {
return true;
@@ -171,10 +170,7 @@ bool shard_placement_table::is_persistence_enabled() const {
}

ss::future<> shard_placement_table::enable_persistence() {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();

if (is_persistence_enabled()) {
co_return;
@@ -307,25 +303,34 @@ struct shard_placement_table::ntp_init_data {
}
};

ss::future<> shard_placement_table::initialize_from_kvstore(
const chunked_hash_map<raft::group_id, model::ntp>& local_group2ntp) {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
ss::future<std::vector<std::unique_ptr<shard_placement_table>>>
shard_placement_table::initialize_from_kvstore(
const chunked_hash_map<raft::group_id, model::ntp>& local_group2ntp,
const std::vector<std::unique_ptr<storage::kvstore>>& extra_kvstores) {
assert_is_assignment_shard();

vassert(
is_persistence_enabled(),
"can't initialize from kvstore, persistence hasn't been enabled yet");
co_await container().invoke_on_all(
[](shard_placement_table& spt) { spt._persistence_enabled = true; });

std::vector<std::unique_ptr<shard_placement_table>> extra_spts;
for (size_t i = 0; i < extra_kvstores.size(); ++i) {
extra_spts.push_back(std::make_unique<shard_placement_table>(
ss::smp::count + i, *extra_kvstores[i]));
}

// 1. gather kvstore markers from all shards

auto shard2init_states = co_await container().map(
[&local_group2ntp](shard_placement_table& spt) {
return spt.gather_init_states(local_group2ntp);
});
for (const auto& spt : extra_spts) {
shard2init_states.push_back(
co_await spt->gather_init_states(local_group2ntp));
}

// 2. merge into up-to-date shard_placement_table state

@@ -365,6 +370,9 @@ ss::future<> shard_placement_table::initialize_from_kvstore(
[&ntp2init_data](shard_placement_table& spt) {
return spt.scatter_init_data(ntp2init_data);
});
for (auto& spt : extra_spts) {
co_await spt->scatter_init_data(ntp2init_data);
}
Contributor:

Any reason why we process existing shard data concurrently, but extra shards one by one?

Contributor Author:

No particular reason, but scatter_init_data is CPU-bound, so no benefit in doing it concurrently either.
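For illustration only (not part of the change): a concurrent variant of the extra-shard loop discussed above could look roughly like the sketch below, assuming Seastar's ss::parallel_for_each and the extra_spts vector from this hunk. Because scatter_init_data() is CPU-bound and the extra tables all run on the assignment shard anyway, the sequential for-loop that was merged performs just as well.

    // Hypothetical alternative to the merged sequential loop: dispatch the
    // per-kvstore scatter calls concurrently. Offers no real speedup here,
    // since the work is CPU-bound on a single reactor.
    co_await ss::parallel_for_each(
      extra_spts, [&ntp2init_data](std::unique_ptr<shard_placement_table>& spt) {
          return spt->scatter_init_data(ntp2init_data);
      });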


co_await ssx::async_for_each(
ntp2init_data.begin(),
@@ -391,6 +399,8 @@ ss::future<> shard_placement_table::initialize_from_kvstore(
_ntp2entry.emplace(ntp, std::move(entry));
}
});

co_return extra_spts;
}

ss::future<ss::foreign_ptr<std::unique_ptr<shard_placement_table::ntp2state_t>>>
@@ -412,16 +422,18 @@ shard_placement_table::gather_init_states(
if (ntp_it == partitions.end()) {
vlog(
clusterlog.trace,
"recovered orphan assigned marker, group: {}",
"shard {}: recovered orphan assigned marker, group: {}",
_shard,
group);
orphan_assignments.push_back(group);
} else {
auto marker = serde::from_iobuf<assignment_marker>(
val.copy());
vlog(
clusterlog.trace,
"[{}] recovered assigned marker, lr: {} sr: {}",
"[{}] shard {}: recovered assigned marker, lr: {} sr: {}",
ntp_it->second,
_shard,
marker.log_revision,
marker.shard_revision);

@@ -437,8 +449,10 @@
auto marker = serde::from_iobuf<current_state_marker>(val.copy());
vlog(
clusterlog.trace,
"[{}] recovered cur state marker, lr: {} sr: {} complete: {}",
"[{}] shard {}: recovered cur state marker, lr: {} sr: {} "
"complete: {}",
marker.ntp,
_shard,
marker.log_revision,
marker.shard_revision,
marker.is_complete);
Expand All @@ -449,7 +463,7 @@ shard_placement_table::gather_init_states(
ssx::sformat,
"duplicate ntp {} in kvstore map on shard {}",
marker.ntp,
ss::this_shard_id()));
_shard));
}
state.current = shard_local_state(
group,
@@ -490,34 +504,44 @@ ss::future<> shard_placement_table::scatter_init_data(
auto& state = it->second;

if (state.current) {
if (ss::this_shard_id() == init_data.hosted.shard) {
if (init_data.receiving.shard) {
if (_shard == init_data.hosted.shard) {
if (
init_data.receiving.shard
&& init_data.receiving.shard < ss::smp::count) {
state._next = init_data.receiving.shard;
}
} else if (ss::this_shard_id() != init_data.receiving.shard) {
} else if (
_shard != init_data.receiving.shard
|| _shard >= ss::smp::count) {
state.current->status = hosted_status::obsolete;
}
}

ss::future<> fut = ss::now();
if (state.assigned) {
if (ss::this_shard_id() != init_data.assigned.shard) {
if (_shard != init_data.assigned.shard) {
fut = _kvstore.remove(
kvstore_key_space,
assignment_kvstore_key(state.assigned->group));
state.assigned = std::nullopt;
} else if (!init_data.hosted.shard) {
state._is_initial_for = init_data.log_revision;
}

if (_shard >= ss::smp::count) {
// mark states on extra shards as ready to transfer
state.assigned = std::nullopt;
}
}

if (state.is_empty()) {
_states.erase(it);
} else {
vlog(
clusterlog.info,
"[{}] recovered placement state: {}",
"[{}] shard {}: recovered placement state: {}",
ntp,
_shard,
state);
}

@@ -527,10 +551,8 @@

ss::future<> shard_placement_table::initialize_from_topic_table(
ss::sharded<topic_table>& topics, model::node_id self) {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();

vassert(
!is_persistence_enabled(),
"can't initialize from topic_table, persistence has already been "
@@ -548,6 +570,11 @@

ss::future<> shard_placement_table::do_initialize_from_topic_table(
const topic_table& topics, model::node_id self) {
vassert(
ss::this_shard_id() == _shard,
"trying to init table for extra shard {} from topic_table",
_shard);

// We expect topic_table to remain unchanged throughout the loop because the
// method is supposed to be called after local controller replay is finished
// but before we start getting new controller updates from the leader.
@@ -633,10 +660,12 @@ ss::future<> shard_placement_table::set_target(
const model::ntp& ntp,
std::optional<shard_placement_target> target,
shard_callback_t shard_callback) {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();

if (target) {
vassert(
target->shard < ss::smp::count, "[{}] bad target: {}", ntp, target);
}

// ensure that there is no concurrent enable_persistence() call
auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock();
@@ -703,7 +732,7 @@ ss::future<> shard_placement_table::set_target(
assignment_kvstore_key(target->group),
std::move(marker_buf));
});
} else {
} else if (prev_target.value().shard < ss::smp::count) {
co_await container().invoke_on(
prev_target.value().shard,
[group = prev_target->group, &ntp](shard_placement_table& other) {
@@ -719,7 +748,9 @@

entry.target = target;

if (prev_target && (!target || target->shard != prev_target->shard)) {
if (
prev_target && prev_target->shard < ss::smp::count
&& (!target || target->shard != prev_target->shard)) {
co_await container().invoke_on(
prev_target->shard,
[&ntp, shard_callback](shard_placement_table& other) {
@@ -781,7 +812,7 @@ ss::future<> shard_placement_table::set_target(
// 3. Lastly, remove obsolete kvstore marker

if (
_persistence_enabled && prev_target
_persistence_enabled && prev_target && prev_target->shard < ss::smp::count
&& (!target || target->shard != prev_target->shard)) {
co_await container().invoke_on(
prev_target->shard,
@@ -815,10 +846,7 @@ shard_placement_table::state_on_this_shard(const model::ntp& ntp) const {

std::optional<shard_placement_target>
shard_placement_table::get_target(const model::ntp& ntp) const {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();
auto it = _ntp2entry.find(ntp);
if (it != _ntp2entry.end()) {
return it->second->target;
@@ -829,10 +857,7 @@ shard_placement_table::get_target(const model::ntp& ntp) const {
ss::future<> shard_placement_table::for_each_ntp(
ss::noncopyable_function<
void(const model::ntp&, const shard_placement_target&)> func) const {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
assert_is_assignment_shard();
return ssx::async_for_each(
_ntp2entry.begin(),
_ntp2entry.end(),
@@ -913,7 +938,9 @@ ss::future<std::error_code> shard_placement_table::prepare_create(
}

ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
const model::ntp& ntp, model::revision_id expected_log_rev) {
const model::ntp& ntp,
model::revision_id expected_log_rev,
ss::sharded<shard_placement_table>& sharded_spt) {
// ensure that there is no concurrent enable_persistence() call
auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock();

@@ -964,7 +991,7 @@ ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
co_return errc::waiting_for_shard_placement_update;
}

auto maybe_dest = co_await container().invoke_on(
auto maybe_dest = co_await sharded_spt.invoke_on(
assignment_shard_id,
[&ntp, expected_log_rev](shard_placement_table& spt) {
auto it = spt._ntp2entry.find(ntp);
@@ -977,15 +1004,15 @@
}
return std::optional<ss::shard_id>{};
});
if (!maybe_dest || maybe_dest == ss::this_shard_id()) {
if (!maybe_dest || maybe_dest == _shard) {
// Inconsistent state, likely because we are in the middle of
// shard_placement_table update, wait for it to finish.
co_return errc::waiting_for_shard_placement_update;
}
ss::shard_id destination = maybe_dest.value();

// check if destination is ready
auto ec = co_await container().invoke_on(
auto ec = co_await sharded_spt.invoke_on(
destination, [&ntp, expected_log_rev](shard_placement_table& dest) {
auto dest_it = dest._states.find(ntp);
if (
@@ -1223,4 +1250,13 @@ ss::future<> shard_placement_table::do_delete(
co_return;
}

void shard_placement_table::assert_is_assignment_shard() const {
vassert(
ss::this_shard_id() == _shard
&& ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {} (table for shard: {})",
assignment_shard_id,
_shard);
}

} // namespace cluster