diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json
index 30a8e683af35..8e0f788ac6eb 100644
--- a/src/v/pandaproxy/api/api-doc/schema_registry.json
+++ b/src/v/pandaproxy/api/api-doc/schema_registry.json
@@ -84,6 +84,49 @@
           }
         }
       }
+    },
+    "delete": {
+      "summary": "Delete the compatibility level for a subject.",
+      "operationId": "delete_config_subject",
+      "consumes": [
+        "application/vnd.schemaregistry.v1+json",
+        "application/vnd.schemaregistry+json",
+        "application/json"
+      ],
+      "parameters": [
+        {
+          "name": "subject",
+          "in": "path",
+          "required": true,
+          "type": "string"
+        }
+      ],
+      "produces": ["application/vnd.schemaregistry.v1+json"],
+      "responses": {
+        "200": {
+          "description": "OK",
+          "schema": {
+            "type": "object",
+            "properties": {
+              "compatibility": {
+                "type": "string"
+              }
+            }
+          }
+        },
+        "404": {
+          "description": "Subject not found",
+          "schema": {
+            "$ref": "#/definitions/error_body"
+          }
+        },
+        "500": {
+          "description": "Internal Server error",
+          "schema": {
+            "$ref": "#/definitions/error_body"
+          }
+        }
+      }
     }
   },
   "/config": {
diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc
index 10c811bb81a6..335980b1d8d2 100644
--- a/src/v/pandaproxy/schema_registry/handlers.cc
+++ b/src/v/pandaproxy/schema_registry/handlers.cc
@@ -17,6 +17,7 @@
 #include "pandaproxy/parsing/httpd.h"
 #include "pandaproxy/reply.h"
 #include "pandaproxy/schema_registry/error.h"
+#include "pandaproxy/schema_registry/errors.h"
 #include "pandaproxy/schema_registry/requests/compatibility.h"
 #include "pandaproxy/schema_registry/requests/config.h"
 #include "pandaproxy/schema_registry/requests/get_schemas_ids_id.h"
@@ -175,6 +176,36 @@ put_config_subject(server::request_t rq, server::reply_t rp) {
     co_return rp;
 }
 
+ss::future<server::reply_t>
+delete_config_subject(server::request_t rq, server::reply_t rp) {
+    parse_content_type_header(rq);
+    parse_accept_header(rq, rp);
+    auto sub = parse::request_param<subject>(*rq.req, "subject");
+
+    rq.req.reset();
+
+    // ensure we see latest writes
+    co_await rq.service().writer().read_sync();
+
+    compatibility_level lvl{};
+    try {
+        lvl = co_await rq.service().schema_store().get_compatibility(
+          sub, default_to_global::no);
+    } catch (const exception& e) {
+        if (e.code() == error_code::compatibility_not_found) {
+            throw as_exception(not_found(sub));
+        } else {
+            throw;
+        }
+    }
+
+    co_await rq.service().writer().delete_config(sub);
+
+    auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = lvl});
+    rp.rep->write_body("json", json_rslt);
+    co_return rp;
+}
+
 ss::future<server::reply_t>
 get_mode(server::request_t rq, server::reply_t rp) {
     parse_accept_header(rq, rp);
     rq.req.reset();
diff --git a/src/v/pandaproxy/schema_registry/handlers.h b/src/v/pandaproxy/schema_registry/handlers.h
index 5b88ce43f1a9..85b323d6cd78 100644
--- a/src/v/pandaproxy/schema_registry/handlers.h
+++ b/src/v/pandaproxy/schema_registry/handlers.h
@@ -31,6 +31,9 @@ ss::future<ctx_server<service>::reply_t> get_config_subject(
 ss::future<ctx_server<service>::reply_t> put_config_subject(
   ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);
 
+ss::future<ctx_server<service>::reply_t> delete_config_subject(
+  ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);
+
 ss::future<ctx_server<service>::reply_t>
 get_mode(ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);
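Not part of the patch: a rough client-side sketch of the endpoint added above, assuming a schema registry listening on localhost:8081 and the Python `requests` library; the subject name and compatibility levels are illustrative only.

```python
import requests

BASE = "http://localhost:8081"
HEADERS = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

# Give the subject its own compatibility level...
requests.put(f"{BASE}/config/my-topic-key",
             headers=HEADERS,
             json={"compatibility": "BACKWARD_TRANSITIVE"})

# ...then delete it; the response echoes the level that was removed.
resp = requests.delete(f"{BASE}/config/my-topic-key", headers=HEADERS)
print(resp.status_code, resp.json())  # 200 {'compatibilityLevel': 'BACKWARD_TRANSITIVE'}

# A second DELETE finds no subject-level config: 404 with error_code 40401.
resp = requests.delete(f"{BASE}/config/my-topic-key", headers=HEADERS)
print(resp.status_code, resp.json()["error_code"])
```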
diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc
index 9679d2cdd98e..81af844c527d 100644
--- a/src/v/pandaproxy/schema_registry/seq_writer.cc
+++ b/src/v/pandaproxy/schema_registry/seq_writer.cc
@@ -239,6 +239,73 @@ ss::future<> seq_writer::write_config(
     });
 }
 
+ss::future<std::optional<bool>> seq_writer::do_delete_config(
+  subject sub, model::offset write_at, seq_writer& seq) {
+    vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);
+
+    try {
+        co_await seq._store.get_compatibility(sub, default_to_global::no);
+    } catch (const exception&) {
+        // subject config already blank
+        co_return false;
+    }
+
+    std::vector<seq_marker> sequences{
+      co_await _store.get_subject_config_written_at(sub)};
+
+    storage::record_batch_builder rb{
+      model::record_batch_type::raft_data, model::offset{0}};
+
+    std::vector<config_key> keys;
+    for (const auto& s : sequences) {
+        vlog(
+          plog.debug,
+          "Deleting config: tombstoning config_key for sub={} at {}",
+          sub,
+          s);
+
+        vassert(
+          s.key_type == seq_marker_key_type::config,
+          "Unexpected key type: {}",
+          s.key_type);
+
+        auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
+        keys.push_back(key);
+        rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
+    }
+
+    auto ts_batch = std::move(rb).build();
+    kafka::partition_produce_response res
+      = co_await _client.local().produce_record_batch(
+        model::schema_registry_internal_tp, std::move(ts_batch));
+
+    if (res.error_code != kafka::error_code::none) {
+        vlog(
+          plog.error,
+          "Error writing to subject topic: {} {}",
+          res.error_code,
+          res.error_message);
+        throw kafka::exception(res.error_code, *res.error_message);
+    }
+
+    auto applier = consume_to_store(seq._store, seq);
+    auto offset = res.base_offset;
+    for (const auto& k : keys) {
+        co_await applier.apply(offset, k, std::nullopt);
+        seq.advance_offset_inner(offset);
+        ++offset;
+    }
+
+    co_return true;
+}
+
+ss::future<bool> seq_writer::delete_config(subject sub) {
+    return sequenced_write(
+      [this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
+          return do_delete_config(sub, write_at, seq);
+      });
+}
+
 /// Impermanent delete: update a version with is_deleted=true
 ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
   subject sub,
diff --git a/src/v/pandaproxy/schema_registry/seq_writer.h b/src/v/pandaproxy/schema_registry/seq_writer.h
index e776980f11f3..fd538b800644 100644
--- a/src/v/pandaproxy/schema_registry/seq_writer.h
+++ b/src/v/pandaproxy/schema_registry/seq_writer.h
@@ -48,6 +48,8 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
     ss::future<> write_config(
       std::optional<subject> sub, compatibility_level compat);
 
+    ss::future<bool> delete_config(subject sub);
+
     ss::future<bool>
     delete_subject_version(subject sub, schema_version version);
 
@@ -76,6 +78,9 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
       model::offset write_at,
       seq_writer& seq);
 
+    ss::future<std::optional<bool>>
+    do_delete_config(subject sub, model::offset write_at, seq_writer& seq);
+
     ss::future<std::optional<bool>> do_delete_subject_version(
       subject sub,
       schema_version version,
diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc
index 81ae390db313..6d26486dab68 100644
--- a/src/v/pandaproxy/schema_registry/service.cc
+++ b/src/v/pandaproxy/schema_registry/service.cc
@@ -103,6 +103,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
       ss::httpd::schema_registry_json::put_config_subject,
       wrap(gate, es, put_config_subject)});
 
+    routes.routes.emplace_back(server::route_t{
+      ss::httpd::schema_registry_json::delete_config_subject,
+      wrap(gate, es, delete_config_subject)});
+
     routes.routes.emplace_back(server::route_t{
       ss::httpd::schema_registry_json::get_mode, wrap(gate, es, get_mode)});
 
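Not part of the patch: a simplified Python model of what `do_delete_config` in seq_writer.cc above does with the subject's write history — keep only the `config` markers and emit a tombstone (null value) for each corresponding key. The key layout shown here is an illustrative stand-in, not the exact `config_key` serialization.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class SeqMarker:
    seq: Optional[int]
    node: Optional[int]
    key_type: str  # "config", "schema", "delete_subject", ...

def config_tombstones(sub: str, written_at: list[SeqMarker]) -> list[tuple[dict, None]]:
    """Build (key, None) pairs for every config marker in the subject's history."""
    batch = []
    for sm in written_at:
        if sm.key_type != "config":
            continue
        # Illustrative stand-in for config_key{.seq, .node, .sub}
        key = {"seq": sm.seq, "node": sm.node, "subject": sub}
        batch.append((key, None))  # None models the tombstone value
    return batch

history = [SeqMarker(0, 1, "schema"), SeqMarker(7, 1, "config"), SeqMarker(9, 2, "config")]
for key, value in config_tombstones("my-topic-key", history):
    print(key, value)
```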
diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc
index e1becfae68d9..67e15dc180a1 100644
--- a/src/v/pandaproxy/schema_registry/sharded_store.cc
+++ b/src/v/pandaproxy/schema_registry/sharded_store.cc
@@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) {
     });
 }
 
+ss::future<std::vector<seq_marker>>
+sharded_store::get_subject_config_written_at(subject sub) {
+    auto sub_shard{shard_for(sub)};
+    co_return co_await _store.invoke_on(
+      sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
+          return s.store::get_subject_config_written_at(sub).value();
+      });
+}
+
 ss::future<std::vector<seq_marker>>
 sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
     auto sub_shard{shard_for(sub)};
@@ -510,11 +519,12 @@ ss::future<bool> sharded_store::set_compatibility(
     });
 }
 
-ss::future<bool> sharded_store::clear_compatibility(subject sub) {
+ss::future<bool>
+sharded_store::clear_compatibility(seq_marker marker, subject sub) {
     auto sub_shard{shard_for(sub)};
     co_return co_await _store.invoke_on(
-      sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
-          return s.clear_compatibility(sub).value();
+      sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) {
+          return s.clear_compatibility(marker, sub).value();
       });
 }
 
diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h
index f9b317154123..c6473f0e6bc6 100644
--- a/src/v/pandaproxy/schema_registry/sharded_store.h
+++ b/src/v/pandaproxy/schema_registry/sharded_store.h
@@ -103,6 +103,11 @@ class sharded_store {
     ///\brief Get sequence number history (errors out if not soft-deleted)
     ss::future<std::vector<seq_marker>> get_subject_written_at(subject sub);
 
+    ///\brief Get sequence number history of subject config. Subject need
+    /// not be soft-deleted first
+    ss::future<std::vector<seq_marker>>
+    get_subject_config_written_at(subject sub);
+
     ///\brief Get sequence number history (errors out if not soft-deleted)
     ss::future<std::vector<seq_marker>>
     get_subject_version_written_at(subject sub, schema_version version);
@@ -126,7 +131,7 @@ class sharded_store {
       seq_marker marker, subject sub, compatibility_level compatibility);
 
     ///\brief Clear the compatibility level for a subject.
-    ss::future<bool> clear_compatibility(subject sub);
+    ss::future<bool> clear_compatibility(seq_marker marker, subject sub);
 
     ///\brief Check if the provided schema is compatible with the subject and
     /// version, according the the current compatibility.
diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h
index 38aa1121930d..2028ec309b02 100644
--- a/src/v/pandaproxy/schema_registry/storage.h
+++ b/src/v/pandaproxy/schema_registry/storage.h
@@ -1286,19 +1286,31 @@ struct consume_to_store {
         }
         try {
             vlog(plog.debug, "Applying: {}", key);
-            if (!val) {
-                co_await _store.clear_compatibility(*key.sub);
-            } else if (key.sub) {
-                co_await _store.set_compatibility(
-                  seq_marker{
-                    .seq = key.seq,
-                    .node = key.node,
-                    .version{invalid_schema_version}, // Not applicable
-                    .key_type = seq_marker_key_type::config},
-                  *key.sub,
-                  val->compat);
-            } else {
+            if (key.sub.has_value()) {
+                if (!val.has_value()) {
+                    co_await _store.clear_compatibility(
+                      seq_marker{
+                        .seq = key.seq,
+                        .node = key.node,
+                        .version{invalid_schema_version}, // Not applicable
+                        .key_type = seq_marker_key_type::config},
+                      *key.sub);
+                } else {
+                    co_await _store.set_compatibility(
+                      seq_marker{
+                        .seq = key.seq,
+                        .node = key.node,
+                        .version{invalid_schema_version}, // Not applicable
+                        .key_type = seq_marker_key_type::config},
+                      *key.sub,
+                      val->compat);
+                }
+            } else if (val.has_value()) {
                 co_await _store.set_compatibility(val->compat);
+            } else {
+                vlog(
+                  plog.warn,
+                  "Tried to apply config with neither subject nor value");
             }
         } catch (const exception& e) {
             vlog(plog.debug, "Error replaying: {}: {}", key, e);
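Not part of the patch: a minimal Python model of the replay dispatch in `consume_to_store` above, to make the four cases explicit (subject + value, subject + tombstone, global value, neither). The store type here is a hypothetical stand-in.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class FakeStore:
    global_compat: str = "BACKWARD"
    subject_compat: dict = field(default_factory=dict)

def apply_config(store: FakeStore, sub: Optional[str], compat: Optional[str]) -> None:
    if sub is not None:
        if compat is None:
            # tombstone for a subject config key: clear the subject-level setting
            store.subject_compat.pop(sub, None)
        else:
            store.subject_compat[sub] = compat
    elif compat is not None:
        store.global_compat = compat
    else:
        print("Tried to apply config with neither subject nor value")

s = FakeStore()
apply_config(s, "my-topic-key", "FULL")   # subject-level set
apply_config(s, None, "NONE")             # global set
apply_config(s, "my-topic-key", None)     # tombstone clears the subject level
assert s.subject_compat == {} and s.global_compat == "NONE"
```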
diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h
index afa59045e154..456424e64651 100644
--- a/src/v/pandaproxy/schema_registry/store.h
+++ b/src/v/pandaproxy/schema_registry/store.h
@@ -246,6 +246,34 @@ class store {
         }
     }
 
+    /// \brief Return the seq_marker write history of a subject, but only
+    /// config_keys
+    ///
+    /// \return A vector (possibly empty)
+    result<std::vector<seq_marker>>
+    get_subject_config_written_at(const subject& sub) const {
+        auto sub_it = BOOST_OUTCOME_TRYX(
+          get_subject_iter(sub, include_deleted::yes));
+
+        // This should never happen (how can a record get into the
+        // store without an originating sequenced record?), but return
+        // an error instead of vasserting out.
+        if (sub_it->second.written_at.empty()) {
+            return not_found(sub);
+        }
+
+        std::vector<seq_marker> result;
+        std::copy_if(
+          sub_it->second.written_at.begin(),
+          sub_it->second.written_at.end(),
+          std::back_inserter(result),
+          [](const auto& sm) {
+              return sm.key_type == seq_marker_key_type::config;
+          });
+
+        return result;
+    }
+
     /// \brief Return the seq_marker write history of a version.
     ///
     /// \return A vector with at least one element
@@ -471,9 +499,11 @@ class store {
     }
 
     ///\brief Clear the compatibility level for a subject.
-    result<bool> clear_compatibility(const subject& sub) {
+    result<bool>
+    clear_compatibility(const seq_marker& marker, const subject& sub) {
         auto sub_it = BOOST_OUTCOME_TRYX(
           get_subject_iter(sub, include_deleted::yes));
+        std::erase(sub_it->second.written_at, marker);
         return std::exchange(sub_it->second.compatibility, std::nullopt)
                != std::nullopt;
     }
diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc
index 1476b8a37457..74f424cfc267 100644
--- a/src/v/pandaproxy/schema_registry/test/store.cc
+++ b/src/v/pandaproxy/schema_registry/test/store.cc
@@ -502,7 +502,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) {
     BOOST_REQUIRE(s.get_compatibility().value() == global_expected);
 
     // Clearing compatibility should fallback to global
-    BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true);
+    BOOST_REQUIRE(
+      s.clear_compatibility(dummy_marker, subject0).value() == true);
     BOOST_REQUIRE(
       s.get_compatibility(subject0, fallback).value() == global_expected);
 }
@@ -615,7 +616,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
       d_res.error().code(), pps::error_code::subject_soft_deleted);
 
     // Clearing the compatibility of a soft-deleted subject is allowed
-    BOOST_REQUIRE(s.clear_compatibility(subject0).has_value());
+    BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value());
 
     v_res = s.get_versions(subject0, pps::include_deleted::yes);
     BOOST_REQUIRE(v_res.has_value());
@@ -653,7 +654,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
 
     // Clearing the compatibility of a hard-deleted subject should fail
     BOOST_REQUIRE(
-      s.clear_compatibility(subject0).error().code()
+      s.clear_compatibility(dummy_marker, subject0).error().code()
       == pps::error_code::subject_not_found);
 }
 
diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h
index da0cdeae7bfe..69a2ecb6eaeb 100644
--- a/src/v/pandaproxy/schema_registry/types.h
+++ b/src/v/pandaproxy/schema_registry/types.h
@@ -315,6 +315,11 @@ struct seq_marker {
     schema_version version;
     seq_marker_key_type key_type{seq_marker_key_type::invalid};
 
+    // Note that matching nullopts is possible on the seq and node fields.
+    // This is intentional; both fields are particular to redpanda, so making
+    // them optional provides compatibility with non-rp schema registries. If
+    // either is not present, we can assume a collision has not occurred.
+    friend bool operator==(const seq_marker&, const seq_marker&) = default;
 
     friend std::ostream& operator<<(std::ostream& os, const seq_marker& v);
 };
 
diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py
index 0d81c839d28e..8e37ab885911 100644
--- a/tests/rptest/tests/schema_registry_test.py
+++ b/tests/rptest/tests/schema_registry_test.py
@@ -304,6 +304,15 @@ def _set_config_subject(self,
                              data=data,
                              **kwargs)
 
+    def _delete_config_subject(self,
+                               subject,
+                               headers=HTTP_POST_HEADERS,
+                               **kwargs):
+        return self._request("DELETE",
+                             f"config/{subject}",
+                             headers=headers,
+                             **kwargs)
+
     def _get_mode(self, headers=HTTP_GET_HEADERS, **kwargs):
         return self._request("GET", "mode", headers=headers, **kwargs)
 
@@ -883,6 +892,54 @@ def test_config(self):
         result_raw = self._get_config_subject(subject=f"{topic}-key")
         assert result_raw.json()["compatibilityLevel"] == "BACKWARD_TRANSITIVE"
 
+        prev_compat = result_raw.json()["compatibilityLevel"]
+        global_config = self._get_config().json()
+
+        result_raw = self._delete_config_subject(subject=f"{topic}-key")
+        assert result_raw.json(
+        )["compatibilityLevel"] == prev_compat, f"{json.dumps(result_raw.json(), indent=1)}"
+
+        self.logger.debug("Second DELETE should return 40401")
+        result_raw = self._delete_config_subject(subject=f"{topic}-key")
+        assert result_raw.status_code == requests.codes.not_found, result_raw.status_code
+        assert result_raw.json(
+        )["error_code"] == 40401, f"Wrong err code: {result_raw.json()}"
+        assert result_raw.json(
+        )["message"] == f"Subject '{topic}-key' not found.", f"{json.dumps(result_raw.json(), indent=1)}"
+
+        self.logger.debug(
+            "GET config/{subject} should indicate missing subject-level config"
+        )
+        result_raw = self._get_config_subject(subject=f"{topic}-key")
+        assert result_raw.status_code == requests.codes.not_found
+        assert result_raw.json()["error_code"] == 40408
+        assert result_raw.json(
+        )["message"] == f"Subject '{topic}-key' does not have subject-level compatibility configured"
+
+        result_raw = self._get_config_subject(subject=f"{topic}-key",
+                                              fallback=True)
+        assert result_raw.json(
+        )["compatibilityLevel"] == global_config["compatibilityLevel"]
+
+        self.logger.debug(
+            "Subject compatibility should reflect the new global config")
+        global_config = self._set_config(
+            data=json.dumps({"compatibility": "NONE"}))
+        assert global_config.json()["compatibility"] == "NONE"
+
+        result_raw = self._get_config_subject(subject=f"{topic}-key",
+                                              fallback=True)
+        assert result_raw.json()["compatibilityLevel"] == global_config.json(
+        )["compatibility"]
+
+        self.logger.debug("DELETE on non-existent subject should 404")
+        result_raw = self._delete_config_subject(subject=f"foo-key")
+        assert result_raw.status_code == requests.codes.not_found, result_raw.status_code
+        assert result_raw.json(
+        )["error_code"] == 40401, f"Wrong err code: {result_raw.json()}"
+        assert result_raw.json(
+        )["message"] == f"Subject 'foo-key' not found.", f"{json.dumps(result_raw.json(), indent=1)}"
+
     @cluster(num_nodes=3)
     def test_mode(self):
         """
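Not part of the patch: the fallback behaviour the hunk above asserts, written out as a standalone sketch. It assumes a registry on localhost:8081 and that the test helper's `fallback=True` argument maps to the `defaultToGlobal=true` query parameter.

```python
import requests

BASE = "http://localhost:8081"

requests.put(f"{BASE}/config", json={"compatibility": "NONE"})
requests.put(f"{BASE}/config/my-topic-key", json={"compatibility": "FULL"})
requests.delete(f"{BASE}/config/my-topic-key")

# Without fallback the subject-level config is gone: 404 / 40408.
r = requests.get(f"{BASE}/config/my-topic-key")
print(r.status_code, r.json().get("error_code"))

# With fallback the global level shows through.
r = requests.get(f"{BASE}/config/my-topic-key", params={"defaultToGlobal": "true"})
print(r.json()["compatibilityLevel"])  # NONE
```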
f"{json.dumps(result_raw.json(), indent=1)}, {json.dumps(global_config, indent=1)}" + + result_raw = self._get_config_subject(subject=f"{topic}-key", + fallback=True, + auth=(super_username, + super_password)) + assert result_raw.json( + )["compatibilityLevel"] == global_config["compatibilityLevel"] + @cluster(num_nodes=3) def test_mode(self): """