Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.1.x] schema_registry: Support the compatible format for CONFIG value #14876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,16 @@ class config_key_handler : public json::base_handler<Encoding> {

struct config_value {
compatibility_level compat{compatibility_level::none};
std::optional<subject> sub;

friend bool operator==(const config_value&, const config_value&) = default;

friend std::ostream& operator<<(std::ostream& os, const config_value& v) {
if (v.sub.has_value()) {
fmt::print(os, "subject: {}, ", v.sub.value());
}
fmt::print(os, "compatibility: {}", to_string_view(v.compat));

return os;
}
};
Expand All @@ -779,6 +784,10 @@ inline void rjson_serialize(
::json::Writer<::json::StringBuffer>& w,
const schema_registry::config_value& val) {
w.StartObject();
if (val.sub.has_value()) {
w.Key("subject");
::json::rjson_serialize(w, val.sub.value());
}
w.Key("compatibilityLevel");
::json::rjson_serialize(w, to_string_view(val.compat));
w.EndObject();
Expand All @@ -790,6 +799,7 @@ class config_value_handler : public json::base_handler<Encoding> {
empty = 0,
object,
compatibility,
subject,
};
state _state = state::empty;

Expand All @@ -803,11 +813,12 @@ class config_value_handler : public json::base_handler<Encoding> {

bool Key(const Ch* str, ::json::SizeType len, bool) {
auto sv = std::string_view{str, len};
if (_state == state::object && sv == "compatibilityLevel") {
_state = state::compatibility;
return true;
}
return false;
std::optional<state> s{
string_switch<std::optional<state>>(sv)
.match("compatibilityLevel", state::compatibility)
.match("subject", state::subject)
.default_match(std::nullopt)};
return s.has_value() && std::exchange(_state, *s) == state::object;
}

bool String(const Ch* str, ::json::SizeType len, bool) {
Expand All @@ -819,6 +830,10 @@ class config_value_handler : public json::base_handler<Encoding> {
_state = state::object;
}
return s.has_value();
} else if (_state == state::subject) {
result.sub.emplace(sv);
_state = state::object;
return true;
}
return false;
}
Expand Down
19 changes: 19 additions & 0 deletions src/v/pandaproxy/schema_registry/test/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ constexpr std::string_view config_value_sv{
const pps::config_value config_value{
.compat = pps::compatibility_level::forward_transitive};

constexpr std::string_view config_value_sub_sv{
R"({
"subject": "my-kafka-value",
"compatibilityLevel": "FORWARD_TRANSITIVE"
})"};
const pps::config_value config_value_sub{

.compat = pps::compatibility_level::forward_transitive,
.sub{pps::subject{"my-kafka-value"}}};

constexpr std::string_view delete_subject_key_sv{
R"({
"keytype": "DELETE_SUBJECT",
Expand Down Expand Up @@ -161,6 +171,15 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) {
BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sv));
}

{
auto val = ppj::rjson_parse(
config_value_sub_sv.data(), pps::config_value_handler<>{});
BOOST_CHECK_EQUAL(config_value_sub, val);

auto str = ppj::rjson_serialize(config_value_sub);
BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sub_sv));
}

{
auto val = ppj::rjson_parse(
delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{});
Expand Down
Loading