Skip to content

Commit

Permalink
Merge pull request redpanda-data#13557 from oleiman/delete-subject-co…
Browse files Browse the repository at this point in the history
…mpat

schema_registry: Add support for `DELETE /config/{subject}`
  • Loading branch information
oleiman authored Oct 23, 2023
2 parents c32a0ce + df1a49d commit 4600a7a
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 20 deletions.
43 changes: 43 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,49 @@
}
}
}
},
"delete": {
"summary": "Delete the compatibility level for a subject.",
"operationId": "delete_config_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"in": "path",
"required": true,
"type": "string"
}
],
"produces": ["application/vnd.schemaregistry.v1+json"],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"properties": {
"compatibility": {
"type": "string"
}
}
}
},
"404": {
"description": "Subject not found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/config": {
Expand Down
31 changes: 31 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "pandaproxy/parsing/httpd.h"
#include "pandaproxy/reply.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/requests/compatibility.h"
#include "pandaproxy/schema_registry/requests/config.h"
#include "pandaproxy/schema_registry/requests/get_schemas_ids_id.h"
Expand Down Expand Up @@ -175,6 +176,36 @@ put_config_subject(server::request_t rq, server::reply_t rp) {
co_return rp;
}

ss::future<server::reply_t>
delete_config_subject(server::request_t rq, server::reply_t rp) {
parse_content_type_header(rq);
parse_accept_header(rq, rp);
auto sub = parse::request_param<subject>(*rq.req, "subject");

rq.req.reset();

// ensure we see latest writes
co_await rq.service().writer().read_sync();

compatibility_level lvl{};
try {
lvl = co_await rq.service().schema_store().get_compatibility(
sub, default_to_global::no);
} catch (const exception& e) {
if (e.code() == error_code::compatibility_not_found) {
throw as_exception(not_found(sub));
} else {
throw;
}
}

co_await rq.service().writer().delete_config(sub);

auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = lvl});
rp.rep->write_body("json", json_rslt);
co_return rp;
}

ss::future<server::reply_t> get_mode(server::request_t rq, server::reply_t rp) {
parse_accept_header(rq, rp);
rq.req.reset();
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ ss::future<ctx_server<service>::reply_t> get_config_subject(
ss::future<ctx_server<service>::reply_t> put_config_subject(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t> delete_config_subject(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t>
get_mode(ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

Expand Down
67 changes: 67 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,73 @@ ss::future<bool> seq_writer::write_config(
});
}

ss::future<std::optional<bool>> seq_writer::do_delete_config(
subject sub, model::offset write_at, seq_writer& seq) {
vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);

try {
co_await seq._store.get_compatibility(sub, default_to_global::no);
} catch (const exception&) {
// subject config already blank
co_return false;
}

std::vector<seq_marker> sequences{
co_await _store.get_subject_config_written_at(sub)};

storage::record_batch_builder rb{
model::record_batch_type::raft_data, model::offset{0}};

std::vector<config_key> keys;
for (const auto& s : sequences) {
vlog(
plog.debug,
"Deleting config: tombstoning config_key for sub={} at {}",
sub,
s);

vassert(
s.key_type == seq_marker_key_type::config,
"Unexpected key type: {}",
s.key_type);

auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
keys.push_back(key);
rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
}

auto ts_batch = std::move(rb).build();
kafka::partition_produce_response res
= co_await _client.local().produce_record_batch(
model::schema_registry_internal_tp, std::move(ts_batch));

if (res.error_code != kafka::error_code::none) {
vlog(
plog.error,
"Error writing to subject topic: {} {}",
res.error_code,
res.error_message);
throw kafka::exception(res.error_code, *res.error_message);
}

auto applier = consume_to_store(seq._store, seq);
auto offset = res.base_offset;
for (const auto& k : keys) {
co_await applier.apply(offset, k, std::nullopt);
seq.advance_offset_inner(offset);
++offset;
}

co_return true;
}

ss::future<bool> seq_writer::delete_config(subject sub) {
return sequenced_write(
[this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
return do_delete_config(sub, write_at, seq);
});
}

/// Impermanent delete: update a version with is_deleted=true
ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
subject sub,
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
ss::future<bool>
write_config(std::optional<subject> sub, compatibility_level compat);

ss::future<bool> delete_config(subject sub);

ss::future<bool>
delete_subject_version(subject sub, schema_version version);

Expand Down Expand Up @@ -76,6 +78,9 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
model::offset write_at,
seq_writer& seq);

ss::future<std::optional<bool>>
do_delete_config(subject sub, model::offset write_at, seq_writer& seq);

ss::future<std::optional<bool>> do_delete_subject_version(
subject sub,
schema_version version,
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
ss::httpd::schema_registry_json::put_config_subject,
wrap(gate, es, put_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_config_subject,
wrap(gate, es, delete_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_mode, wrap(gate, es, get_mode)});

Expand Down
16 changes: 13 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) {
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_config_written_at(subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.store::get_subject_config_written_at(sub).value();
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
auto sub_shard{shard_for(sub)};
Expand Down Expand Up @@ -510,11 +519,12 @@ ss::future<bool> sharded_store::set_compatibility(
});
}

ss::future<bool> sharded_store::clear_compatibility(subject sub) {
ss::future<bool>
sharded_store::clear_compatibility(seq_marker marker, subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.clear_compatibility(sub).value();
sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) {
return s.clear_compatibility(marker, sub).value();
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class sharded_store {
///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>> get_subject_written_at(subject sub);

///\brief Get sequence number history of subject config. Subject need
/// not be soft-deleted first
ss::future<std::vector<seq_marker>>
get_subject_config_written_at(subject sub);

///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>>
get_subject_version_written_at(subject sub, schema_version version);
Expand All @@ -126,7 +131,7 @@ class sharded_store {
seq_marker marker, subject sub, compatibility_level compatibility);

///\brief Clear the compatibility level for a subject.
ss::future<bool> clear_compatibility(subject sub);
ss::future<bool> clear_compatibility(seq_marker marker, subject sub);

///\brief Check if the provided schema is compatible with the subject and
/// version, according the the current compatibility.
Expand Down
36 changes: 24 additions & 12 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1286,19 +1286,31 @@ struct consume_to_store {
}
try {
vlog(plog.debug, "Applying: {}", key);
if (!val) {
co_await _store.clear_compatibility(*key.sub);
} else if (key.sub) {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
} else {
if (key.sub.has_value()) {
if (!val.has_value()) {
co_await _store.clear_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub);
} else {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
}
} else if (val.has_value()) {
co_await _store.set_compatibility(val->compat);
} else {
vlog(
plog.warn,
"Tried to apply config with neither subject nor value");
}
} catch (const exception& e) {
vlog(plog.debug, "Error replaying: {}: {}", key, e);
Expand Down
32 changes: 31 additions & 1 deletion src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,34 @@ class store {
}
}

/// \brief Return the seq_marker write history of a subject, but only
/// config_keys
///
/// \return A vector (possibly empty)
result<std::vector<seq_marker>>
get_subject_config_written_at(const subject& sub) const {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));

// This should never happen (how can a record get into the
// store without an originating sequenced record?), but return
// an error instead of vasserting out.
if (sub_it->second.written_at.empty()) {
return not_found(sub);
}

std::vector<seq_marker> result;
std::copy_if(
sub_it->second.written_at.begin(),
sub_it->second.written_at.end(),
std::back_inserter(result),
[](const auto& sm) {
return sm.key_type == seq_marker_key_type::config;
});

return result;
}

/// \brief Return the seq_marker write history of a version.
///
/// \return A vector with at least one element
Expand Down Expand Up @@ -471,9 +499,11 @@ class store {
}

///\brief Clear the compatibility level for a subject.
result<bool> clear_compatibility(const subject& sub) {
result<bool>
clear_compatibility(const seq_marker& marker, const subject& sub) {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
std::erase(sub_it->second.written_at, marker);
return std::exchange(sub_it->second.compatibility, std::nullopt)
!= std::nullopt;
}
Expand Down
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) {
BOOST_REQUIRE(s.get_compatibility().value() == global_expected);

// Clearing compatibility should fallback to global
BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true);
BOOST_REQUIRE(
s.clear_compatibility(dummy_marker, subject0).value() == true);
BOOST_REQUIRE(
s.get_compatibility(subject0, fallback).value() == global_expected);
}
Expand Down Expand Up @@ -615,7 +616,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
d_res.error().code(), pps::error_code::subject_soft_deleted);

// Clearing the compatibility of a soft-deleted subject is allowed
BOOST_REQUIRE(s.clear_compatibility(subject0).has_value());
BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value());

v_res = s.get_versions(subject0, pps::include_deleted::yes);
BOOST_REQUIRE(v_res.has_value());
Expand Down Expand Up @@ -653,7 +654,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {

// Clearing the compatibility of a hard-deleted subject should fail
BOOST_REQUIRE(
s.clear_compatibility(subject0).error().code()
s.clear_compatibility(dummy_marker, subject0).error().code()
== pps::error_code::subject_not_found);
}

Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ struct seq_marker {
schema_version version;
seq_marker_key_type key_type{seq_marker_key_type::invalid};

// Note that matching nullopts is possible on the seq and node fields.
// This is intentional; both fields are particular to redpanda, so making
// them optional provides compatibility with non-rp schema registries. If
// either is not present, we can assume a collision has not occurred.
friend bool operator==(const seq_marker&, const seq_marker&) = default;
friend std::ostream& operator<<(std::ostream& os, const seq_marker& v);
};

Expand Down
Loading

0 comments on commit 4600a7a

Please sign in to comment.