diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 987fe42febd1c..e999015929b7d 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -574,6 +574,12 @@ "in": "query", "required": false, "type": "string" + }, + { + "name": "subjectPrefix", + "in": "query", + "required": false, + "type": "string" } ], "produces": [ diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 38451ca0414d3..8cd8e19d896f3 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -422,12 +422,15 @@ get_subjects(server::request_t rq, server::reply_t rp) { auto inc_del{ parse::query_param>(*rq.req, "deleted") .value_or(include_deleted::no)}; + auto subject_prefix{ + parse::query_param>(*rq.req, "subjectPrefix")}; rq.req.reset(); // List-type request: must ensure we see latest writes co_await rq.service().writer().read_sync(); - auto subjects = co_await rq.service().schema_store().get_subjects(inc_del); + auto subjects = co_await rq.service().schema_store().get_subjects( + inc_del, subject_prefix); rp.rep->write_body( "json", ss::json::stream_range_as_array( diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index fbb6b972b2e13..820c2904163d6 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -354,10 +354,12 @@ ss::future sharded_store::get_subject_schema( .deleted = v_id.deleted}; } -ss::future> -sharded_store::get_subjects(include_deleted inc_del) { +ss::future> sharded_store::get_subjects( + include_deleted inc_del, std::optional subject_prefix) { using subjects = chunked_vector; - auto map = [inc_del](store& s) { return s.get_subjects(inc_del); }; + auto map = [inc_del, &subject_prefix](store& s) { + return s.get_subjects(inc_del, subject_prefix); + }; auto reduce = [](subjects acc, subjects subs) { acc.reserve(acc.size() + subs.size()); std::move(subs.begin(), subs.end(), std::back_inserter(acc)); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index ed4b7b57d4d71..0e374b43b5b5c 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -85,7 +85,9 @@ class sharded_store { include_deleted inc_dec); ///\brief Return a list of subjects. - ss::future> get_subjects(include_deleted inc_del); + ss::future> get_subjects( + include_deleted inc_del, + std::optional subject_prefix = std::nullopt); ///\brief Return whether there are any subjects. ss::future has_subjects(include_deleted inc_del); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 85203c293272b..28e27941d870a 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -174,7 +174,9 @@ class store { } ///\brief Return a list of subjects. - chunked_vector get_subjects(include_deleted inc_del) const { + chunked_vector get_subjects( + include_deleted inc_del, + const std::optional& subject_prefix = std::nullopt) const { chunked_vector res; res.reserve(_subjects.size()); for (const auto& sub : _subjects) { @@ -182,7 +184,9 @@ class store { auto has_version = absl::c_any_of( sub.second.versions, [inc_del](auto const& v) { return inc_del || !v.deleted; }); - if (has_version) { + if ( + has_version + && sub.first().starts_with(subject_prefix.value_or(""))) { res.push_back(sub.first); } } diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 78ef7de890b33..ee425af5ac03b 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -394,9 +394,19 @@ def _get_schemas_ids_id_subjects(self, headers=headers, **kwargs) - def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs): + def _get_subjects(self, + deleted=False, + subject_prefix=None, + headers=HTTP_GET_HEADERS, + **kwargs): + params = {} + if deleted: + params['deleted'] = 'true' + if subject_prefix: + params['subjectPrefix'] = subject_prefix return self._request("GET", - f"subjects{'?deleted=true' if deleted else ''}", + "subjects", + params=params, headers=headers, **kwargs) @@ -2248,6 +2258,45 @@ def test_get_schema_id_versions(self): assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [{"subject": subject, "version": 1}] + @cluster(num_nodes=3) + def test_get_subjects(self): + """ + Verify getting subjects + """ + self._init_users() + + topics = ['a', 'aa', 'b', 'ab', 'bb'] + + schema_1_data = json.dumps({"schema": schema1_def}) + + self.logger.debug("Posting schemas 1 as subject keys") + + def post(topic): + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", + data=schema_1_data, + auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + + for t in topics: + post(t) + + def get_subjects(prefix: Optional[str]): + result_raw = self._get_subjects(subject_prefix=prefix, + auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok + + return result_raw.json() + + assert len(get_subjects(prefix=None)) == 5 + assert len(get_subjects(prefix="")) == 5 + assert len(get_subjects(prefix="a")) == 3 + assert len(get_subjects(prefix="aa")) == 1 + assert len(get_subjects(prefix="aaa")) == 0 + assert len(get_subjects(prefix="b")) == 2 + assert len(get_subjects(prefix="bb")) == 1 + @cluster(num_nodes=3) def test_post_subjects_subject_versions(self): """