Skip to content

Commit

Permalink
Merge pull request redpanda-data#13084 from vbotbuildovich/backport-p…
Browse files Browse the repository at this point in the history
…r-13020-v23.2.x-968

[v23.2.x] schema_registry: Support GET /schemas/ids/{id}/subjects
  • Loading branch information
piyushredpanda authored Aug 30, 2023
2 parents 7422f86 + 789bb4e commit 8047a99
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 1 deletion.
49 changes: 49 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,55 @@
}
}
},
"/schemas/ids/{id}/subjects": {
"get": {
"summary": "Retrieve a list of subjects associated with some schema ID.",
"operationId": "get_schemas_ids_id_subjects",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "id",
"in": "path",
"required": true,
"type": "integer"
},
{
"name": "deleted",
"in": "query",
"required": false,
"type": "boolean"
}
],
"produces": ["application/vnd.schemaregistry.v1+json"],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
"404": {
"description": "Schema not found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/subjects": {
"get": {
"summary": "Retrieve a list of subjects.",
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/api/api-doc/schema_registry_header.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Pandaproxy Schema Registry",
"version": "1.0.2"
"version": "1.0.3"
},
"host": "{{Host}}",
"basePath": "/",
Expand Down
22 changes: 22 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,28 @@ get_schemas_ids_id_versions(server::request_t rq, server::reply_t rp) {
co_return rp;
}

/// Handler for GET /schemas/ids/{id}/subjects.
///
/// Reads the `id` path parameter and the optional `deleted` query parameter
/// (treated as include_deleted::no when absent), then writes the subjects
/// returned by the schema store for that id to the reply body as JSON.
ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_subjects(
  ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp) {
    parse_accept_header(rq, rp);

    // Pull everything we need out of the request before releasing it.
    const auto wanted_id = parse::request_param<schema_id>(*rq.req, "id");
    const auto deleted_param
      = parse::query_param<std::optional<include_deleted>>(*rq.req, "deleted");
    const auto with_deleted = deleted_param.value_or(include_deleted::no);
    rq.req.reset();

    // List-type request: must ensure we see latest writes
    co_await rq.service().writer().read_sync();

    // Force early 40403 if the schema id isn't found; the definition itself
    // is not needed, so the result is discarded.
    co_await rq.service().schema_store().get_schema_definition(wanted_id);

    auto found = co_await rq.service().schema_store().get_schema_subjects(
      wanted_id, with_deleted);

    auto body = json::rjson_serialize(found);
    rp.rep->write_body("json", body);
    co_return rp;
}

ss::future<server::reply_t>
get_subjects(server::request_t rq, server::reply_t rp) {
parse_accept_header(rq, rp);
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ ss::future<ctx_server<service>::reply_t> get_schemas_ids_id(
ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_versions(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

/// Handler for GET /schemas/ids/{id}/subjects: lists the subjects associated
/// with a schema id.
ss::future<ctx_server<service>::reply_t> get_schemas_ids_id_subjects(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t> get_subjects(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
ss::httpd::schema_registry_json::get_schemas_ids_id_versions,
wrap(gate, es, get_schemas_ids_id_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id_subjects,
wrap(gate, es, get_schemas_ids_id_subjects)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subjects,
wrap(gate, es, get_subjects)});
Expand Down
19 changes: 19 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fmt/core.h>

#include <functional>
#include <iterator>

namespace pandaproxy::schema_registry {

Expand Down Expand Up @@ -306,6 +307,24 @@ sharded_store::get_schema_subject_versions(schema_id id) {
map, std::vector<subject_version>{}, reduce);
}

/// Collect the subjects that reference schema `id`.
///
/// Runs store::get_schema_subjects through `_store.map_reduce0`,
/// concatenates the partial results and returns them sorted.
ss::future<std::vector<subject>>
sharded_store::get_schema_subjects(schema_id id, include_deleted inc_del) {
    // Query executed against each underlying store.
    auto per_store = [id, inc_del](store& local) {
        return local.get_schema_subjects(id, inc_del);
    };

    // Append one partial result to the running total, moving each element.
    auto append = [](std::vector<subject> all, std::vector<subject> part) {
        for (auto& sub : part) {
            all.emplace_back(std::move(sub));
        }
        return all;
    };

    auto merged = co_await _store.map_reduce0(
      per_store, std::vector<subject>{}, append);

    // Present the result in a stable, sorted order.
    absl::c_sort(merged);
    co_return merged;
}

ss::future<subject_schema> sharded_store::get_subject_schema(
subject sub, std::optional<schema_version> version, include_deleted inc_del) {
auto sub_shard{shard_for(sub)};
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class sharded_store {
ss::future<std::vector<subject_version>>
get_schema_subject_versions(schema_id id);

///\brief Return a list of subjects for the schema id.
///
/// The returned list is sorted. `inc_del` is forwarded to
/// store::get_schema_subjects, where it decides whether versions marked as
/// deleted are taken into account when matching a subject.
ss::future<std::vector<subject>>
get_schema_subjects(schema_id id, include_deleted inc_del);

///\brief Return a schema by subject and version (or latest).
ss::future<subject_schema> get_subject_schema(
subject sub,
Expand Down
15 changes: 15 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ class store {
return svs;
}

///\brief Return a list of subjects for the schema id.
///
/// A subject is listed when at least one of its versions refers to `id`;
/// versions marked deleted only count when `inc_del` is set.
std::vector<subject>
get_schema_subjects(schema_id id, include_deleted inc_del) {
    // True for a version entry that refers to `id` and is visible under
    // the requested deletion filter.
    const auto refers_to_id = [id, inc_del](const auto& vs) {
        return vs.id == id && (inc_del || !vs.deleted);
    };

    std::vector<subject> matching;
    for (const auto& [sub, entry] : _subjects) {
        const bool referenced = absl::c_any_of(entry.versions, refers_to_id);
        if (!referenced) {
            continue;
        }
        matching.push_back(sub);
    }
    return matching;
}

///\brief Return subject_version_id for a subject and version
result<subject_version_entry> get_subject_version_id(
const subject& sub,
Expand Down
69 changes: 69 additions & 0 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const pps::canonical_schema_definition int_def0{
pps::schema_type::avro};
const pps::subject subject0{"subject0"};
const pps::subject subject1{"subject1"};
const pps::subject subject2{"subject2"};

BOOST_AUTO_TEST_CASE(test_store_insert) {
pps::store s;
Expand Down Expand Up @@ -247,6 +248,74 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) {
BOOST_REQUIRE_EQUAL(versions[0].version, pps::schema_version{2});
}

// Exercises store::get_schema_subjects: one schema shared by two subjects,
// a second schema owned by a third subject, and the effect of marking a
// subject version as deleted with and without include_deleted.
BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) {
    // Number of times `wanted` occurs in `subs`.
    const auto count_of = [](const auto& subs, const pps::subject& wanted) {
        return absl::c_count_if(
          subs, [&wanted](const auto& sub) { return wanted == sub; });
    };

    pps::store s;

    pps::seq_marker dummy_marker;

    // First insert, expect id{1}
    const auto first = s.insert(
      {subject0, pps::canonical_schema_definition(schema1)});
    BOOST_REQUIRE(first.inserted);
    BOOST_REQUIRE_EQUAL(first.id, pps::schema_id{1});
    BOOST_REQUIRE_EQUAL(first.version, pps::schema_version{1});

    const auto after_first = s.get_schema_subjects(
      pps::schema_id{1}, pps::include_deleted::no);
    BOOST_REQUIRE_EQUAL(after_first.size(), 1);
    BOOST_REQUIRE_EQUAL(count_of(after_first, subject0), 1);

    // Second insert, same schema, expect id{1}
    const auto second = s.insert(
      {subject1, pps::canonical_schema_definition(schema1)});
    BOOST_REQUIRE(second.inserted);
    BOOST_REQUIRE_EQUAL(second.id, pps::schema_id{1});
    BOOST_REQUIRE_EQUAL(second.version, pps::schema_version{1});

    // Insert yet another schema associated with a different subject
    const auto third = s.insert(
      {subject2, pps::canonical_schema_definition(schema2)});
    BOOST_REQUIRE(third.inserted);
    BOOST_REQUIRE_EQUAL(third.id, pps::schema_id{2});
    BOOST_REQUIRE_EQUAL(third.version, pps::schema_version{1});

    // Schema 1 is shared by subject0 and subject1 only
    const auto schema1_subjects = s.get_schema_subjects(
      pps::schema_id{1}, pps::include_deleted::no);
    BOOST_REQUIRE_EQUAL(schema1_subjects.size(), 2);
    BOOST_REQUIRE_EQUAL(count_of(schema1_subjects, subject0), 1);
    BOOST_REQUIRE_EQUAL(count_of(schema1_subjects, subject1), 1);
    BOOST_REQUIRE_EQUAL(count_of(schema1_subjects, subject2), 0);

    // Schema 2 belongs to subject2 only
    const auto schema2_subjects = s.get_schema_subjects(
      pps::schema_id{2}, pps::include_deleted::no);
    BOOST_REQUIRE_EQUAL(schema2_subjects.size(), 1);
    BOOST_REQUIRE_EQUAL(count_of(schema2_subjects, subject0), 0);
    BOOST_REQUIRE_EQUAL(count_of(schema2_subjects, subject1), 0);
    BOOST_REQUIRE_EQUAL(count_of(schema2_subjects, subject2), 1);

    // Test deletion

    s.upsert_subject(
      dummy_marker,
      subject0,
      pps::schema_version{1},
      pps::schema_id{1},
      pps::is_deleted::yes);

    // The deleted version is hidden unless explicitly requested
    const auto without_deleted = s.get_schema_subjects(
      pps::schema_id{1}, pps::include_deleted::no);
    BOOST_REQUIRE_EQUAL(without_deleted.size(), 1);
    BOOST_REQUIRE_EQUAL(count_of(without_deleted, subject1), 1);

    const auto with_deleted = s.get_schema_subjects(
      pps::schema_id{1}, pps::include_deleted::yes);
    BOOST_REQUIRE_EQUAL(with_deleted.size(), 2);
    BOOST_REQUIRE_EQUAL(count_of(with_deleted, subject0), 1);
    BOOST_REQUIRE_EQUAL(count_of(with_deleted, subject1), 1);
}

BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) {
pps::store s;

Expand Down
86 changes: 86 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,17 @@ def _get_schemas_ids_id_versions(self,
headers=headers,
**kwargs)

def _get_schemas_ids_id_subjects(self,
                                 id,
                                 deleted=False,
                                 headers=HTTP_GET_HEADERS,
                                 **kwargs):
    """
    Issue GET schemas/ids/{id}/subjects and return the response.

    When `deleted` is truthy the `?deleted=true` query string is appended.
    `headers` and any extra keyword arguments are passed to `_request`.
    """
    query = '?deleted=true' if deleted else ''
    path = f"schemas/ids/{id}/subjects{query}"
    return self._request("GET", path, headers=headers, **kwargs)

def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs):
return self._request("GET",
f"subjects{'?deleted=true' if deleted else ''}",
Expand Down Expand Up @@ -527,6 +538,81 @@ def test_get_schema_id_versions(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == []

@cluster(num_nodes=3)
def test_get_schema_id_subjects(self):
    """
    Verify schema subjects

    Covers GET schemas/ids/{id}/subjects for an unknown id (40403), for a
    schema shared by two subjects, and after soft and hard deletion of
    one of those subjects.
    """

    # Given an ID and a list of subjects, check the association
    # Also checks that schema registry returns a sorted list of subjects
    def check_schema_subjects(id: int, subjects: list[str], deleted=False):
        result_raw = self._get_schemas_ids_id_subjects(id=id,
                                                       deleted=deleted)
        if result_raw.status_code != requests.codes.ok:
            return False
        res_subjects = result_raw.json()
        if not isinstance(res_subjects, list):
            return False
        # Compare against a sorted copy so the caller's list is left
        # untouched; equality with a sorted list also proves that the
        # response itself is sorted.
        return res_subjects == sorted(subjects)

    self.logger.debug("Checking schema 1 subjects - expect 40403")
    result_raw = self._get_schemas_ids_id_subjects(id=1)
    assert result_raw.status_code == requests.codes.not_found
    assert result_raw.json()["error_code"] == 40403

    topics = create_topic_names(2)
    topic_0 = topics[0]
    topic_1 = topics[1]
    subject_0 = f"{topic_0}-value"
    subject_1 = f"{topic_1}-value"

    schema_1_data = json.dumps({"schema": schema1_def})

    self.logger.debug("Posting schema 1 as a subject value")
    result_raw = self._post_subjects_subject_versions(subject=subject_0,
                                                      data=schema_1_data)

    self.logger.debug(result_raw)
    assert result_raw.status_code == requests.codes.ok
    assert result_raw.json()["id"] == 1

    self.logger.debug("Checking schema 1 subjects - expect subject_0")
    assert check_schema_subjects(id=1, subjects=[subject_0])

    self.logger.debug("Posting schema 1 as a subject value (subject_1)")
    result_raw = self._post_subjects_subject_versions(subject=subject_1,
                                                      data=schema_1_data)

    self.logger.debug(result_raw)
    assert result_raw.status_code == requests.codes.ok
    assert result_raw.json()["id"] == 1

    self.logger.debug("Checking schema 1 subjects - expect subject_{0,1}")
    assert check_schema_subjects(id=1, subjects=[subject_0, subject_1])

    self.logger.debug("Soft delete subject_0")
    result_raw = self._delete_subject(subject=subject_0)
    assert result_raw.status_code == requests.codes.ok

    self.logger.debug("Check again, not including deleted")
    assert check_schema_subjects(id=1, subjects=[subject_1])

    self.logger.debug("Check including deleted")
    assert check_schema_subjects(id=1,
                                 subjects=[subject_0, subject_1],
                                 deleted=True)

    self.logger.debug("Hard delete subject_0")
    result_raw = self._delete_subject(subject=subject_0, permanent=True)
    assert result_raw.status_code == requests.codes.ok

    self.logger.debug("Check including deleted - subject_0 should be gone")
    assert check_schema_subjects(id=1, subjects=[subject_1], deleted=True)

@cluster(num_nodes=3)
def test_post_subjects_subject_versions(self):
"""
Expand Down

0 comments on commit 8047a99

Please sign in to comment.