Skip to content

Commit

Permalink
schema_registry: Wire-up mode_mutability
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent ba05b97 commit 390fc71
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ api::~api() noexcept = default;

ss::future<> api::start() {
_store = std::make_unique<sharded_store>();
co_await _store->start(_sg);
co_await _store->start(is_mutable(_cfg.mode_mutability), _sg);
co_await _schema_id_validation_probe.start();
co_await _schema_id_validation_probe.invoke_on_all(
&schema_id_validation_probe::setup_metrics);
Expand Down
8 changes: 6 additions & 2 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ constexpr auto set_accumulator =

} // namespace

ss::future<> sharded_store::start(ss::smp_service_group sg) {
ss::future<> sharded_store::start(is_mutable mut, ss::smp_service_group sg) {
_smp_opts = ss::smp_submit_to_options{sg};
return _store.start();
return _store.start(mut);
}

ss::future<> sharded_store::stop() { return _store.stop(); }
Expand Down Expand Up @@ -678,6 +678,10 @@ ss::future<bool> sharded_store::is_compatible(
co_return is_compat;
}

void sharded_store::check_mode_mutability(force f) const {
_store.local().check_mode_mutability(f).value();
}

ss::future<bool> sharded_store::has_version(
const subject& sub, schema_id id, include_deleted i) {
auto sub_shard{shard_for(sub)};
Expand Down
5 changes: 4 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class store;
/// subject or schema_id
class sharded_store {
public:
ss::future<> start(ss::smp_service_group sg);
ss::future<> start(is_mutable mut, ss::smp_service_group sg);
ss::future<> stop();

///\brief Make the canonical form of the schema
Expand Down Expand Up @@ -150,6 +150,9 @@ class sharded_store {

ss::future<bool> has_version(const subject&, schema_id, include_deleted);

//// \brief Throw if the store is not mutable
void check_mode_mutability(force f) const;

private:
ss::future<bool>
upsert_schema(schema_id id, canonical_schema_definition def);
Expand Down
16 changes: 16 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class store {
public:
using schema_id_set = absl::btree_set<schema_id>;

explicit store() = default;

explicit store(is_mutable mut)
: _mutable(mut) {}

struct insert_result {
schema_version version;
schema_id id;
Expand Down Expand Up @@ -598,6 +603,16 @@ class store {
return !found;
}

//// \brief Return error if the store is not mutable
result<void> check_mode_mutability(force f) const {
if (!_mutable && !f) {
return error_info{
error_code::subject_version_operaton_not_permitted,
"Mode changes are not allowed"};
}
return outcome::success();
}

private:
struct schema_entry {
explicit schema_entry(canonical_schema_definition definition)
Expand Down Expand Up @@ -672,6 +687,7 @@ class store {
schema_map _schemas;
subject_map _subjects;
compatibility_level _compatibility{compatibility_level::backward};
is_mutable _mutable{is_mutable::no};
};

} // namespace pandaproxy::schema_registry
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ constexpr std::string_view del_sub_value_0{

SEASTAR_THREAD_TEST_CASE(test_consume_to_store_3rdparty) {
pps::sharded_store s;
s.start(ss::default_smp_service_group()).get();
s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&s]() { s.stop().get(); });

// This kafka client will not be used by the sequencer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace pps = pp::schema_registry;

struct simple_sharded_store {
simple_sharded_store() {
store.start(ss::default_smp_service_group()).get();
store.start(pps::is_mutable::yes, ss::default_smp_service_group())
.get();
}
~simple_sharded_store() { store.stop().get(); }
simple_sharded_store(const simple_sharded_store&) = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) {
// used to read the data written in the previous schema.

pps::sharded_store s;
s.start(ss::default_smp_service_group()).get();
s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&s]() { s.stop().get(); });

pps::seq_marker dummy_marker;
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/schema_registry/test/consume_to_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ inline model::record_batch make_delete_subject_permanently_batch(

SEASTAR_THREAD_TEST_CASE(test_consume_to_store) {
pps::sharded_store s;
s.start(ss::default_smp_service_group()).get();
s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&s]() { s.stop().get(); });

// This kafka client will not be used by the sequencer
Expand Down Expand Up @@ -202,7 +202,7 @@ model::record_batch as_record_batch(Key key) {

SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) {
pps::sharded_store s;
s.start(ss::default_smp_service_group()).get();
s.start(pps::is_mutable::no, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&s]() { s.stop().get(); });

// This kafka client will not be used by the sequencer
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/schema_registry/test/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace pps = pp::schema_registry;

SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) {
pps::sharded_store store;
store.start(ss::default_smp_service_group()).get();
store.start(pps::is_mutable::yes, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&store]() { store.stop().get(); });

const pps::schema_version ver1{1};
Expand Down Expand Up @@ -97,7 +97,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) {

SEASTAR_THREAD_TEST_CASE(test_sharded_store_find_unordered) {
pps::sharded_store store;
store.start(ss::default_smp_service_group()).get();
store.start(pps::is_mutable::no, ss::default_smp_service_group()).get();
auto stop_store = ss::defer([&store]() { store.stop().get(); });

pps::unparsed_schema array_unsanitized{
Expand Down

0 comments on commit 390fc71

Please sign in to comment.