diff --git a/src/v/pandaproxy/schema_registry/errors.h b/src/v/pandaproxy/schema_registry/errors.h index 72054ca01ad0..05589b32021c 100644 --- a/src/v/pandaproxy/schema_registry/errors.h +++ b/src/v/pandaproxy/schema_registry/errors.h @@ -161,4 +161,17 @@ inline error_info compatibility_not_found(const subject& sub) { sub())}; } +inline error_info mode_not_found(const subject& sub) { + return error_info{ + error_code::mode_not_found, + fmt::format( + "Subject '{}' does not have subject-level mode configured", sub())}; +} + +inline error_info mode_not_readwrite(const subject& sub) { + return error_info{ + error_code::subject_version_operaton_not_permitted, + fmt::format("Subject {} is not in read-write mode", sub())}; +} + } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 9681fe5e15ef..b7a7b72ed36a 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -497,6 +497,43 @@ ss::future sharded_store::delete_subject_version( }); } +ss::future sharded_store::get_mode() { + co_return _store.local().get_mode().value(); +} + +ss::future +sharded_store::get_mode(subject sub, default_to_global fallback) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, [sub{std::move(sub)}, fallback](store& s) { + return s.get_mode(sub, fallback).value(); + }); +} + +ss::future sharded_store::set_mode(mode m, force f) { + auto map = [m, f](store& s) { return s.set_mode(m, f).value(); }; + auto reduce = std::logical_and<>{}; + co_return co_await _store.map_reduce0(map, true, reduce); +} + +ss::future +sharded_store::set_mode(seq_marker marker, subject sub, mode m, force f) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [marker, sub{std::move(sub)}, m, f](store& s) { + return s.set_mode(marker, sub, m, f).value(); + }); +} + +ss::future +sharded_store::clear_mode(seq_marker marker, subject sub, force f) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [marker, sub{std::move(sub)}, f](store& s) { + return s.clear_mode(marker, sub, f).value(); + }); +} + ss::future sharded_store::get_compatibility() { co_return _store.local().get_compatibility().value(); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index ff84629d96c9..e2c7d302401e 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -126,6 +126,24 @@ class sharded_store { ss::future delete_subject_version( subject sub, schema_version version, force f = force::no); + ///\brief Get the global mode. + ss::future get_mode(); + + ///\brief Get the mode for a subject, or fallback to global. + ss::future get_mode(subject sub, default_to_global fallback); + + ///\brief Set the global mode. + /// \param force Override checks, always apply action + ss::future set_mode(mode m, force f); + + ///\brief Set the mode for a subject. + /// \param force Override checks, always apply action + ss::future set_mode(seq_marker marker, subject sub, mode m, force f); + + ///\brief Clear the mode for a subject. + /// \param force Override checks, always apply action + ss::future clear_mode(seq_marker marker, subject sub, force f); + ///\brief Get the global compatibility level. ss::future get_compatibility(); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index ebaaf687878e..9599df127bce 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -477,6 +477,46 @@ class store { return true; } + ///\brief Get the global mode. + result get_mode() const { return _mode; } + + ///\brief Get the mode for a subject, or fallback to global. + result + get_mode(const subject& sub, default_to_global fallback) const { + auto sub_it = get_subject_iter(sub, include_deleted::yes); + if (sub_it && (sub_it.assume_value())->second.mode.has_value()) { + return (sub_it.assume_value())->second.mode.value(); + } else if (fallback) { + return _mode; + } + return mode_not_found(sub); + } + + ///\brief Set the global mode. + result set_mode(mode m, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + return std::exchange(_mode, m) != m; + } + + ///\brief Set the mode for a subject. + result + set_mode(seq_marker marker, const subject& sub, mode m, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + auto& sub_entry = _subjects[sub]; + sub_entry.written_at.push_back(marker); + return std::exchange(sub_entry.mode, m) != m; + } + + ///\brief Clear the mode for a subject. + result + clear_mode(const seq_marker& marker, const subject& sub, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + auto sub_it = BOOST_OUTCOME_TRYX( + get_subject_iter(sub, include_deleted::yes)); + std::erase(sub_it->second.written_at, marker); + return std::exchange(sub_it->second.mode, std::nullopt) != std::nullopt; + } + ///\brief Get the global compatibility level. result get_compatibility() const { return _compatibility; @@ -632,6 +672,7 @@ class store { struct subject_entry { std::optional compatibility; + std::optional mode; std::vector versions; is_deleted deleted{false}; @@ -696,6 +737,7 @@ class store { schema_map _schemas; subject_map _subjects; compatibility_level _compatibility{compatibility_level::backward}; + mode _mode{mode::read_write}; is_mutable _mutable{is_mutable::no}; };