diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 2028ec309b02..39bfb98532d1 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -766,11 +766,16 @@ class config_key_handler : public json::base_handler { struct config_value { compatibility_level compat{compatibility_level::none}; + std::optional sub; friend bool operator==(const config_value&, const config_value&) = default; friend std::ostream& operator<<(std::ostream& os, const config_value& v) { + if (v.sub.has_value()) { + fmt::print(os, "subject: {}, ", v.sub.value()); + } fmt::print(os, "compatibility: {}", to_string_view(v.compat)); + return os; } }; @@ -779,6 +784,10 @@ inline void rjson_serialize( ::json::Writer<::json::StringBuffer>& w, const schema_registry::config_value& val) { w.StartObject(); + if (val.sub.has_value()) { + w.Key("subject"); + ::json::rjson_serialize(w, val.sub.value()); + } w.Key("compatibilityLevel"); ::json::rjson_serialize(w, to_string_view(val.compat)); w.EndObject(); @@ -790,6 +799,7 @@ class config_value_handler : public json::base_handler { empty = 0, object, compatibility, + subject, }; state _state = state::empty; @@ -803,11 +813,12 @@ class config_value_handler : public json::base_handler { bool Key(const Ch* str, ::json::SizeType len, bool) { auto sv = std::string_view{str, len}; - if (_state == state::object && sv == "compatibilityLevel") { - _state = state::compatibility; - return true; - } - return false; + std::optional s{ + string_switch>(sv) + .match("compatibilityLevel", state::compatibility) + .match("subject", state::subject) + .default_match(std::nullopt)}; + return s.has_value() && std::exchange(_state, *s) == state::object; } bool String(const Ch* str, ::json::SizeType len, bool) { @@ -819,6 +830,10 @@ class config_value_handler : public json::base_handler { _state = state::object; } return s.has_value(); + } else if (_state == state::subject) { + result.sub.emplace(sv); + _state = state::object; + return true; } return false; } diff --git a/src/v/pandaproxy/schema_registry/test/storage.cc b/src/v/pandaproxy/schema_registry/test/storage.cc index 1bbe5391bf36..338cec0fcdf7 100644 --- a/src/v/pandaproxy/schema_registry/test/storage.cc +++ b/src/v/pandaproxy/schema_registry/test/storage.cc @@ -95,6 +95,16 @@ constexpr std::string_view config_value_sv{ const pps::config_value config_value{ .compat = pps::compatibility_level::forward_transitive}; +constexpr std::string_view config_value_sub_sv{ + R"({ + "subject": "my-kafka-value", + "compatibilityLevel": "FORWARD_TRANSITIVE" +})"}; +const pps::config_value config_value_sub{ + + .compat = pps::compatibility_level::forward_transitive, + .sub{pps::subject{"my-kafka-value"}}}; + constexpr std::string_view delete_subject_key_sv{ R"({ "keytype": "DELETE_SUBJECT", @@ -161,6 +171,15 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sv)); } + { + auto val = ppj::rjson_parse( + config_value_sub_sv.data(), pps::config_value_handler<>{}); + BOOST_CHECK_EQUAL(config_value_sub, val); + + auto str = ppj::rjson_serialize(config_value_sub); + BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sub_sv)); + } + { auto val = ppj::rjson_parse( delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{});