Skip to content

Commit

Permalink
schema_registry/storage: Support writing mode records
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent 8b0915f commit b55d09e
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 1 deletion.
88 changes: 87 additions & 1 deletion src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "base/vlog.h"
#include "kafka/client/client_fetch_batch_reader.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/exceptions.h"
#include "pandaproxy/schema_registry/sharded_store.h"
Expand Down Expand Up @@ -82,8 +83,13 @@ struct batch_builder : public storage::record_batch_builder {
auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
} break;
default:
case seq_marker_key_type::mode: {
auto key = mode_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
} break;
case seq_marker_key_type::invalid:
vassert(false, "Unknown key type");
break;
}
}

Expand Down Expand Up @@ -326,6 +332,78 @@ ss::future<bool> seq_writer::delete_config(subject sub) {
});
}

ss::future<std::optional<bool>> seq_writer::do_write_mode(
std::optional<subject> sub, mode m, force f, model::offset write_at) {
vlog(
plog.debug,
"write_mode sub={} mode={} force={} offset={}",
sub,
to_string_view(m),
f,
write_at);

_store.check_mode_mutability(force::no);

try {
// Check for no-op case
mode existing = sub ? co_await _store.get_mode(
sub.value(), default_to_global::no)
: co_await _store.get_mode();
if (existing == m) {
co_return false;
}
} catch (const exception& e) {
if (e.code() != error_code::mode_not_found) {
throw;
}
}

batch_builder rb(write_at, sub);
rb(
mode_key{.seq{write_at}, .node{_node_id}, .sub{sub}},
mode_value{.mode = m});

if (co_await produce_and_apply(write_at, std::move(rb).build())) {
co_return true;
} else {
// Pass up a None, our caller's cue to retry
co_return std::nullopt;
}
}

ss::future<bool>
seq_writer::write_mode(std::optional<subject> sub, mode mode, force f) {
return sequenced_write(
[sub{std::move(sub)}, mode, f](model::offset write_at, seq_writer& seq) {
return seq.do_write_mode(sub, mode, f, write_at);
});
}

ss::future<std::optional<bool>>
seq_writer::do_delete_mode(subject sub, model::offset write_at) {
vlog(plog.debug, "delete mode sub={} offset={}", sub, write_at);

// Report an error if the mode isn't registered
co_await _store.get_mode(sub, default_to_global::no);
_store.check_mode_mutability(force::no);

batch_builder rb{write_at, sub};
rb(co_await _store.get_subject_mode_written_at(sub));
if (co_await produce_and_apply(std::nullopt, std::move(rb).build())) {
co_return true;
} else {
// Pass up a None, our caller's cue to retry
co_return std::nullopt;
}
}

ss::future<bool> seq_writer::delete_mode(subject sub) {
return sequenced_write(
[sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
return seq.do_delete_mode(sub, write_at);
});
}

/// Impermanent delete: update a version with is_deleted=true
ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
subject sub, schema_version version, model::offset write_at) {
Expand Down Expand Up @@ -391,6 +469,14 @@ seq_writer::do_delete_subject_impermanent(subject sub, model::offset write_at) {
delete_subject_key{.seq{write_at}, .node{_node_id}, .sub{sub}},
delete_subject_value{.sub{sub}});

try {
rb(co_await _store.get_subject_mode_written_at(sub));
} catch (exception const& e) {
if (e.code() != error_code::subject_not_found) {
throw;
}
}

try {
rb(co_await _store.get_subject_config_written_at(sub));
} catch (exception const& e) {
Expand Down
10 changes: 10 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {

ss::future<bool> delete_config(subject sub);

ss::future<bool> write_mode(std::optional<subject> sub, mode m, force f);

ss::future<bool> delete_mode(subject sub);

ss::future<bool>
delete_subject_version(subject sub, schema_version version);

Expand Down Expand Up @@ -79,6 +83,12 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {

ss::future<std::optional<bool>> do_delete_config(subject sub);

ss::future<std::optional<bool>> do_write_mode(
std::optional<subject> sub, mode m, force f, model::offset write_at);

ss::future<std::optional<bool>>
do_delete_mode(subject sub, model::offset write_at);

ss::future<std::optional<bool>> do_delete_subject_version(
subject sub, schema_version version, model::offset write_at);

Expand Down
9 changes: 9 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,15 @@ sharded_store::get_subject_config_written_at(subject sub) {
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_mode_written_at(subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.store::get_subject_mode_written_at(sub).value();
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
auto sub_shard{shard_for(sub)};
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ class sharded_store {
ss::future<std::vector<seq_marker>>
get_subject_config_written_at(subject sub);

///\brief Get sequence number history of subject mode. Subject need
/// not be soft-deleted first
ss::future<std::vector<seq_marker>>
get_subject_mode_written_at(subject sub);

///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>>
get_subject_version_written_at(subject sub, schema_version version);
Expand Down
28 changes: 28 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,34 @@ class store {
return result;
}

/// \brief Return the seq_marker write history of a subject, but only
/// mode_keys
///
/// \return A vector (possibly empty)
result<std::vector<seq_marker>>
get_subject_mode_written_at(const subject& sub) const {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));

// This should never happen (how can a record get into the
// store without an originating sequenced record?), but return
// an error instead of vasserting out.
if (sub_it->second.written_at.empty()) {
return not_found(sub);
}

std::vector<seq_marker> result;
std::copy_if(
sub_it->second.written_at.begin(),
sub_it->second.written_at.end(),
std::back_inserter(result),
[](const auto& sm) {
return sm.key_type == seq_marker_key_type::mode;
});

return result;
}

/// \brief Return the seq_marker write history of a version.
///
/// \return A vector with at least one element
Expand Down

0 comments on commit b55d09e

Please sign in to comment.