Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] schema_registry: Support GET /schemas/ids/{id}/subjects #13084

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,55 @@
}
}
},
"/schemas/ids/{id}/subjects": {
"get": {
"summary": "Retrieve a list of subjects associated with some schema ID.",
"operationId": "get_schemas_ids_id_subjects",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "id",
"in": "path",
"required": true,
"type": "integer"
},
{
"name": "deleted",
"in": "query",
"required": false,
"type": "boolean"
}
],
"produces": ["application/vnd.schemaregistry.v1+json"],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
"404": {
"description": "Schema not found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/subjects": {
"get": {
"summary": "Retrieve a list of subjects.",
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/api/api-doc/schema_registry_header.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Pandaproxy Schema Registry",
"version": "1.0.2"
"version": "1.0.3"
},
"host": "{{Host}}",
"basePath": "/",
Expand Down
22 changes: 22 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,28 @@ get_schemas_ids_id_versions(server::request_t rq, server::reply_t rp) {
co_return rp;
}

ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_subjects(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp) {
parse_accept_header(rq, rp);
auto id = parse::request_param<schema_id>(*rq.req, "id");
auto incl_del{
parse::query_param<std::optional<include_deleted>>(*rq.req, "deleted")
.value_or(include_deleted::no)};
rq.req.reset();

// List-type request: must ensure we see latest writes
co_await rq.service().writer().read_sync();

// Force early 40403 if the schema id isn't found
co_await rq.service().schema_store().get_schema_definition(id);

auto subjects = co_await rq.service().schema_store().get_schema_subjects(
id, incl_del);
auto json_rslt{json::rjson_serialize(subjects)};
rp.rep->write_body("json", json_rslt);
co_return rp;
}

ss::future<server::reply_t>
get_subjects(server::request_t rq, server::reply_t rp) {
parse_accept_header(rq, rp);
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ ss::future<ctx_server<service>::reply_t> get_schemas_ids_id(
ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_versions(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_subjects(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t> get_subjects(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
ss::httpd::schema_registry_json::get_schemas_ids_id_versions,
wrap(gate, es, get_schemas_ids_id_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id_subjects,
wrap(gate, es, get_schemas_ids_id_subjects)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subjects,
wrap(gate, es, get_subjects)});
Expand Down
19 changes: 19 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fmt/core.h>

#include <functional>
#include <iterator>

namespace pandaproxy::schema_registry {

Expand Down Expand Up @@ -306,6 +307,24 @@ sharded_store::get_schema_subject_versions(schema_id id) {
map, std::vector<subject_version>{}, reduce);
}

ss::future<std::vector<subject>>
sharded_store::get_schema_subjects(schema_id id, include_deleted inc_del) {
auto map = [id, inc_del](store& s) {
return s.get_schema_subjects(id, inc_del);
};
auto reduce = [](std::vector<subject> acc, std::vector<subject> subs) {
acc.insert(
acc.end(),
std::make_move_iterator(subs.begin()),
std::make_move_iterator(subs.end()));
return acc;
};
auto subs = co_await _store.map_reduce0(
map, std::vector<subject>{}, reduce);
absl::c_sort(subs);
co_return subs;
}

ss::future<subject_schema> sharded_store::get_subject_schema(
subject sub, std::optional<schema_version> version, include_deleted inc_del) {
auto sub_shard{shard_for(sub)};
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class sharded_store {
ss::future<std::vector<subject_version>>
get_schema_subject_versions(schema_id id);

///\brief Return a list of subjects for the schema id.
ss::future<std::vector<subject>>
get_schema_subjects(schema_id id, include_deleted inc_del);

///\brief Return a schema by subject and version (or latest).
ss::future<subject_schema> get_subject_schema(
subject sub,
Expand Down
15 changes: 15 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ class store {
return svs;
}

///\brief Return a list of subjects for the schema id.
std::vector<subject>
get_schema_subjects(schema_id id, include_deleted inc_del) {
std::vector<subject> subs;
for (const auto& s : _subjects) {
if (absl::c_any_of(
s.second.versions, [id, inc_del](const auto& vs) {
return vs.id == id && (inc_del || !vs.deleted);
})) {
subs.emplace_back(s.first);
}
}
return subs;
}

///\brief Return subject_version_id for a subject and version
result<subject_version_entry> get_subject_version_id(
const subject& sub,
Expand Down
69 changes: 69 additions & 0 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const pps::canonical_schema_definition int_def0{
pps::schema_type::avro};
const pps::subject subject0{"subject0"};
const pps::subject subject1{"subject1"};
const pps::subject subject2{"subject2"};

BOOST_AUTO_TEST_CASE(test_store_insert) {
pps::store s;
Expand Down Expand Up @@ -247,6 +248,74 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) {
BOOST_REQUIRE_EQUAL(versions[0].version, pps::schema_version{2});
}

BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) {
auto is_equal = [](auto lhs) {
return [lhs](auto rhs) { return lhs == rhs; };
};

pps::store s;

pps::seq_marker dummy_marker;

// First insert, expect id{1}
auto ins_res = s.insert(
{subject0, pps::canonical_schema_definition(schema1)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

auto subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);

// Second insert, same schema, expect id{1}
ins_res = s.insert({subject1, pps::canonical_schema_definition(schema1)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

// Insert yet another schema associated with a different subject
ins_res = s.insert({subject2, pps::canonical_schema_definition(schema2)});
BOOST_REQUIRE(ins_res.inserted);
BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2});
BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1});

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 2);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 0);

subjects = s.get_schema_subjects(
pps::schema_id{2}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 0);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 0);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 1);

// Test deletion

s.upsert_subject(
dummy_marker,
subject0,
pps::schema_version{1},
pps::schema_id{1},
pps::is_deleted::yes);

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::no);
BOOST_REQUIRE_EQUAL(subjects.size(), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);

subjects = s.get_schema_subjects(
pps::schema_id{1}, pps::include_deleted::yes);
BOOST_REQUIRE_EQUAL(subjects.size(), 2);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1);
BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1);
}

BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) {
pps::store s;

Expand Down
86 changes: 86 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,17 @@ def _get_schemas_ids_id_versions(self,
headers=headers,
**kwargs)

def _get_schemas_ids_id_subjects(self,
id,
deleted=False,
headers=HTTP_GET_HEADERS,
**kwargs):
return self._request(
"GET",
f"schemas/ids/{id}/subjects{'?deleted=true' if deleted else ''}",
headers=headers,
**kwargs)

def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs):
return self._request("GET",
f"subjects{'?deleted=true' if deleted else ''}",
Expand Down Expand Up @@ -527,6 +538,81 @@ def test_get_schema_id_versions(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == []

@cluster(num_nodes=3)
def test_get_schema_id_subjects(self):
"""
Verify schema subjects
"""

# Given an ID and a list of subjects, check the association
# Also checks that schema registry returns a sorted list of subjects
def check_schema_subjects(id: int, subjects: list[str], deleted=False):
result_raw = self._get_schemas_ids_id_subjects(id=id,
deleted=deleted)
if result_raw.status_code != requests.codes.ok:
return False
res_subjects = result_raw.json()
if type(res_subjects) != type([]):
return False
subjects.sort()
return (res_subjects == subjects
and res_subjects == sorted(res_subjects))

self.logger.debug("Checking schema 1 subjects - expect 40403")
result_raw = self._get_schemas_ids_id_subjects(id=1)
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40403

topics = create_topic_names(2)
topic_0 = topics[0]
topic_1 = topics[1]
subject_0 = f"{topic_0}-value"
subject_1 = f"{topic_1}-value"

schema_1_data = json.dumps({"schema": schema1_def})

self.logger.debug("Posting schema 1 as a subject value")
result_raw = self._post_subjects_subject_versions(subject=subject_0,
data=schema_1_data)

self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

self.logger.debug("Checking schema 1 subjects - expect subject_0")
assert check_schema_subjects(id=1, subjects=list([subject_0]))

self.logger.debug("Posting schema 1 as a subject value (subject_1)")
result_raw = self._post_subjects_subject_versions(subject=subject_1,
data=schema_1_data)

self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

self.logger.debug("Checking schema 1 subjects - expect subject_{0,1}")
assert check_schema_subjects(id=1,
subjects=list([subject_0, subject_1]))

self.logger.debug("Soft delete subject_0")
result_raw = self._delete_subject(subject=subject_0)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check again, not including deleted")
assert check_schema_subjects(id=1, subjects=[subject_1])

self.logger.debug("Check including deleted")
assert check_schema_subjects(id=1,
subjects=[subject_0, subject_1],
deleted=True)

self.logger.debug("Hard delete subject_0")
result_raw = self._delete_subject(subject=subject_0, permanent=True)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check including deleted - subject_0 should be gone")
assert check_schema_subjects(id=1, subjects=[subject_1], deleted=True)

@cluster(num_nodes=3)
def test_post_subjects_subject_versions(self):
"""
Expand Down