Skip to content

Commit

Permalink
schema_registry/store: Introduce get_schema_subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Aug 28, 2023
1 parent a21e8e7 commit d7007f5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fmt/core.h>

#include <functional>
#include <iterator>

namespace pandaproxy::schema_registry {

Expand Down Expand Up @@ -306,6 +307,24 @@ sharded_store::get_schema_subject_versions(schema_id id) {
map, std::vector<subject_version>{}, reduce);
}

ss::future<std::vector<subject>>
sharded_store::get_schema_subjects(schema_id id, include_deleted inc_del) {
auto map = [id, inc_del](store& s) {
return s.get_schema_subjects(id, inc_del);
};
auto reduce = [](std::vector<subject> acc, std::vector<subject> subs) {
acc.insert(
acc.end(),
std::make_move_iterator(subs.begin()),
std::make_move_iterator(subs.end()));
return acc;
};
auto subs = co_await _store.map_reduce0(
map, std::vector<subject>{}, reduce);
absl::c_sort(subs);
co_return subs;
}

ss::future<subject_schema> sharded_store::get_subject_schema(
subject sub, std::optional<schema_version> version, include_deleted inc_del) {
auto sub_shard{shard_for(sub)};
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class sharded_store {
ss::future<std::vector<subject_version>>
get_schema_subject_versions(schema_id id);

///\brief Return a list of subjects for the schema id.
ss::future<std::vector<subject>>
get_schema_subjects(schema_id id, include_deleted inc_del);

///\brief Return a schema by subject and version (or latest).
ss::future<subject_schema> get_subject_schema(
subject sub,
Expand Down
15 changes: 15 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ class store {
return svs;
}

///\brief Return a list of subjects for the schema id.
std::vector<subject>
get_schema_subjects(schema_id id, include_deleted inc_del) {
std::vector<subject> subs;
for (const auto& s : _subjects) {
if (absl::c_any_of(
s.second.versions, [id, inc_del](const auto& vs) {
return vs.id == id && (inc_del || !vs.deleted);
})) {
subs.emplace_back(s.first);
}
}
return subs;
}

///\brief Return subject_version_id for a subject and version
result<subject_version_entry> get_subject_version_id(
const subject& sub,
Expand Down
69 changes: 69 additions & 0 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const pps::canonical_schema_definition int_def0{
pps::schema_type::avro};
const pps::subject subject0{"subject0"};
const pps::subject subject1{"subject1"};
const pps::subject subject2{"subject2"};

BOOST_AUTO_TEST_CASE(test_store_insert) {
pps::store s;
Expand Down Expand Up @@ -247,6 +248,74 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) {
BOOST_REQUIRE_EQUAL(versions[0].version, pps::schema_version{2});
}

BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) {
auto is_equal = [](auto lhs) {
return [lhs](auto rhs) { return lhs == rhs; };
};

pps::store s;

pps::seq_marker dummy_marker;

// First insert, expect id{1}
auto ins_res = s.insert(
{subject0, pps::canonical_schema_definition(schema1)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

auto subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);

// Second insert, same schema, expect id{1}
ins_res = s.insert({subject1, pps::canonical_schema_definition(schema1)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

// Insert yet another schema associated with a different subject
ins_res = s.insert({subject2, pps::canonical_schema_definition(schema2)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 2);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 0);

subjects = s.get_schema_subjects(
pps::schema_id{2}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 0);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 0);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 1);

// Test deletion

s.upsert_subject(
dummy_marker,
subject0,
pps::schema_version{1},
pps::schema_id{1},
pps::is_deleted::yes);

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::yes);
BOOST_REQUIRE_EQUAL(subjects.size(), 2);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);
}

BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) {
pps::store s;

Expand Down

0 comments on commit d7007f5

Please sign in to comment.