From 709d5e4ebb1f03cceca07ed52df0394cd3699307 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 24 Aug 2023 14:29:43 -0700 Subject: [PATCH 1/5] schema_registry/store: Introduce get_schema_subjects (cherry picked from commit d7007f5ab787b417d21cb1aad3937e891e63899f) --- .../schema_registry/sharded_store.cc | 19 +++++ .../schema_registry/sharded_store.h | 4 ++ src/v/pandaproxy/schema_registry/store.h | 15 ++++ .../pandaproxy/schema_registry/test/store.cc | 69 +++++++++++++++++++ 4 files changed, 107 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index d4bf958b670e4..e1becfae68d97 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -33,6 +33,7 @@ #include #include +#include namespace pandaproxy::schema_registry { @@ -306,6 +307,24 @@ sharded_store::get_schema_subject_versions(schema_id id) { map, std::vector{}, reduce); } +ss::future> +sharded_store::get_schema_subjects(schema_id id, include_deleted inc_del) { + auto map = [id, inc_del](store& s) { + return s.get_schema_subjects(id, inc_del); + }; + auto reduce = [](std::vector acc, std::vector subs) { + acc.insert( + acc.end(), + std::make_move_iterator(subs.begin()), + std::make_move_iterator(subs.end())); + return acc; + }; + auto subs = co_await _store.map_reduce0( + map, std::vector{}, reduce); + absl::c_sort(subs); + co_return subs; +} + ss::future sharded_store::get_subject_schema( subject sub, std::optional version, include_deleted inc_del) { auto sub_shard{shard_for(sub)}; diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 99f8cb0099a41..f9b317154123d 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -67,6 +67,10 @@ class sharded_store { ss::future> get_schema_subject_versions(schema_id id); + ///\brief Return a list of subjects for the schema id. + ss::future> + get_schema_subjects(schema_id id, include_deleted inc_del); + ///\brief Return a schema by subject and version (or latest). ss::future get_subject_schema( subject sub, diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 56fbadec43b63..ee5859f4879c0 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -113,6 +113,21 @@ class store { return svs; } + ///\brief Return a list of subjects for the schema id. + std::vector + get_schema_subjects(schema_id id, include_deleted inc_del) { + std::vector subs; + for (const auto& s : _subjects) { + if (absl::c_any_of( + s.second.versions, [id, inc_del](const auto& vs) { + return vs.id == id && (inc_del || !vs.deleted); + })) { + subs.emplace_back(s.first); + } + } + return subs; + } + ///\brief Return subject_version_id for a subject and version result get_subject_version_id( const subject& sub, diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index a1002bed3673d..37dedae537455 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -32,6 +32,7 @@ const pps::canonical_schema_definition int_def0{ pps::schema_type::avro}; const pps::subject subject0{"subject0"}; const pps::subject subject1{"subject1"}; +const pps::subject subject2{"subject2"}; BOOST_AUTO_TEST_CASE(test_store_insert) { pps::store s; @@ -247,6 +248,74 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { BOOST_REQUIRE_EQUAL(versions[0].version, pps::schema_version{2}); } +BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) { + auto is_equal = [](auto lhs) { + return [lhs](auto rhs) { return lhs == rhs; }; + }; + + pps::store s; + + pps::seq_marker dummy_marker; + + // First insert, expect id{1} + auto ins_res = s.insert( + {subject0, pps::canonical_schema_definition(schema1)}); + BOOST_REQUIRE(ins_res.inserted); + BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); + BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); + + auto subjects = s.get_schema_subjects( + pps::schema_id{1}, pps::include_deleted::no); + BOOST_REQUIRE_EQUAL(subjects.size(), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); + + // Second insert, same schema, expect id{1} + ins_res = s.insert({subject1, pps::canonical_schema_definition(schema1)}); + BOOST_REQUIRE(ins_res.inserted); + BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); + BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); + + // Insert yet another schema associated with a different subject + ins_res = s.insert({subject2, pps::canonical_schema_definition(schema2)}); + BOOST_REQUIRE(ins_res.inserted); + BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); + BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); + + subjects = s.get_schema_subjects( + pps::schema_id{1}, pps::include_deleted::no); + BOOST_REQUIRE_EQUAL(subjects.size(), 2); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 0); + + subjects = s.get_schema_subjects( + pps::schema_id{2}, pps::include_deleted::no); + BOOST_REQUIRE_EQUAL(subjects.size(), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 0); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 0); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject2)), 1); + + // Test deletion + + s.upsert_subject( + dummy_marker, + subject0, + pps::schema_version{1}, + pps::schema_id{1}, + pps::is_deleted::yes); + + subjects = s.get_schema_subjects( + pps::schema_id{1}, pps::include_deleted::no); + BOOST_REQUIRE_EQUAL(subjects.size(), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1); + + subjects = s.get_schema_subjects( + pps::schema_id{1}, pps::include_deleted::yes); + BOOST_REQUIRE_EQUAL(subjects.size(), 2); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); + BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject1)), 1); +} + BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { pps::store s; From cb344d860fc3a7c444d47dc0f723cd2ae1bf7f51 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 24 Aug 2023 11:11:42 -0700 Subject: [PATCH 2/5] schema_registry/handlers: Support get_schema_subjects (cherry picked from commit 12502e1afddcdc2ce22e85a01970c7b1d6f55029) --- src/v/pandaproxy/schema_registry/handlers.cc | 22 ++++++++++++++++++++ src/v/pandaproxy/schema_registry/handlers.h | 3 +++ 2 files changed, 25 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index f04268a476fa0..10c811bb81a6b 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -234,6 +234,28 @@ get_schemas_ids_id_versions(server::request_t rq, server::reply_t rp) { co_return rp; } +ss::future::reply_t> get_schemas_ids_id_subjects( + ctx_server::request_t rq, ctx_server::reply_t rp) { + parse_accept_header(rq, rp); + auto id = parse::request_param(*rq.req, "id"); + auto incl_del{ + parse::query_param>(*rq.req, "deleted") + .value_or(include_deleted::no)}; + rq.req.reset(); + + // List-type request: must ensure we see latest writes + co_await rq.service().writer().read_sync(); + + // Force early 40403 if the schema id isn't found + co_await rq.service().schema_store().get_schema_definition(id); + + auto subjects = co_await rq.service().schema_store().get_schema_subjects( + id, incl_del); + auto json_rslt{json::rjson_serialize(subjects)}; + rp.rep->write_body("json", json_rslt); + co_return rp; +} + ss::future get_subjects(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); diff --git a/src/v/pandaproxy/schema_registry/handlers.h b/src/v/pandaproxy/schema_registry/handlers.h index 20fec78eba633..5b88ce43f1a90 100644 --- a/src/v/pandaproxy/schema_registry/handlers.h +++ b/src/v/pandaproxy/schema_registry/handlers.h @@ -43,6 +43,9 @@ ss::future::reply_t> get_schemas_ids_id( ss::future::reply_t> get_schemas_ids_id_versions( ctx_server::request_t rq, ctx_server::reply_t rp); +ss::future::reply_t> get_schemas_ids_id_subjects( + ctx_server::request_t rq, ctx_server::reply_t rp); + ss::future::reply_t> get_subjects( ctx_server::request_t rq, ctx_server::reply_t rp); From 3b0b9de0a5afa135eb567d5db0be387134435c20 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 24 Aug 2023 10:28:17 -0700 Subject: [PATCH 3/5] schema_registry: Swagger for subjects (cherry picked from commit 6f3233a051a20ccaee3d984a7179c582b71fc268) --- .../api/api-doc/schema_registry.json | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 9ec8a45f6e8d3..30a8e683af357 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -322,6 +322,55 @@ } } }, + "/schemas/ids/{id}/subjects": { + "get": { + "summary": "Retrieve a list of subjects associated with some schema ID.", + "operationId": "get_schemas_ids_id_subjects", + "consumes": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "parameters": [ + { + "name": "id", + "in": "path", + "required": true, + "type": "integer" + }, + { + "name": "deleted", + "in": "query", + "required": false, + "type": "boolean" + } + ], + "produces": ["application/vnd.schemaregistry.v1+json"], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "404": { + "description": "Schema not found", + "schema": { + "$ref": "#/definitions/error_body" + } + }, + "500": { + "description": "Internal Server error", + "schema": { + "$ref": "#/definitions/error_body" + } + } + } + } + }, "/subjects": { "get": { "summary": "Retrieve a list of subjects.", From 4dc0a5b58560cadc68198768f6ec8424ef87cabd Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 24 Aug 2023 15:53:26 -0700 Subject: [PATCH 4/5] schema_registry: Wire-up get_schema_subjects (cherry picked from commit 93a49f3c6a81e053e33ed941f3c88b4b2e1f1aa0) --- src/v/pandaproxy/schema_registry/service.cc | 4 + tests/rptest/tests/schema_registry_test.py | 86 +++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index cf8509e57da26..63f943617004d 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -122,6 +122,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) { ss::httpd::schema_registry_json::get_schemas_ids_id_versions, wrap(gate, es, get_schemas_ids_id_versions)}); + routes.routes.emplace_back(server::route_t{ + ss::httpd::schema_registry_json::get_schemas_ids_id_subjects, + wrap(gate, es, get_schemas_ids_id_subjects)}); + routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_subjects, wrap(gate, es, get_subjects)}); diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index c403a5e35e309..8ed649d33d6ae 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -332,6 +332,17 @@ def _get_schemas_ids_id_versions(self, headers=headers, **kwargs) + def _get_schemas_ids_id_subjects(self, + id, + deleted=False, + headers=HTTP_GET_HEADERS, + **kwargs): + return self._request( + "GET", + f"schemas/ids/{id}/subjects{'?deleted=true' if deleted else ''}", + headers=headers, + **kwargs) + def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs): return self._request("GET", f"subjects{'?deleted=true' if deleted else ''}", @@ -527,6 +538,81 @@ def test_get_schema_id_versions(self): assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [] + @cluster(num_nodes=3) + def test_get_schema_id_subjects(self): + """ + Verify schema subjects + """ + + # Given an ID and a list of subjects, check the association + # Also checks that schema registry returns a sorted list of subjects + def check_schema_subjects(id: int, subjects: list[str], deleted=False): + result_raw = self._get_schemas_ids_id_subjects(id=id, + deleted=deleted) + if result_raw.status_code != requests.codes.ok: + return False + res_subjects = result_raw.json() + if type(res_subjects) != type([]): + return False + subjects.sort() + return (res_subjects == subjects + and res_subjects == sorted(res_subjects)) + + self.logger.debug("Checking schema 1 subjects - expect 40403") + result_raw = self._get_schemas_ids_id_subjects(id=1) + assert result_raw.status_code == requests.codes.not_found + assert result_raw.json()["error_code"] == 40403 + + topics = create_topic_names(2) + topic_0 = topics[0] + topic_1 = topics[1] + subject_0 = f"{topic_0}-value" + subject_1 = f"{topic_1}-value" + + schema_1_data = json.dumps({"schema": schema1_def}) + + self.logger.debug("Posting schema 1 as a subject value") + result_raw = self._post_subjects_subject_versions(subject=subject_0, + data=schema_1_data) + + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["id"] == 1 + + self.logger.debug("Checking schema 1 subjects - expect subject_0") + assert check_schema_subjects(id=1, subjects=list([subject_0])) + + self.logger.debug("Posting schema 1 as a subject value (subject_1)") + result_raw = self._post_subjects_subject_versions(subject=subject_1, + data=schema_1_data) + + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["id"] == 1 + + self.logger.debug("Checking schema 1 subjects - expect subject_{0,1}") + assert check_schema_subjects(id=1, + subjects=list([subject_0, subject_1])) + + self.logger.debug("Soft delete subject_0") + result_raw = self._delete_subject(subject=subject_0) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Check again, not including deleted") + assert check_schema_subjects(id=1, subjects=[subject_1]) + + self.logger.debug("Check including deleted") + assert check_schema_subjects(id=1, + subjects=[subject_0, subject_1], + deleted=True) + + self.logger.debug("Hard delete subject_0") + result_raw = self._delete_subject(subject=subject_0, permanent=True) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Check including deleted - subject_0 should be gone") + assert check_schema_subjects(id=1, subjects=[subject_1], deleted=True) + @cluster(num_nodes=3) def test_post_subjects_subject_versions(self): """ From 789bb4e9d0839669695ec653e37e6206cefa91e5 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 24 Aug 2023 17:13:54 -0700 Subject: [PATCH 5/5] schema_registry: Bump swagger version to 1.0.3 (cherry picked from commit 84bf726f260ff67fc7070acbe60a7a080054edbd) --- src/v/pandaproxy/api/api-doc/schema_registry_header.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry_header.json b/src/v/pandaproxy/api/api-doc/schema_registry_header.json index f77e5710aa40f..6f807a01b4164 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry_header.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry_header.json @@ -2,7 +2,7 @@ "swagger": "2.0", "info": { "title": "Pandaproxy Schema Registry", - "version": "1.0.2" + "version": "1.0.3" }, "host": "{{Host}}", "basePath": "/",