Merge pull request redpanda-data#18217 from ztlpn/flex-assignment-concurrency

c/shard_balancer: assign shards concurrently
ztlpn authored May 7, 2024
2 parents 11c0cb8 + d51f63c commit eb003fe
Showing 6 changed files with 252 additions and 169 deletions.
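
To make the new control flow easier to follow, here is a hypothetical, condensed sketch of the pattern this commit introduces in `shard_balancer`: topic-table deltas no longer run through a serializing work queue; instead they only record the affected NTP and wake a single background fiber, which drains the pending set with bounded concurrency. The sketch is not the Redpanda code: it substitutes `std::string` for `model::ntp`, `std::unordered_set` for `chunked_hash_set`, and `seastar::condition_variable` for Redpanda's `ssx::event`; only the batching structure and the concurrency limit of 128 mirror the diff.

```cpp
// Hypothetical sketch of the new assignment loop (simplified types,
// not the actual Redpanda sources).
#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/loop.hh>

#include <string>
#include <unordered_set>
#include <utility>

class assign_loop {
public:
    // Called from the topic table notification: just remember the ntp and
    // wake the fiber, no work is done inline.
    void enqueue(std::string ntp) {
        _pending.insert(std::move(ntp));
        _wakeup.signal();
    }

    // Background fiber: wait until there is work (or we are stopping), then
    // assign up to 128 partitions concurrently.
    seastar::future<> run() {
        auto holder = _gate.hold();
        while (true) {
            co_await _wakeup.wait(
              [this] { return !_pending.empty() || _gate.is_closed(); });
            if (_gate.is_closed()) {
                co_return;
            }
            auto batch = std::exchange(_pending, {});
            co_await seastar::max_concurrent_for_each(
              batch, 128,
              [this](const std::string& ntp) { return assign(ntp); });
        }
    }

    seastar::future<> stop() {
        auto closed = _gate.close();
        _wakeup.signal(); // let the fiber observe the closed gate
        return closed;
    }

private:
    seastar::future<> assign(const std::string& ntp) {
        // Placeholder for shard_placement_table::set_target(); on failure
        // the real code logs, re-inserts the ntp into _pending and retries
        // on the next iteration.
        co_return;
    }

    std::unordered_set<std::string> _pending;
    seastar::condition_variable _wakeup;
    seastar::gate _gate;
};
```

Because each NTP's current target is recomputed from the topic table at assignment time, collapsing repeated deltas for the same NTP into one set entry is safe, and that is what allows the unordered, concurrent processing below.
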
110 changes: 65 additions & 45 deletions src/v/cluster/shard_balancer.cc
@@ -24,12 +24,7 @@ shard_balancer::shard_balancer(
: _topics(topics)
, _shard_placement(spt)
, _controller_backend(cb)
, _self(*config::node().node_id())
, _work_queue([](auto ex) {
if (!ssx::is_shutdown_exception(ex)) {
vlog(clusterlog.error, "shard balancer exception: {}", ex);
}
}) {}
, _self(*config::node().node_id()) {}

ss::future<> shard_balancer::start() {
vassert(
@@ -52,19 +47,22 @@ ss::future<> shard_balancer::start() {

_topic_table_notify_handle = _topics.local().register_delta_notification(
[this](topic_table::delta_range_t deltas_range) {
fragmented_vector<topic_table::delta> deltas(
deltas_range.begin(), deltas_range.end());
// Process deltas asynchronously in the work queue to preserve the
// order in which they appeared.
_work_queue.submit([this, deltas = std::move(deltas)]() mutable {
return ss::do_with(std::move(deltas), [this](auto& deltas) {
return ss::do_for_each(
deltas, [this](const topic_table::delta& d) {
return process_delta(d);
});
});
});
for (const auto& delta : deltas_range) {
// Filter out only deltas that might change the set of partition
// replicas on this node.
switch (delta.type) {
case topic_table_delta_type::disabled_flag_updated:
case topic_table_delta_type::properties_updated:
continue;
default:
_to_assign.insert(delta.ntp);
_wakeup_event.set();
break;
}
}
});

ssx::background = assign_fiber();
}

ss::future<> shard_balancer::stop() {
@@ -74,44 +72,66 @@ ss::future<> shard_balancer::stop() {
shard_id);

_topics.local().unregister_delta_notification(_topic_table_notify_handle);
co_await _work_queue.shutdown();
_wakeup_event.set();
return _gate.close();
}

ss::future<> shard_balancer::process_delta(const topic_table::delta& delta) {
const auto& ntp = delta.ntp;

auto shard_callback = [this](const model::ntp& ntp) {
_controller_backend.local().notify_reconciliation(ntp);
};
ss::future<> shard_balancer::assign_fiber() {
if (_gate.is_closed()) {
co_return;
}
auto gate_holder = _gate.hold();

auto maybe_replicas_view = _topics.local().get_replicas_view(ntp);
if (!maybe_replicas_view) {
if (delta.type == topic_table_delta_type::removed) {
co_await _shard_placement.local().set_target(
ntp,
std::nullopt,
model::shard_revision_id{delta.revision()},
shard_callback);
while (true) {
co_await _wakeup_event.wait(1s);
if (_gate.is_closed()) {
co_return;
}
co_return;

co_await do_assign_ntps();
}
auto replicas_view = maybe_replicas_view.value();
}

// Has value if the partition is expected to exist on this node.
auto target = placement_target_on_node(replicas_view, _self);
ss::future<> shard_balancer::do_assign_ntps() {
auto to_assign = std::exchange(_to_assign, {});
co_await ss::max_concurrent_for_each(
to_assign, 128, [this](const model::ntp& ntp) {
return assign_ntp(ntp);
});
}

auto shard_rev = model::shard_revision_id{
replicas_view.last_cmd_revision()};
ss::future<> shard_balancer::assign_ntp(const model::ntp& ntp) {
auto shard_callback = [this](const model::ntp& ntp) {
_controller_backend.local().notify_reconciliation(ntp);
};

std::optional<shard_placement_target> target;
auto replicas_view = _topics.local().get_replicas_view(ntp);
if (replicas_view) {
// Has value if the partition is expected to exist on this node.
target = placement_target_on_node(replicas_view.value(), _self);
}
vlog(
clusterlog.trace,
"[{}] setting placement target on on this node: {}, shard_rev: {}",
"[{}] setting placement target on this node: {}",
ntp,
target,
shard_rev);

co_await _shard_placement.local().set_target(
ntp, target, shard_rev, shard_callback);
target);

try {
co_await _shard_placement.local().set_target(
ntp, target, shard_callback);
} catch (...) {
auto ex = std::current_exception();
if (!ssx::is_shutdown_exception(ex)) {
vlog(
clusterlog.warn,
"[{}] exception while setting target: {}",
ntp,
ex);
// Retry on the next tick.
_to_assign.insert(ntp);
}
}
}

} // namespace cluster
14 changes: 11 additions & 3 deletions src/v/cluster/shard_balancer.h
@@ -13,7 +13,8 @@

#include "cluster/controller_backend.h"
#include "cluster/shard_placement_table.h"
#include "ssx/work_queue.h"
#include "container/chunked_hash_map.h"
#include "ssx/event.h"

namespace cluster {

@@ -38,7 +39,11 @@ class shard_balancer {
ss::future<> stop();

private:
ss::future<> process_delta(const topic_table::delta&);
void process_delta(const topic_table::delta&);

ss::future<> assign_fiber();
ss::future<> do_assign_ntps();
ss::future<> assign_ntp(const model::ntp&);

private:
ss::sharded<topic_table>& _topics;
@@ -47,7 +52,10 @@ class shard_balancer {
model::node_id _self;

cluster::notification_id_type _topic_table_notify_handle;
ssx::work_queue _work_queue;

chunked_hash_set<model::ntp> _to_assign;
ssx::event _wakeup_event{"shard_balancer"};
ss::gate _gate;
};

} // namespace cluster
140 changes: 61 additions & 79 deletions src/v/cluster/shard_placement_table.cc
@@ -16,6 +16,8 @@
#include "cluster/topic_table.h"
#include "ssx/async_algorithm.h"

#include <seastar/util/defer.hh>

namespace cluster {

std::ostream& operator<<(
@@ -120,11 +122,10 @@ ss::future<> shard_placement_table::initialize(
return;
}

auto shard_rev = model::shard_revision_id{
replicas_view.last_cmd_revision()};

if (ss::this_shard_id() == assignment_shard_id) {
_ntp2target.emplace(ntp, target.value());
auto it = _ntp2entry.emplace(ntp, std::make_unique<entry_t>())
.first;
it->second->target = target.value();
}

// We add an initial hosted marker for the partition on the shard
@@ -152,7 +153,7 @@
auto placement = placement_state();
auto assigned = shard_local_assignment{
.log_revision = target->log_revision,
.shard_revision = shard_rev};
.shard_revision = _cur_shard_revision};

if (orig_shard && target->shard != orig_shard) {
// cross-shard transfer, orig_shard gets the hosted marker
@@ -171,82 +172,69 @@
}
});
}

if (!_ntp2entry.empty()) {
_cur_shard_revision += 1;
}
}

ss::future<> shard_placement_table::set_target(
const model::ntp& ntp,
std::optional<shard_placement_target> target,
model::shard_revision_id shard_rev,
shard_callback_t shard_callback) {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);

auto units = co_await _mtx.get_units();

// 1. update node-wide map

std::optional<ss::shard_id> prev;
bool is_initial = false;
if (target) {
auto [it, inserted] = _ntp2target.try_emplace(ntp, *target);
if (inserted) {
vlog(
clusterlog.trace,
"[{}] insert target: {}, shard_rev: {}",
ntp,
target,
shard_rev);
is_initial = true;
} else {
if (it->second == *target) {
vlog(
clusterlog.trace,
"[{}] modify target no-op, cur: {}, shard_rev: {}",
ntp,
it->second,
shard_rev);
co_return;
}

vlog(
clusterlog.trace,
"[{}] modify target: {} -> {}, shard_rev: {}",
ntp,
it->second,
target,
shard_rev);
is_initial = it->second.log_revision != target->log_revision;
if (it->second.shard != target->shard) {
prev = it->second.shard;
}
it->second = *target;
}
} else {
auto prev_it = _ntp2target.find(ntp);
if (prev_it == _ntp2target.end()) {
vlog(
clusterlog.trace,
"[{}] remove target no-op, shard_rev: {}",
ntp,
shard_rev);
co_return;
}
if (!target && !_ntp2entry.contains(ntp)) {
co_return;
}

auto entry_it = _ntp2entry.try_emplace(ntp).first;
if (!entry_it->second) {
entry_it->second = std::make_unique<entry_t>();
}
entry_t& entry = *entry_it->second;

auto release_units = ss::defer(
[&, units = co_await entry.mtx.get_units()]() mutable {
bool had_waiters = entry.mtx.waiters() > 0;
units.return_all();
if (!had_waiters && !entry.target) {
_ntp2entry.erase(ntp);
}
});

const auto prev_target = entry.target;
if (prev_target == target) {
vlog(
clusterlog.trace,
"[{}] remove target: {}, shard_rev: {}",
"[{}] modify target no-op, current: {}",
ntp,
prev_it->second,
shard_rev);
prev = prev_it->second.shard;
_ntp2target.erase(prev_it);
prev_target);
co_return;
}

// 1. update node-wide map

const model::shard_revision_id shard_rev = _cur_shard_revision;
_cur_shard_revision += 1;

vlog(
clusterlog.trace,
"[{}] modify target: {} -> {}, shard_rev: {}",
ntp,
prev_target,
target,
shard_rev);
entry.target = target;

// 2. update shard-local state

if (target) {
const bool is_initial
= (!prev_target || prev_target->log_revision != target->log_revision);
co_await container().invoke_on(
target->shard,
[&ntp, target, shard_rev, is_initial, shard_callback](
@@ -260,12 +248,11 @@ ss::future<> shard_placement_table::set_target(
});
}

if (prev) {
if (prev_target && (!target || target->shard != prev_target->shard)) {
co_await container().invoke_on(
*prev,
[&ntp, shard_rev, shard_callback](shard_placement_table& other) {
return other.remove_assigned_on_this_shard(
ntp, shard_rev, shard_callback);
prev_target->shard,
[&ntp, shard_callback](shard_placement_table& other) {
return other.remove_assigned_on_this_shard(ntp, shard_callback);
});
}
}
Expand Down Expand Up @@ -300,14 +287,8 @@ ss::future<> shard_placement_table::set_assigned_on_this_shard(
}

ss::future<> shard_placement_table::remove_assigned_on_this_shard(
const model::ntp& ntp,
model::shard_revision_id shard_rev,
shard_callback_t shard_callback) {
vlog(
clusterlog.trace,
"[{}] removing assigned on this shard, shard_revision: {}",
ntp,
shard_rev);
const model::ntp& ntp, shard_callback_t shard_callback) {
vlog(clusterlog.trace, "[{}] removing assigned on this shard", ntp);

auto it = _states.find(ntp);
if (it == _states.end()) {
@@ -415,14 +396,15 @@ ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
auto maybe_dest = co_await container().invoke_on(
assignment_shard_id,
[&ntp, expected_log_rev](shard_placement_table& spt) {
auto it = spt._ntp2target.find(ntp);
if (it == spt._ntp2target.end()) {
auto it = spt._ntp2entry.find(ntp);
if (it == spt._ntp2entry.end()) {
return std::optional<ss::shard_id>{};
}
if (it->second.log_revision != expected_log_rev) {
return std::optional<ss::shard_id>{};
const auto& target = it->second->target;
if (target && target->log_revision == expected_log_rev) {
return std::optional{target->shard};
}
return std::optional{it->second.shard};
return std::optional<ss::shard_id>{};
});
if (!maybe_dest || maybe_dest == ss::this_shard_id()) {
// Inconsistent state, likely because we are in the middle of
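
The other half of the change is in `shard_placement_table::set_target()`, which now serializes updates per partition instead of under a single table-wide mutex, allocates shard revisions internally from `_cur_shard_revision`, and garbage-collects map entries once they have no target and no waiters. The following is a hypothetical, simplified sketch of that entry-lifecycle pattern, using `seastar::semaphore` and `std::map` with placeholder types in place of Redpanda's `mutex` wrapper and containers; it illustrates the pattern, not the actual implementation:

```cpp
// Hypothetical sketch of per-entry locking with deferred cleanup
// (simplified types, not the actual Redpanda sources).
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/util/defer.hh>

#include <map>
#include <memory>
#include <optional>
#include <string>

// Stand-in for shard_placement_table's per-ntp entry.
struct entry_t {
    seastar::semaphore mtx{1};   // serializes set_target() calls for one ntp
    std::optional<int> target;   // stand-in for shard_placement_target
};

class placement_map {
public:
    seastar::future<> set_target(std::string ntp, std::optional<int> target) {
        // Fast path: removing a target that was never set.
        if (!target && !_entries.contains(ntp)) {
            co_return;
        }
        auto it = _entries.try_emplace(ntp).first;
        if (!it->second) {
            it->second = std::make_unique<entry_t>();
        }
        entry_t& entry = *it->second;

        // Take the per-entry lock; on scope exit, release it and erase the
        // entry if nobody else is waiting and no target remains.
        auto release = seastar::defer(
          [this, &ntp, &entry,
           units = co_await seastar::get_units(entry.mtx, 1)]() mutable {
              bool had_waiters = entry.mtx.waiters() > 0;
              units.return_all();
              if (!had_waiters && !entry.target) {
                  _entries.erase(ntp);
              }
          });

        if (entry.target == target) {
            co_return; // no-op update, nothing to propagate
        }
        entry.target = target;
        // ...here the real code bumps the shard revision and propagates the
        // change to the destination and previous shards...
    }

private:
    std::map<std::string, std::unique_ptr<entry_t>> _entries;
};
```

The waiter check before `return_all()` is what keeps the erase safe: if another `set_target()` call is queued on the same entry, the entry is left in place and the last caller to release the lock performs the cleanup.
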
(Diffs for the remaining 3 changed files were not loaded.)
