Skip to content

Commit

Permalink
Merge pull request #19973 from vbotbuildovich/backport-pr-19944-v24.1…
Browse files Browse the repository at this point in the history
….x-85

[v24.1.x] [CORE-3008] schema_registry: Improve handling of deleted schema
  • Loading branch information
BenPope authored Jun 25, 2024
2 parents cc72235 + 1d07730 commit 4dde709
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,13 @@ ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
batch_builder rb(write_at, sub);
rb(std::move(key), std::move(value));

{
// Clear config if this is a delete of the last version
auto vec = co_await _store.get_versions(sub, include_deleted::no);
if (vec.size() == 1 && vec.front() == version) {
rb(co_await _store.get_subject_config_written_at(sub));
}
}
if (co_await produce_and_apply(write_at, std::move(rb).build())) {
co_return true;
} else {
Expand Down
13 changes: 10 additions & 3 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,14 @@ class store {
result<std::vector<subject_version_entry>>
get_version_ids(const subject& sub, include_deleted inc_del) const {
auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del));
return sub_it->second.versions;
std::vector<subject_version_entry> res;
absl::c_copy_if(
sub_it->second.versions,
std::back_inserter(res),
[inc_del](const subject_version_entry& e) {
return inc_del || !e.deleted;
});
return {std::move(res)};
}

///\brief Return whether this subject has a version that references the
Expand All @@ -391,8 +398,8 @@ class store {
const subject& sub, schema_id id, include_deleted inc_del) const {
auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del));
const auto& vs = sub_it->second.versions;
return std::any_of(vs.cbegin(), vs.cend(), [id](const auto& entry) {
return entry.id == id;
return absl::c_any_of(vs, [id, inc_del](const auto& entry) {
return entry.id == id && (inc_del || !entry.deleted);
});
}

Expand Down
132 changes: 131 additions & 1 deletion tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,18 @@ def test_post_compatibility_subject_version(self):
version=1)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Posting schema 1 again, expect same version")
self.logger.debug("Posting schema 1 again, expect incompatible")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data)
assert result_raw.status_code == requests.codes.conflict

self.logger.debug("Set subject config - NONE")
result_raw = self._set_config_subject(subject=f"{topic}-key",
data=json.dumps(
{"compatibility": "NONE"}))
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Posting schema 1 again, expect same id")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data)
assert result_raw.status_code == requests.codes.ok
Expand Down Expand Up @@ -2649,6 +2660,125 @@ def test_protobuf(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == [2]

@cluster(num_nodes=3)
def test_delete_subject_bug(self):
topic = 'foo'
self.logger.debug(f"Register a schema against a subject")
schema_1_data = json.dumps({"schema": schema1_def})

self.logger.debug("Posting schema 1 as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok

self.logger.debug(f"Register a schema against a subject")
schema_2_data = json.dumps({"schema": schema2_def})

self.logger.debug("Posting schema 2 as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_2_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Soft delete subject 1 version 1")
result_raw = self._delete_subject_version(subject=f"{topic}-key",
version=1,
auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == 1, f"Json: {result_raw.json()}"

self.logger.debug("Soft delete subject 1 version 2")
result_raw = self._delete_subject_version(subject=f"{topic}-key",
version=2,
auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == 2, f"Json: {result_raw.json()}"

self.logger.debug("Posting schema 1 - again - as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == {'id': 1}, f"Json: {result_raw.json()}"

self.logger.debug("Get subject versions")
result_raw = self._get_subjects_subject_versions(
subject=f"{topic}-key", auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == [3], f"Json: {result_raw.json()}"

self.logger.debug("Posting schema 2 - again - as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_2_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == {'id': 2}, f"Json: {result_raw.json()}"

self.logger.debug("Get subject versions")
result_raw = self._get_subjects_subject_versions(
subject=f"{topic}-key", auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == [3, 4], f"Json: {result_raw.json()}"

@cluster(num_nodes=3)
def test_delete_subject_last_clears_config(self):
topic = 'foo'

self.logger.debug("Set subject config - NONE")
result_raw = self._set_config_subject(subject=f"{topic}-key",
data=json.dumps(
{"compatibility": "NONE"}),
auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok

self.logger.debug(f"Register a schema against a subject")
schema_1_data = json.dumps({"schema": schema1_def})
schema_3_data = json.dumps({"schema": schema3_def})

self.logger.debug("Posting schema 1 as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Get subject config - should be overriden")
result_raw = self._get_config_subject(subject=f"{topic}-key",
auth=self.super_auth)
assert result_raw.json()["compatibilityLevel"] == "NONE"

self.logger.debug("Soft delete subject 1 version 1")
result_raw = self._delete_subject_version(subject=f"{topic}-key",
version=1,
auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == 1, f"Json: {result_raw.json()}"

self.logger.debug("Get subject config - should fail")
result_raw = self._get_config_subject(subject=f"{topic}-key",
auth=self.super_auth)
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408

self.logger.debug("Posting schema 1 as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok

self.logger.debug(
"Posting incompatible schema 3 as a subject key - expect conflict")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_3_data, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.conflict

self.logger.debug("Get subject config - should fail")
result_raw = self._get_config_subject(subject=f"{topic}-key",
auth=self.super_auth)
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down

0 comments on commit 4dde709

Please sign in to comment.