Skip to content

Commit

Permalink
schema_registry/store: Support mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent 71c5fef commit 6ce966f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/v/pandaproxy/schema_registry/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,17 @@ inline error_info compatibility_not_found(const subject& sub) {
sub())};
}

inline error_info mode_not_found(const subject& sub) {
return error_info{
error_code::mode_not_found,
fmt::format(
"Subject '{}' does not have subject-level mode configured", sub())};
}

inline error_info mode_not_readwrite(const subject& sub) {
return error_info{
error_code::subject_version_operaton_not_permitted,
fmt::format("Subject {} is not in read-write mode", sub())};
}

} // namespace pandaproxy::schema_registry
37 changes: 37 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,43 @@ ss::future<bool> sharded_store::delete_subject_version(
});
}

ss::future<mode> sharded_store::get_mode() {
co_return _store.local().get_mode().value();
}

ss::future<mode>
sharded_store::get_mode(subject sub, default_to_global fallback) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, [sub{std::move(sub)}, fallback](store& s) {
return s.get_mode(sub, fallback).value();
});
}

ss::future<bool> sharded_store::set_mode(mode m, force f) {
auto map = [m, f](store& s) { return s.set_mode(m, f).value(); };
auto reduce = std::logical_and<>{};
co_return co_await _store.map_reduce0(map, true, reduce);
}

ss::future<bool>
sharded_store::set_mode(seq_marker marker, subject sub, mode m, force f) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [marker, sub{std::move(sub)}, m, f](store& s) {
return s.set_mode(marker, sub, m, f).value();
});
}

ss::future<bool>
sharded_store::clear_mode(seq_marker marker, subject sub, force f) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [marker, sub{std::move(sub)}, f](store& s) {
return s.clear_mode(marker, sub, f).value();
});
}

ss::future<compatibility_level> sharded_store::get_compatibility() {
co_return _store.local().get_compatibility().value();
}
Expand Down
18 changes: 18 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ class sharded_store {
ss::future<bool> delete_subject_version(
subject sub, schema_version version, force f = force::no);

///\brief Get the global mode.
ss::future<mode> get_mode();

///\brief Get the mode for a subject, or fallback to global.
ss::future<mode> get_mode(subject sub, default_to_global fallback);

///\brief Set the global mode.
/// \param force Override checks, always apply action
ss::future<bool> set_mode(mode m, force f);

///\brief Set the mode for a subject.
/// \param force Override checks, always apply action
ss::future<bool> set_mode(seq_marker marker, subject sub, mode m, force f);

///\brief Clear the mode for a subject.
/// \param force Override checks, always apply action
ss::future<bool> clear_mode(seq_marker marker, subject sub, force f);

///\brief Get the global compatibility level.
ss::future<compatibility_level> get_compatibility();

Expand Down
42 changes: 42 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,46 @@ class store {
return true;
}

///\brief Get the global mode.
result<mode> get_mode() const { return _mode; }

///\brief Get the mode for a subject, or fallback to global.
result<mode>
get_mode(const subject& sub, default_to_global fallback) const {
auto sub_it = get_subject_iter(sub, include_deleted::yes);
if (sub_it && (sub_it.assume_value())->second.mode.has_value()) {
return (sub_it.assume_value())->second.mode.value();
} else if (fallback) {
return _mode;
}
return mode_not_found(sub);
}

///\brief Set the global mode.
result<bool> set_mode(mode m, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
return std::exchange(_mode, m) != m;
}

///\brief Set the mode for a subject.
result<bool>
set_mode(seq_marker marker, const subject& sub, mode m, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
auto& sub_entry = _subjects[sub];
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.mode, m) != m;
}

///\brief Clear the mode for a subject.
result<bool>
clear_mode(const seq_marker& marker, const subject& sub, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
std::erase(sub_it->second.written_at, marker);
return std::exchange(sub_it->second.mode, std::nullopt) != std::nullopt;
}

///\brief Get the global compatibility level.
result<compatibility_level> get_compatibility() const {
return _compatibility;
Expand Down Expand Up @@ -632,6 +672,7 @@ class store {

struct subject_entry {
std::optional<compatibility_level> compatibility;
std::optional<mode> mode;
std::vector<subject_version_entry> versions;
is_deleted deleted{false};

Expand Down Expand Up @@ -696,6 +737,7 @@ class store {
schema_map _schemas;
subject_map _subjects;
compatibility_level _compatibility{compatibility_level::backward};
mode _mode{mode::read_write};
is_mutable _mutable{is_mutable::no};
};

Expand Down

0 comments on commit 6ce966f

Please sign in to comment.