Skip to content

Commit

Permalink
schema_registry/storage: Support consuming mode records
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent 2776733 commit 282854b
Showing 1 changed file with 309 additions and 2 deletions.
311 changes: 309 additions & 2 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
namespace pandaproxy::schema_registry {

using topic_key_magic = named_type<int32_t, struct topic_key_magic_tag>;
enum class topic_key_type { noop = 0, schema, config, delete_subject };
enum class topic_key_type { noop = 0, schema, config, mode, delete_subject };

constexpr std::string_view to_string_view(topic_key_type kt) {
switch (kt) {
case topic_key_type::noop:
Expand All @@ -45,6 +46,8 @@ constexpr std::string_view to_string_view(topic_key_type kt) {
return "SCHEMA";
case topic_key_type::config:
return "CONFIG";
case topic_key_type::mode:
return "MODE";
case topic_key_type::delete_subject:
return "DELETE_SUBJECT";
}
Expand All @@ -57,6 +60,7 @@ from_string_view<topic_key_type>(std::string_view sv) {
.match(to_string_view(topic_key_type::noop), topic_key_type::noop)
.match(to_string_view(topic_key_type::schema), topic_key_type::schema)
.match(to_string_view(topic_key_type::config), topic_key_type::config)
.match(to_string_view(topic_key_type::mode), topic_key_type::mode)
.match(
to_string_view(topic_key_type::delete_subject),
topic_key_type::delete_subject)
Expand Down Expand Up @@ -846,6 +850,241 @@ class config_value_handler : public json::base_handler<Encoding> {
}
};

struct mode_key {
static constexpr topic_key_type keytype{topic_key_type::mode};
std::optional<model::offset> seq;
std::optional<model::node_id> node;
std::optional<subject> sub;
topic_key_magic magic{0};

friend bool operator==(const mode_key&, const mode_key&) = default;

friend std::ostream& operator<<(std::ostream& os, const mode_key& v) {
if (v.seq.has_value() && v.node.has_value()) {
fmt::print(
os,
"seq: {} node: {} keytype: {}, subject: {}, magic: {}",
*v.seq,
*v.node,
to_string_view(v.keytype),
v.sub.value_or(invalid_subject),
v.magic);
} else {
fmt::print(
os,
"unsequenced keytype: {}, subject: {}, magic: {}",
to_string_view(v.keytype),
v.sub.value_or(invalid_subject),
v.magic);
}
return os;
}
};

inline void rjson_serialize(
::json::Writer<::json::StringBuffer>& w,
const schema_registry::mode_key& key) {
w.StartObject();
w.Key("keytype");
::json::rjson_serialize(w, to_string_view(key.keytype));
w.Key("subject");
if (key.sub) {
::json::rjson_serialize(w, key.sub.value());
} else {
w.Null();
}
w.Key("magic");
::json::rjson_serialize(w, key.magic);
if (key.seq.has_value()) {
w.Key("seq");
::json::rjson_serialize(w, *key.seq);
}
if (key.node.has_value()) {
w.Key("node");
::json::rjson_serialize(w, *key.node);
}
w.EndObject();
}

template<typename Encoding = ::json::UTF8<>>
class mode_key_handler : public json::base_handler<Encoding> {
enum class state {
empty = 0,
object,
keytype,
seq,
node,
subject,
magic,
};
state _state = state::empty;

public:
using Ch = typename json::base_handler<Encoding>::Ch;
using rjson_parse_result = mode_key;
rjson_parse_result result;

mode_key_handler()
: json::base_handler<Encoding>{json::serialization_format::none} {}

bool Key(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
std::optional<state> s{string_switch<std::optional<state>>(sv)
.match("keytype", state::keytype)
.match("seq", state::seq)
.match("node", state::node)
.match("subject", state::subject)
.match("magic", state::magic)
.default_match(std::nullopt)};
return s.has_value() && std::exchange(_state, *s) == state::object;
}

bool Uint(int i) {
switch (_state) {
case state::magic: {
result.magic = topic_key_magic{i};
_state = state::object;
return true;
}
case state::seq: {
result.seq = model::offset{i};
_state = state::object;
return true;
}
case state::node: {
result.node = model::node_id{i};
_state = state::object;
return true;
}
case state::empty:
case state::subject:
case state::keytype:
case state::object:
return false;
}
return false;
}

bool String(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
switch (_state) {
case state::keytype: {
auto kt = from_string_view<topic_key_type>(sv);
_state = state::object;
return kt == result.keytype;
}
case state::subject: {
result.sub = subject{ss::sstring{sv}};
_state = state::object;
return true;
}
case state::empty:
case state::seq:
case state::node:
case state::object:
case state::magic:
return false;
}
return false;
}

bool Null() {
// The subject, and only the subject, is nullable.
return std::exchange(_state, state::object) == state::subject;
}

bool StartObject() {
return std::exchange(_state, state::object) == state::empty;
}

bool EndObject(::json::SizeType) {
return result.seq.has_value() == result.node.has_value()
&& std::exchange(_state, state::empty) == state::object;
}
};

struct mode_value {
mode mode{mode::read_write};
std::optional<subject> sub;

friend bool operator==(const mode_value&, const mode_value&) = default;

friend std::ostream& operator<<(std::ostream& os, const mode_value& v) {
if (v.sub.has_value()) {
fmt::print(os, "subject: {}, ", v.sub.value());
}
fmt::print(os, "mode: {}", to_string_view(v.mode));

return os;
}
};

inline void rjson_serialize(
::json::Writer<::json::StringBuffer>& w,
const schema_registry::mode_value& val) {
w.StartObject();
if (val.sub.has_value()) {
w.Key("subject");
::json::rjson_serialize(w, val.sub.value());
}
w.Key("mode");
::json::rjson_serialize(w, to_string_view(val.mode));
w.EndObject();
}

template<typename Encoding = ::json::UTF8<>>
class mode_value_handler : public json::base_handler<Encoding> {
enum class state {
empty = 0,
object,
mode,
subject,
};
state _state = state::empty;

public:
using Ch = typename json::base_handler<Encoding>::Ch;
using rjson_parse_result = mode_value;
rjson_parse_result result;

mode_value_handler()
: json::base_handler<Encoding>{json::serialization_format::none} {}

bool Key(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
std::optional<state> s{string_switch<std::optional<state>>(sv)
.match("mode", state::mode)
.match("subject", state::subject)
.default_match(std::nullopt)};
return s.has_value() && std::exchange(_state, *s) == state::object;
}

bool String(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
if (_state == state::mode) {
auto s = from_string_view<mode>(sv);
if (s.has_value()) {
result.mode = *s;
_state = state::object;
}
return s.has_value();
} else if (_state == state::subject) {
result.sub.emplace(sv);
_state = state::object;
return true;
}
return false;
}

bool StartObject() {
return std::exchange(_state, state::object) == state::empty;
}

bool EndObject(::json::SizeType) {
return std::exchange(_state, state::empty) == state::object;
}
};

struct delete_subject_key {
static constexpr topic_key_type keytype{topic_key_type::delete_subject};
std::optional<model::offset> seq;
Expand Down Expand Up @@ -1188,7 +1427,18 @@ struct consume_to_store {
val);
break;
}
case topic_key_type::delete_subject:
case topic_key_type::mode: {
std::optional<mode_value> val;
if (!record.value().empty()) {
auto value = record.release_value();
val.emplace(
from_json_iobuf<mode_value_handler<>>(std::move(value)));
}
co_await apply(
offset, from_json_iobuf<mode_key_handler<>>(std::move(key)), val);
break;
}
case topic_key_type::delete_subject: {
std::optional<delete_subject_value> val;
if (!record.value().empty()) {
val.emplace(from_json_iobuf<delete_subject_value_handler<>>(
Expand All @@ -1201,6 +1451,7 @@ struct consume_to_store {
std::move(val));
break;
}
}

co_await _sequencer.advance_offset(offset);
}
Expand Down Expand Up @@ -1331,6 +1582,61 @@ struct consume_to_store {
}
}

ss::future<>
apply(model::offset offset, mode_key key, std::optional<mode_value> val) {
// Drop out-of-sequence messages
//
// Check seq if it was provided, otherwise assume 3rdparty
// compatibility, which can't collide.
if (val && key.seq.has_value() && offset != key.seq) {
vlog(
plog.debug,
"Ignoring out of order {} (at offset {})",
key,
offset);
co_return;
}

if (key.magic != 0) {
throw exception(
error_code::topic_parse_error,
fmt::format("Unexpected magic: {}", key));
}
try {
vlog(plog.debug, "Applying: {}", key);
if (key.sub.has_value()) {
if (!val.has_value()) {
co_await _store.clear_mode(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::mode},
*key.sub,
force::yes);
} else {
co_await _store.set_mode(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::mode},
*key.sub,
val->mode,
force::yes);
}
} else if (val.has_value()) {
co_await _store.set_mode(val->mode, force::yes);
} else {
vlog(
plog.warn,
"Tried to apply mode with neither subject nor value");
}
} catch (const exception& e) {
vlog(plog.debug, "Error replaying: {}: {}", key, e);
}
}

ss::future<> apply(
model::offset offset,
delete_subject_key key,
Expand Down Expand Up @@ -1379,6 +1685,7 @@ struct consume_to_store {
vlog(plog.debug, "Error replaying: {}: {}", key, e);
}
}

void end_of_stream() {}
sharded_store& _store;
seq_writer& _sequencer;
Expand Down

0 comments on commit 282854b

Please sign in to comment.