From abad8b63b713c56b5b4c94896e0dd9a5037b53fa Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Fri, 10 Nov 2023 00:27:06 +0000 Subject: [PATCH] schema_registry: Support the compatible format for CONFIG value An example of a config record to be supported. ```json { "topic": "_schemas", "key": "{\"keytype\":\"CONFIG\",\"subject\":\"test-ben1234\",\"magic\":0}", "value": "{\"subject\":\"test-ben1234\",\"compatibilityLevel\":\"FULL_TRANSITIVE\"}", "timestamp": 1699574968787, "partition": 0, "offset": 2 } ``` The subject field in the value is not checked against the key. Signed-off-by: Ben Pope (cherry picked from commit ae9445797aff5b8b6c5aea98d6e4ba71dc8ff93f) --- src/v/pandaproxy/schema_registry/storage.h | 25 +++++++++++++++---- .../schema_registry/test/storage.cc | 19 ++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 2028ec309b02..39bfb98532d1 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -766,11 +766,16 @@ class config_key_handler : public json::base_handler { struct config_value { compatibility_level compat{compatibility_level::none}; + std::optional sub; friend bool operator==(const config_value&, const config_value&) = default; friend std::ostream& operator<<(std::ostream& os, const config_value& v) { + if (v.sub.has_value()) { + fmt::print(os, "subject: {}, ", v.sub.value()); + } fmt::print(os, "compatibility: {}", to_string_view(v.compat)); + return os; } }; @@ -779,6 +784,10 @@ inline void rjson_serialize( ::json::Writer<::json::StringBuffer>& w, const schema_registry::config_value& val) { w.StartObject(); + if (val.sub.has_value()) { + w.Key("subject"); + ::json::rjson_serialize(w, val.sub.value()); + } w.Key("compatibilityLevel"); ::json::rjson_serialize(w, to_string_view(val.compat)); w.EndObject(); @@ -790,6 +799,7 @@ class config_value_handler : public json::base_handler { empty = 0, object, compatibility, + subject, }; state _state = state::empty; @@ -803,11 +813,12 @@ class config_value_handler : public json::base_handler { bool Key(const Ch* str, ::json::SizeType len, bool) { auto sv = std::string_view{str, len}; - if (_state == state::object && sv == "compatibilityLevel") { - _state = state::compatibility; - return true; - } - return false; + std::optional s{ + string_switch>(sv) + .match("compatibilityLevel", state::compatibility) + .match("subject", state::subject) + .default_match(std::nullopt)}; + return s.has_value() && std::exchange(_state, *s) == state::object; } bool String(const Ch* str, ::json::SizeType len, bool) { @@ -819,6 +830,10 @@ class config_value_handler : public json::base_handler { _state = state::object; } return s.has_value(); + } else if (_state == state::subject) { + result.sub.emplace(sv); + _state = state::object; + return true; } return false; } diff --git a/src/v/pandaproxy/schema_registry/test/storage.cc b/src/v/pandaproxy/schema_registry/test/storage.cc index 1bbe5391bf36..338cec0fcdf7 100644 --- a/src/v/pandaproxy/schema_registry/test/storage.cc +++ b/src/v/pandaproxy/schema_registry/test/storage.cc @@ -95,6 +95,16 @@ constexpr std::string_view config_value_sv{ const pps::config_value config_value{ .compat = pps::compatibility_level::forward_transitive}; +constexpr std::string_view config_value_sub_sv{ + R"({ + "subject": "my-kafka-value", + "compatibilityLevel": "FORWARD_TRANSITIVE" +})"}; +const pps::config_value config_value_sub{ + + .compat = pps::compatibility_level::forward_transitive, + .sub{pps::subject{"my-kafka-value"}}}; + constexpr std::string_view delete_subject_key_sv{ R"({ "keytype": "DELETE_SUBJECT", @@ -161,6 +171,15 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sv)); } + { + auto val = ppj::rjson_parse( + config_value_sub_sv.data(), pps::config_value_handler<>{}); + BOOST_CHECK_EQUAL(config_value_sub, val); + + auto str = ppj::rjson_serialize(config_value_sub); + BOOST_CHECK_EQUAL(str, ppj::minify(config_value_sub_sv)); + } + { auto val = ppj::rjson_parse( delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{});