diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index e5d66ab47bcd3..7fb68c39ad916 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -444,6 +444,13 @@ ss::future> seq_writer::do_delete_subject_version( batch_builder rb(write_at, sub); rb(std::move(key), std::move(value)); + { + // Clear config if this is a delete of the last version + auto vec = co_await _store.get_versions(sub, include_deleted::no); + if (vec.size() == 1 && vec.front() == version) { + rb(co_await _store.get_subject_config_written_at(sub)); + } + } if (co_await produce_and_apply(write_at, std::move(rb).build())) { co_return true; } else { diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 906f1c19937ec..85203c293272b 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -382,7 +382,14 @@ class store { result> get_version_ids(const subject& sub, include_deleted inc_del) const { auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del)); - return sub_it->second.versions; + std::vector res; + absl::c_copy_if( + sub_it->second.versions, + std::back_inserter(res), + [inc_del](const subject_version_entry& e) { + return inc_del || !e.deleted; + }); + return {std::move(res)}; } ///\brief Return whether this subject has a version that references the @@ -391,8 +398,8 @@ class store { const subject& sub, schema_id id, include_deleted inc_del) const { auto sub_it = BOOST_OUTCOME_TRYX(get_subject_iter(sub, inc_del)); const auto& vs = sub_it->second.versions; - return std::any_of(vs.cbegin(), vs.cend(), [id](const auto& entry) { - return entry.id == id; + return absl::c_any_of(vs, [id, inc_del](const auto& entry) { + return entry.id == id && (inc_del || !entry.deleted); }); } diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 983ddeed09800..bfe58e1b23ba1 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -1105,7 +1105,18 @@ def test_post_compatibility_subject_version(self): version=1) assert result_raw.status_code == requests.codes.ok - self.logger.debug("Posting schema 1 again, expect same version") + self.logger.debug("Posting schema 1 again, expect incompatible") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data) + assert result_raw.status_code == requests.codes.conflict + + self.logger.debug("Set subject config - NONE") + result_raw = self._set_config_subject(subject=f"{topic}-key", + data=json.dumps( + {"compatibility": "NONE"})) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Posting schema 1 again, expect same id") result_raw = self._post_subjects_subject_versions( subject=f"{topic}-key", data=schema_1_data) assert result_raw.status_code == requests.codes.ok @@ -2649,6 +2660,125 @@ def test_protobuf(self): assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [2] + @cluster(num_nodes=3) + def test_delete_subject_bug(self): + topic = 'foo' + self.logger.debug(f"Register a schema against a subject") + schema_1_data = json.dumps({"schema": schema1_def}) + + self.logger.debug("Posting schema 1 as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug(f"Register a schema against a subject") + schema_2_data = json.dumps({"schema": schema2_def}) + + self.logger.debug("Posting schema 2 as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_2_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Soft delete subject 1 version 1") + result_raw = self._delete_subject_version(subject=f"{topic}-key", + version=1, + auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}' + assert result_raw.json() == 1, f"Json: {result_raw.json()}" + + self.logger.debug("Soft delete subject 1 version 2") + result_raw = self._delete_subject_version(subject=f"{topic}-key", + version=2, + auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}' + assert result_raw.json() == 2, f"Json: {result_raw.json()}" + + self.logger.debug("Posting schema 1 - again - as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json() == {'id': 1}, f"Json: {result_raw.json()}" + + self.logger.debug("Get subject versions") + result_raw = self._get_subjects_subject_versions( + subject=f"{topic}-key", auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}' + assert result_raw.json() == [3], f"Json: {result_raw.json()}" + + self.logger.debug("Posting schema 2 - again - as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_2_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json() == {'id': 2}, f"Json: {result_raw.json()}" + + self.logger.debug("Get subject versions") + result_raw = self._get_subjects_subject_versions( + subject=f"{topic}-key", auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}' + assert result_raw.json() == [3, 4], f"Json: {result_raw.json()}" + + @cluster(num_nodes=3) + def test_delete_subject_last_clears_config(self): + topic = 'foo' + + self.logger.debug("Set subject config - NONE") + result_raw = self._set_config_subject(subject=f"{topic}-key", + data=json.dumps( + {"compatibility": "NONE"}), + auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug(f"Register a schema against a subject") + schema_1_data = json.dumps({"schema": schema1_def}) + schema_3_data = json.dumps({"schema": schema3_def}) + + self.logger.debug("Posting schema 1 as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Get subject config - should be overriden") + result_raw = self._get_config_subject(subject=f"{topic}-key", + auth=self.super_auth) + assert result_raw.json()["compatibilityLevel"] == "NONE" + + self.logger.debug("Soft delete subject 1 version 1") + result_raw = self._delete_subject_version(subject=f"{topic}-key", + version=1, + auth=self.super_auth) + assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}' + assert result_raw.json() == 1, f"Json: {result_raw.json()}" + + self.logger.debug("Get subject config - should fail") + result_raw = self._get_config_subject(subject=f"{topic}-key", + auth=self.super_auth) + assert result_raw.status_code == requests.codes.not_found + assert result_raw.json()["error_code"] == 40408 + + self.logger.debug("Posting schema 1 as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug( + "Posting incompatible schema 3 as a subject key - expect conflict") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_3_data, auth=self.super_auth) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.conflict + + self.logger.debug("Get subject config - should fail") + result_raw = self._get_config_subject(subject=f"{topic}-key", + auth=self.super_auth) + assert result_raw.status_code == requests.codes.not_found + assert result_raw.json()["error_code"] == 40408 + class SchemaRegistryTest(SchemaRegistryTestMethods): """