Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] [CORE-3008] schema_registry: Improve handling of deleted schema #19973

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,13 @@ ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
batch_builder rb(write_at, sub);
rb(std::move(key), std::move(value));

{
// Clear config if this is a delete of the last version
auto vec = co_await _store.get_versions(sub, include_deleted::no);
if (vec.size() == 1 && vec.front() == version) {
rb(co_await _store.get_subject_config_written_at(sub));
}
}
if (co_await produce_and_apply(write_at, std::move(rb).build())) {
co_return true;
} else {
Expand Down
13 changes: 10 additions & 3 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,14 @@ class store {
///\brief Return the version entries registered under a subject.
///
/// When inc_del is include_deleted::no, entries flagged as deleted are
/// filtered out of the result; otherwise every entry is returned.
/// Any error from get_subject_iter (e.g. subject lookup failure) is
/// propagated to the caller via BOOST_OUTCOME_TRYX.
result<std::vector<subject_version_entry>>
get_version_ids(const subject& sub, include_deleted inc_del) const {
    auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del));
    // Copy only the entries the caller asked for. Returning the raw
    // versions container here would leak soft-deleted versions to callers
    // that requested include_deleted::no.
    std::vector<subject_version_entry> res;
    absl::c_copy_if(
      sub_it->second.versions,
      std::back_inserter(res),
      [inc_del](const subject_version_entry& e) {
          return inc_del || !e.deleted;
      });
    return {std::move(res)};
}

///\brief Return whether this subject has a version that references the
Expand All @@ -391,8 +398,8 @@ class store {
const subject& sub, schema_id id, include_deleted inc_del) const {
auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del));
const auto& vs = sub_it->second.versions;
return std::any_of(vs.cbegin(), vs.cend(), [id](const auto& entry) {
return entry.id == id;
return absl::c_any_of(vs, [id, inc_del](const auto& entry) {
return entry.id == id && (inc_del || !entry.deleted);
});
}

Expand Down
132 changes: 131 additions & 1 deletion tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,18 @@ def test_post_compatibility_subject_version(self):
version=1)
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Posting schema 1 again, expect same version")
self.logger.debug("Posting schema 1 again, expect incompatible")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data)
assert result_raw.status_code == requests.codes.conflict

self.logger.debug("Set subject config - NONE")
result_raw = self._set_config_subject(subject=f"{topic}-key",
data=json.dumps(
{"compatibility": "NONE"}))
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Posting schema 1 again, expect same id")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data)
assert result_raw.status_code == requests.codes.ok
Expand Down Expand Up @@ -2649,6 +2660,125 @@ def test_protobuf(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == [2]

@cluster(num_nodes=3)
def test_delete_subject_bug(self):
    """
    Register two schemas under one subject, soft-delete both versions,
    then register the same schemas again.

    Expectations checked below: the re-posted schemas get ids 1 and 2,
    and the subject's listed versions become [3] and then [3, 4].
    """
    topic = 'foo'
    subject = f"{topic}-key"

    # --- initial registration of both schemas ---
    self.logger.debug("Register a schema against a subject")
    payload_1 = json.dumps({"schema": schema1_def})

    self.logger.debug("Posting schema 1 as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_1,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok

    self.logger.debug("Register a schema against a subject")
    payload_2 = json.dumps({"schema": schema2_def})

    self.logger.debug("Posting schema 2 as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_2,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok

    # --- soft delete every version of the subject ---
    self.logger.debug("Soft delete subject 1 version 1")
    response = self._delete_subject_version(subject=subject,
                                            version=1,
                                            auth=self.super_auth)
    assert response.status_code == requests.codes.ok, f'Code: {response.status_code}'
    assert response.json() == 1, f"Json: {response.json()}"

    self.logger.debug("Soft delete subject 1 version 2")
    response = self._delete_subject_version(subject=subject,
                                            version=2,
                                            auth=self.super_auth)
    assert response.status_code == requests.codes.ok, f'Code: {response.status_code}'
    assert response.json() == 2, f"Json: {response.json()}"

    # --- re-register schema 1: same id, new version ---
    self.logger.debug("Posting schema 1 - again - as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_1,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok
    assert response.json() == {'id': 1}, f"Json: {response.json()}"

    self.logger.debug("Get subject versions")
    response = self._get_subjects_subject_versions(subject=subject,
                                                   auth=self.super_auth)
    assert response.status_code == requests.codes.ok, f'Code: {response.status_code}'
    assert response.json() == [3], f"Json: {response.json()}"

    # --- re-register schema 2: same id, next version ---
    self.logger.debug("Posting schema 2 - again - as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_2,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok
    assert response.json() == {'id': 2}, f"Json: {response.json()}"

    self.logger.debug("Get subject versions")
    response = self._get_subjects_subject_versions(subject=subject,
                                                   auth=self.super_auth)
    assert response.status_code == requests.codes.ok, f'Code: {response.status_code}'
    assert response.json() == [3, 4], f"Json: {response.json()}"

@cluster(num_nodes=3)
def test_delete_subject_last_clears_config(self):
    """
    Soft-deleting the last remaining version of a subject must also clear
    the subject-level compatibility config.

    Flow checked below: set the subject config to NONE, register schema 1,
    confirm the config reads back as NONE, soft-delete version 1, then
    expect the subject config lookup to return 404 with error_code 40408.
    After re-registering schema 1, posting schema 3 is expected to
    conflict, and the subject config lookup still returns 404 / 40408.
    """
    topic = 'foo'
    subject = f"{topic}-key"

    self.logger.debug("Set subject config - NONE")
    none_compat = json.dumps({"compatibility": "NONE"})
    response = self._set_config_subject(subject=subject,
                                        data=none_compat,
                                        auth=self.super_auth)
    assert response.status_code == requests.codes.ok

    self.logger.debug("Register a schema against a subject")
    payload_1 = json.dumps({"schema": schema1_def})
    payload_3 = json.dumps({"schema": schema3_def})

    self.logger.debug("Posting schema 1 as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_1,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok

    self.logger.debug("Get subject config - should be overriden")
    response = self._get_config_subject(subject=subject,
                                        auth=self.super_auth)
    assert response.json()["compatibilityLevel"] == "NONE"

    # Removing the only live version is what should drop the config.
    self.logger.debug("Soft delete subject 1 version 1")
    response = self._delete_subject_version(subject=subject,
                                            version=1,
                                            auth=self.super_auth)
    assert response.status_code == requests.codes.ok, f'Code: {response.status_code}'
    assert response.json() == 1, f"Json: {response.json()}"

    self.logger.debug("Get subject config - should fail")
    response = self._get_config_subject(subject=subject,
                                        auth=self.super_auth)
    assert response.status_code == requests.codes.not_found
    assert response.json()["error_code"] == 40408

    self.logger.debug("Posting schema 1 as a subject key")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_1,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.ok

    self.logger.debug(
        "Posting incompatible schema 3 as a subject key - expect conflict")
    response = self._post_subjects_subject_versions(subject=subject,
                                                    data=payload_3,
                                                    auth=self.super_auth)
    self.logger.debug(response)
    assert response.status_code == requests.codes.conflict

    self.logger.debug("Get subject config - should fail")
    response = self._get_config_subject(subject=subject,
                                        auth=self.super_auth)
    assert response.status_code == requests.codes.not_found
    assert response.json()["error_code"] == 40408


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down
Loading