Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIs for disabling topics/partitions and listing them #14461

Merged
merged 10 commits into from
Nov 6, 2023
8 changes: 8 additions & 0 deletions src/v/cluster/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static constexpr int8_t revert_cancel_partition_move_cmd_type = 9;
static constexpr int8_t topic_lifecycle_transition_cmd_type = 10;
static constexpr int8_t force_partition_reconfiguration_type = 11;
static constexpr int8_t update_partition_replicas_cmd_type = 12;
static constexpr int8_t set_topic_partitions_disabled_cmd_type = 13;

static constexpr int8_t create_user_cmd_type = 5;
static constexpr int8_t delete_user_cmd_type = 6;
Expand Down Expand Up @@ -214,6 +215,13 @@ using force_partition_reconfiguration_cmd = controller_command<
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

using set_topic_partitions_disabled_cmd = controller_command<
int8_t, // unused
set_topic_partitions_disabled_cmd_data,
set_topic_partitions_disabled_cmd_type,
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

/**
* new extendible version of move_partition_replicas command
*/
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_log_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class controller_log_limiter {
std::is_same_v<Cmd, create_topic_cmd> || //
std::is_same_v<Cmd, delete_topic_cmd> || //
std::is_same_v<Cmd, update_topic_properties_cmd> || //
std::is_same_v<Cmd, create_partition_cmd>) {
std::is_same_v<Cmd, create_partition_cmd> || //
std::is_same_v<Cmd, set_topic_partitions_disabled_cmd>) {
return _topic_operations_limiter.try_throttle();
} else if constexpr (
std::is_same_v<Cmd, move_partition_replicas_cmd> || //
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/controller_snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ ss::future<> topics_t::serde_async_write(iobuf& out) {
co_await write_map_async(out, std::move(topics));
serde::write(out, highest_group_id);
co_await write_map_async(out, std::move(lifecycle_markers));
co_await write_map_async(out, std::move(disabled_partitions));
}

ss::future<>
Expand All @@ -136,6 +137,11 @@ topics_t::serde_async_read(iobuf_parser& in, serde::header const h) {
lifecycle_markers
= co_await read_map_async_nested<decltype(lifecycle_markers)>(
in, h._bytes_left_limit);
if (h._version >= 1) {
disabled_partitions
= co_await read_map_async_nested<decltype(disabled_partitions)>(
in, h._bytes_left_limit);
}

if (in.bytes_left() > h._bytes_left_limit) {
in.skip(in.bytes_left() - h._bytes_left_limit);
Expand Down
9 changes: 8 additions & 1 deletion src/v/cluster/controller_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct config_t

struct topics_t
: public serde::
envelope<topics_t, serde::version<0>, serde::compat_version<0>> {
envelope<topics_t, serde::version<1>, serde::compat_version<0>> {
// NOTE: layout here is a bit different than in the topic table because it
// allows more compact storage and more convenient generation of controller
// backend deltas when applying the snapshot.
Expand Down Expand Up @@ -184,6 +184,13 @@ struct topics_t
nt_revision_eq>
lifecycle_markers;

absl::node_hash_map<
model::topic_namespace,
topic_disabled_partitions_set,
model::topic_namespace_hash,
model::topic_namespace_eq>
disabled_partitions;

friend bool operator==(const topics_t&, const topics_t&) = default;

ss::future<> serde_async_write(iobuf&);
Expand Down
56 changes: 52 additions & 4 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ topic_table::do_local_delete(model::topic_namespace nt, model::offset offset) {
}

_topics.erase(tp);
_disabled_partitions.erase(nt);
_topics_map_revision++;
notify_waiters();
_probe.handle_topic_deletion(nt);
Expand Down Expand Up @@ -620,6 +621,45 @@ topic_table::apply(force_partition_reconfiguration_cmd cmd, model::offset o) {
return ss::make_ready_future<std::error_code>(errc::success);
}

ss::future<std::error_code>
topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
_last_applied_revision_id = model::revision_id(o);

auto topic_it = _topics.find(cmd.value.ns_tp);
if (topic_it == _topics.end()) {
co_return errc::topic_not_exists;
}

if (cmd.value.partition_id) {
const auto& assignments = topic_it->second.get_assignments();
if (!assignments.contains(*cmd.value.partition_id)) {
co_return errc::partition_not_exists;
}

auto [disabled_it, inserted] = _disabled_partitions.try_emplace(
cmd.value.ns_tp);
auto& disabled_set = disabled_it->second;

if (cmd.value.disabled) {
disabled_set.add(*cmd.value.partition_id);
} else {
disabled_set.remove(*cmd.value.partition_id, assignments);
}

if (disabled_set.is_empty()) {
_disabled_partitions.erase(disabled_it);
}
} else {
if (cmd.value.disabled) {
_disabled_partitions[cmd.value.ns_tp].set_topic_disabled();
} else {
_disabled_partitions.erase(cmd.value.ns_tp);
}
}

co_return errc::success;
}

template<typename T>
void incremental_update(
std::optional<T>& property, property_update<std::optional<T>> override) {
Expand Down Expand Up @@ -858,6 +898,11 @@ topic_table::fill_snapshot(controller_snapshot& controller_snap) const {
for (const auto& [ntr, lm] : _lifecycle_markers) {
snap.lifecycle_markers.emplace(ntr, lm);
}

for (const auto& [ns_tp, dps] : _disabled_partitions) {
snap.disabled_partitions.emplace(ns_tp, dps);
co_await ss::coroutine::maybe_yield();
}
}

// helper class to hold context needed for adding/deleting ntps when applying a
Expand Down Expand Up @@ -1212,6 +1257,13 @@ ss::future<> topic_table::apply_snapshot(
}
}

// Lifecycle markers is a simple static collection without notifications
// etc, so we can just copy directly into place.
_lifecycle_markers = controller_snap.topics.lifecycle_markers;

// Same for disabled partitions.
_disabled_partitions = controller_snap.topics.disabled_partitions;

// 2. re-calculate derived state

_partition_count = 0;
Expand All @@ -1223,10 +1275,6 @@ ss::future<> topic_table::apply_snapshot(
// 3. notify delta waiters
notify_waiters();

// Lifecycle markers is a simple static collection without notifications
// etc, so we can just copy directly into place.
_lifecycle_markers = controller_snap.topics.lifecycle_markers;

_last_applied_revision_id = snap_revision;
}

Expand Down
43 changes: 43 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ class topic_table {
nt_revision_hash,
nt_revision_eq>;

using disabled_partitions_t = absl::node_hash_map<
model::topic_namespace,
topic_disabled_partitions_set,
model::topic_namespace_hash,
model::topic_namespace_eq>;

using delta_range_t
= boost::iterator_range<fragmented_vector<delta>::const_iterator>;
using delta_cb_t = ss::noncopyable_function<void(delta_range_t)>;
Expand Down Expand Up @@ -320,6 +326,8 @@ class topic_table {
apply(force_partition_reconfiguration_cmd, model::offset);
ss::future<std::error_code>
apply(update_partition_replicas_cmd, model::offset);
ss::future<std::error_code>
apply(set_topic_partitions_disabled_cmd, model::offset);

ss::future<> fill_snapshot(controller_snapshot&) const;
ss::future<>
Expand Down Expand Up @@ -493,6 +501,40 @@ class topic_table {
return _lifecycle_markers;
}

const disabled_partitions_t& get_disabled_partitions() const {
return _disabled_partitions;
}

const topic_disabled_partitions_set*
get_topic_disabled_set(model::topic_namespace_view ns_tp) const {
auto it = _disabled_partitions.find(ns_tp);
if (it == _disabled_partitions.end()) {
return nullptr;
}
return &it->second;
}

bool is_disabled(model::topic_namespace_view ns_tp) const {
auto it = _disabled_partitions.find(ns_tp);
if (it == _disabled_partitions.end()) {
return false;
}
return it->second.is_topic_disabled();
}

bool is_disabled(
model::topic_namespace_view ns_tp, model::partition_id p_id) const {
auto it = _disabled_partitions.find(ns_tp);
if (it == _disabled_partitions.end()) {
return false;
}
return it->second.is_disabled(p_id);
}

bool is_disabled(const model::ntp& ntp) const {
return is_disabled(model::topic_namespace_view{ntp}, ntp.tp.partition);
}

auto topics_iterator_begin() const {
return stable_iterator<
underlying_t::const_iterator,
Expand Down Expand Up @@ -542,6 +584,7 @@ class topic_table {

underlying_t _topics;
lifecycle_markers_t _lifecycle_markers;
disabled_partitions_t _disabled_partitions;
size_t _partition_count{0};

updates_t _updates_in_progress;
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/topic_updates_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,11 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
co_return ec;
}

ss::future<std::error_code> topic_updates_dispatcher::apply(
set_topic_partitions_disabled_cmd cmd, model::offset base_offset) {
co_return co_await dispatch_updates_to_cores(cmd, base_offset);
}

topic_updates_dispatcher::in_progress_map
topic_updates_dispatcher::collect_in_progress(
const model::topic_namespace& tp_ns,
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/topic_updates_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class topic_updates_dispatcher {
move_topic_replicas_cmd,
revert_cancel_partition_move_cmd,
force_partition_reconfiguration_cmd,
update_partition_replicas_cmd>();
update_partition_replicas_cmd,
set_topic_partitions_disabled_cmd>();

bool is_batch_applicable(const model::record_batch& batch) const {
return batch.header().type
Expand Down Expand Up @@ -106,6 +107,8 @@ class topic_updates_dispatcher {
apply(force_partition_reconfiguration_cmd, model::offset);
ss::future<std::error_code>
apply(update_partition_replicas_cmd, model::offset);
ss::future<std::error_code>
apply(set_topic_partitions_disabled_cmd, model::offset);

using ntp_leader = std::pair<model::ntp, model::node_id>;

Expand Down
48 changes: 48 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,54 @@ ss::future<std::vector<topic_result>> topics_frontend::create_partitions(
co_return result;
}

ss::future<std::error_code> topics_frontend::set_topic_partitions_disabled(
model::topic_namespace_view ns_tp,
std::optional<model::partition_id> p_id,
bool disabled,
model::timeout_clock::time_point timeout) {
if (!_features.local().is_active(features::feature::disabling_partitions)) {
co_return errc::feature_disabled;
}

auto r = co_await stm_linearizable_barrier(timeout);
if (!r) {
co_return r.error();
}

// pre-replicate checks

if (p_id) {
if (!_topics.local().contains(ns_tp, *p_id)) {
co_return errc::partition_not_exists;
}
if (_topics.local().is_disabled(ns_tp, *p_id) == disabled) {
// no-op
co_return errc::success;
}
} else {
if (!_topics.local().contains(ns_tp)) {
co_return errc::topic_not_exists;
}
if (_topics.local().is_disabled(ns_tp) == disabled) {
// no-op
co_return errc::success;
}
}

// replicate the command

set_topic_partitions_disabled_cmd cmd(
0, // unused
set_topic_partitions_disabled_cmd_data{
.ns_tp = model::topic_namespace{ns_tp},
.partition_id = p_id,
.disabled = disabled,
});

co_return co_await replicate_and_wait(
_stm, _features, _as, std::move(cmd), timeout);
}

ss::future<bool>
topics_frontend::validate_shard(model::node_id node, uint32_t shard) const {
return _allocator.invoke_on(
Expand Down
11 changes: 11 additions & 0 deletions src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ class topics_frontend {
std::vector<create_partitions_configuration>,
model::timeout_clock::time_point);

/// if partition_id is nullopt, the disabled flag is applied to all
/// partitions of the topic.
///
/// The method is idempotent, i.e. if the disabled flag already has the
/// desired value, the call is ignored.
ss::future<std::error_code> set_topic_partitions_disabled(
model::topic_namespace_view,
std::optional<model::partition_id>,
bool disabled,
model::timeout_clock::time_point);

ss::future<bool> validate_shard(model::node_id node, uint32_t shard) const;

ss::future<result<std::vector<move_cancellation_result>>>
Expand Down
Loading
Loading