Skip to content

Commit

Permalink
schema_registry: Wire-up get_schema_subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Aug 28, 2023
1 parent 6f3233a commit 93a49f3
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
ss::httpd::schema_registry_json::get_schemas_ids_id_versions,
wrap(gate, es, get_schemas_ids_id_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id_subjects,
wrap(gate, es, get_schemas_ids_id_subjects)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subjects,
wrap(gate, es, get_subjects)});
Expand Down
86 changes: 86 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,17 @@ def _get_schemas_ids_id_versions(self,
headers=headers,
**kwargs)

def _get_schemas_ids_id_subjects(self,
id,
deleted=False,
headers=HTTP_GET_HEADERS,
**kwargs):
return self._request(
"GET",
f"schemas/ids/{id}/subjects{'?deleted=true' if deleted else ''}",
headers=headers,
**kwargs)

def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs):
return self._request("GET",
f"subjects{'?deleted=true' if deleted else ''}",
Expand Down Expand Up @@ -527,6 +538,81 @@ def test_get_schema_id_versions(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == []

@cluster(num_nodes=3)
def test_get_schema_id_subjects(self):
"""
Verify schema subjects
"""

# Given an ID and a list of subjects, check the association
# Also checks that schema registry returns a sorted list of subjects
def check_schema_subjects(id: int, subjects: list[str], deleted=False):
result_raw = self._get_schemas_ids_id_subjects(id=id,
deleted=deleted)
if result_raw.status_code != requests.codes.ok:
return False
res_subjects = result_raw.json()
if type(res_subjects) != type([]):
return False
subjects.sort()
return (res_subjects == subjects
and res_subjects == sorted(res_subjects))

self.logger.debug("Checking schema 1 subjects - expect 40403")
result_raw = self._get_schemas_ids_id_subjects(id=1)
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40403

topics = create_topic_names(2)
topic_0 = topics[0]
topic_1 = topics[1]
subject_0 = f"{topic_0}-value"
subject_1 = f"{topic_1}-value"

schema_1_data = json.dumps({"schema": schema1_def})

self.logger.debug("Posting schema 1 as a subject value")
result_raw = self._post_subjects_subject_versions(subject=subject_0,
data=schema_1_data)

self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

self.logger.debug("Checking schema 1 subjects - expect subject_0")
assert check_schema_subjects(id=1, subjects=list([subject_0]))

self.logger.debug("Posting schema 1 as a subject value (subject_1)")
result_raw = self._post_subjects_subject_versions(subject=subject_1,
data=schema_1_data)

self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

self.logger.debug("Checking schema 1 subjects - expect subject_{0,1}")
assert check_schema_subjects(id=1,
subjects=list([subject_0, subject_1]))

self.logger.debug("Soft delete subject_0")
result_raw = self._delete_subject(subject=subject_0)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check again, not including deleted")
assert check_schema_subjects(id=1, subjects=[subject_1])

self.logger.debug("Check including deleted")
assert check_schema_subjects(id=1,
subjects=[subject_0, subject_1],
deleted=True)

self.logger.debug("Hard delete subject_0")
result_raw = self._delete_subject(subject=subject_0, permanent=True)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check including deleted - subject_0 should be gone")
assert check_schema_subjects(id=1, subjects=[subject_1], deleted=True)

@cluster(num_nodes=3)
def test_post_subjects_subject_versions(self):
"""
Expand Down

0 comments on commit 93a49f3

Please sign in to comment.