From e7fab4a27489a8862e2bce36589e5770ec089882 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 10 Jul 2024 13:47:18 +0100 Subject: [PATCH] [CORE-4444] schema_registry: Switch raw_string to iobuf Also fixes [CORE-684] [CORE-4446] [CORE-4447] Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/avro.cc | 57 ++++++++++++------- src/v/pandaproxy/schema_registry/handlers.cc | 5 +- src/v/pandaproxy/schema_registry/json.cc | 21 +++---- src/v/pandaproxy/schema_registry/protobuf.cc | 19 +++---- .../requests/post_subject_versions.h | 4 +- src/v/pandaproxy/schema_registry/storage.h | 1 - src/v/pandaproxy/schema_registry/test/util.cc | 3 +- src/v/pandaproxy/schema_registry/types.cc | 8 ++- src/v/pandaproxy/schema_registry/types.h | 35 +++++++++--- src/v/pandaproxy/schema_registry/util.h | 18 +++--- src/v/wasm/tests/wasm_transform_test.cc | 5 +- 11 files changed, 112 insertions(+), 64 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index a6e48b39a7952..6e9193187a260 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -11,15 +11,16 @@ #include "pandaproxy/schema_registry/avro.h" +#include "bytes/streambuf.h" #include "json/allocator.h" +#include "json/chunked_input_stream.h" #include "json/document.h" -#include "json/encodings.h" -#include "json/stringbuffer.h" +#include "json/json.h" #include "json/types.h" -#include "json/writer.h" #include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/sharded_store.h" +#include "pandaproxy/schema_registry/types.h" #include "strings/string_switch.h" #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +45,10 @@ #include #include +namespace pandaproxy::json { +using namespace ::json; +} + namespace pandaproxy::schema_registry { namespace { @@ -421,7 +427,10 @@ std::ostream& operator<<(std::ostream& os, 
const avro_schema_definition& def) { } canonical_schema_definition::raw_string avro_schema_definition::raw() const { - return canonical_schema_definition::raw_string{_impl.toJson(false)}; + iobuf_ostream os; + _impl.toJson(os.ostream()); + return canonical_schema_definition::raw_string{ + json::minify(std::move(os).buf())}; } ss::sstring avro_schema_definition::name() const { @@ -436,17 +445,22 @@ class collected_schema { bool insert(ss::sstring name, canonical_schema_definition def) { bool inserted = _names.insert(std::move(name)).second; if (inserted) { - _schemas.push_back(std::move(def).raw()()); + _schemas.push_back(std::move(def).raw()); } return inserted; } - ss::sstring flatten() { - return fmt::format("{}", fmt::join(_schemas, "\n")); + canonical_schema_definition::raw_string flatten() && { + iobuf out; + for (auto& s : _schemas) { + out.append(std::move(s)); + out.append("\n", 1); + } + return canonical_schema_definition::raw_string{std::move(out)}; } private: absl::flat_hash_set _names; - std::vector _schemas; + std::vector _schemas; }; ss::future collect_schema( @@ -473,11 +487,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) { auto name = schema.sub()(); auto schema_refs = schema.def().refs(); auto refs = co_await collect_schema(store, {}, name, std::move(schema)); - auto def = refs.flatten(); + iobuf_istream sis{std::move(refs).flatten()()}; + auto is = avro::istreamInputStream(sis.istream()); co_return avro_schema_definition{ - avro::compileJsonSchemaFromMemory( - reinterpret_cast(def.data()), def.length()), - std::move(schema_refs)}; + avro::compileJsonSchemaFromStream(*is), std::move(schema_refs)}; } catch (const avro::Exception& e) { ex = e; } @@ -492,12 +505,12 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { json::Document doc; constexpr auto flags = rapidjson::kParseDefaultFlags | rapidjson::kParseStopWhenDoneFlag; - const auto& raw = def.raw()(); - if (raw.empty()) { + if 
(def.raw()().empty()) { auto ec = error_code::schema_empty; return error_info{ec, make_error_code(ec).message()}; } - doc.Parse(raw.data(), raw.size()); + json::chunked_input_stream is{def.shared_raw()()}; + doc.ParseStream(is); if (doc.HasParseError()) { return error_info{ error_code::schema_invalid, @@ -509,21 +522,25 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { sanitize_context ctx{.alloc = doc.GetAllocator()}; auto res = sanitize(doc, ctx); if (res.has_error()) { + // TODO BP: Prevent this linearization + iobuf_parser p(std::move(def).raw()()); return error_info{ res.assume_error().code(), - fmt::format("{} {}", res.assume_error().message(), raw)}; + fmt::format( + "{} {}", + res.assume_error().message(), + p.read_string(p.bytes_left()))}; } - json::StringBuffer str_buf; - str_buf.Reserve(raw.size()); - json::Writer w{str_buf}; + json::chunked_buffer buf; + json::Writer w{buf}; if (!doc.Accept(w)) { return error_info{error_code::schema_invalid, "Invalid schema"}; } return canonical_schema_definition{ - std::string_view{str_buf.GetString(), str_buf.GetSize()}, + canonical_schema_definition::raw_string{std::move(buf).as_iobuf()}, schema_type::avro, def.refs()}; } diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index e8630291fa271..2edfd6d65ca05 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -524,7 +524,8 @@ ss::future::reply_t> get_subject_versions_version_schema( auto get_res = co_await rq.service().schema_store().get_subject_schema( sub, version, inc_del); - rp.rep->write_body("json", get_res.schema.def().raw()()); + rp.rep->write_body( + "json", ppj::as_body_writer(std::move(get_res.schema).def().raw()())); co_return rp; } @@ -543,7 +544,7 @@ get_subject_versions_version_referenced_by( auto references = co_await rq.service().schema_store().referenced_by( sub, version); - rp.rep->write_body("json", 
ppj::rjson_serialize(references)); + rp.rep->write_body("json", ppj::rjson_serialize(std::move(references))); co_return rp; } diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 4a70d79b5fc8e..7fe607dfe3ace 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -11,6 +11,8 @@ #include "pandaproxy/schema_registry/json.h" +#include "json/chunked_buffer.h" +#include "json/chunked_input_stream.h" #include "json/document.h" #include "json/ostreamwrapper.h" #include "json/schema.h" @@ -47,11 +49,11 @@ namespace pandaproxy::schema_registry { struct json_schema_definition::impl { - ss::sstring to_json() const { - json::StringBuffer buf; - json::Writer wrt(buf); + iobuf to_json() const { + json::chunked_buffer buf; + json::Writer wrt(buf); doc.Accept(wrt); - return {buf.GetString(), buf.GetLength()}; + return std::move(buf).as_iobuf(); } explicit impl(json::Document doc, std::string_view name) @@ -638,8 +640,6 @@ constexpr std::string_view json_draft_7_metaschema = R"json( } )json"; -result parse_json(std::string_view v); - ss::future<> check_references(sharded_store& store, canonical_schema schema) { for (const auto& ref : schema.def().refs()) { co_await store.is_subject_version_deleted(ref.sub, ref.version) @@ -813,9 +813,9 @@ result try_validate_json_schema(json::Document const& schema) { return first_error.value(); } -result parse_json(std::string_view v) { +result parse_json(iobuf buf) { // parse string in json document, check it's a valid json - auto schema_stream = rapidjson::MemoryStream{v.data(), v.size()}; + auto schema_stream = json::chunked_input_stream{std::move(buf)}; auto schema = json::Document{}; if (schema.ParseStream(schema_stream).HasParseError()) { // not a valid json document, return error @@ -1851,7 +1851,8 @@ bool is_superset(json::Value const& older, json::Value const& newer) { ss::future make_json_schema_definition(sharded_store&, canonical_schema 
schema) { - auto doc = parse_json(schema.def().raw()()).value(); // throws on error + auto doc + = parse_json(schema.def().shared_raw()()).value(); // throws on error std::string_view name = schema.sub()(); auto refs = std::move(schema).def().refs(); co_return json_schema_definition{ @@ -1862,7 +1863,7 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) { ss::future make_canonical_json_schema( sharded_store& store, unparsed_schema unparsed_schema) { // TODO BP: More validation and normalisation - parse_json(unparsed_schema.def().shared_raw()()).value(); // throws on error + parse_json(unparsed_schema.def().shared_raw()).value(); // throws on error auto [sub, unparsed] = std::move(unparsed_schema).destructure(); auto [def, type, refs] = std::move(unparsed).destructure(); diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index e534afe058914..c0a55b8ed27ba 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -12,6 +12,7 @@ #include "pandaproxy/schema_registry/protobuf.h" #include "base/vlog.h" +#include "bytes/streambuf.h" #include "kafka/protocol/errors.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/errors.h" @@ -201,8 +202,8 @@ class dp_error_collector final : public pb::DescriptorPool::ErrorCollector { class schema_def_input_stream : public pb::io::ZeroCopyInputStream { public: explicit schema_def_input_stream(const canonical_schema_definition& def) - : _str(def.raw()) - , _impl{_str().data(), static_cast(_str().size())} {} + : _is{def.shared_raw()} + , _impl{&_is.istream()} {} bool Next(const void** data, int* size) override { return _impl.Next(data, size); @@ -212,8 +213,8 @@ class schema_def_input_stream : public pb::io::ZeroCopyInputStream { int64_t ByteCount() const override { return _impl.ByteCount(); } private: - canonical_schema_definition::raw_string _str; - pb::io::ArrayInputStream _impl; + 
iobuf_istream _is; + pb::io::IstreamInputStream _impl; }; class parser { @@ -231,13 +232,9 @@ class parser { // Attempt parse a .proto file if (!_parser.Parse(&t, &_fdp)) { // base64 decode the schema - std::string_view b64_def{ - schema.def().raw()().data(), schema.def().raw()().size()}; - auto bytes_def = base64_to_bytes(b64_def); - + iobuf_istream is{base64_to_iobuf(schema.def().raw()())}; // Attempt parse as an encoded FileDescriptorProto.pb - if (!_fdp.ParseFromArray( - bytes_def.data(), static_cast(bytes_def.size()))) { + if (!_fdp.ParseFromIstream(&is.istream())) { throw as_exception(error_collector.error()); } } @@ -326,6 +323,7 @@ struct protobuf_schema_definition::impl { * messages */ ss::sstring debug_string() const { + // TODO BP: Prevent this linearization auto s = fd->DebugString(); // reordering not required if no package or no dependencies @@ -353,6 +351,7 @@ struct protobuf_schema_definition::impl { auto imports = trim(sv.substr(imports_pos, imports_len)); auto footer = trim(sv.substr(package_pos + package.length())); + // TODO BP: Prevent this linearization return ssx::sformat( "{}\n{}\n\n{}\n\n{}\n", header, package, imports, footer); } diff --git a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h index c83427b002758..455cdfaf38143 100644 --- a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h +++ b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h @@ -141,8 +141,10 @@ class post_subject_versions_request_handler auto sv = std::string_view{str, len}; switch (_state) { case state::schema: { + iobuf buf; + buf.append(sv.data(), sv.size()); _schema.def = unparsed_schema_definition::raw_string{ - ss::sstring{sv}}; + std::move(buf)}; _state = state::record; return true; } diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index c3624a3d865f0..2a6d3c71c0bee 100644 --- 
a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -12,7 +12,6 @@ #pragma once #include "base/vlog.h" -#include "bytes/iobuf_parser.h" #include "json/iobuf_writer.h" #include "json/json.h" #include "json/types.h" diff --git a/src/v/pandaproxy/schema_registry/test/util.cc b/src/v/pandaproxy/schema_registry/test/util.cc index d5bbc282833bc..f50297e631ba5 100644 --- a/src/v/pandaproxy/schema_registry/test/util.cc +++ b/src/v/pandaproxy/schema_registry/test/util.cc @@ -44,7 +44,8 @@ BOOST_AUTO_TEST_CASE(test_make_schema_definition) { auto res = pps::make_schema_definition>(example_avro_schema); BOOST_REQUIRE(res); - BOOST_REQUIRE_EQUAL(res.value()(), minified_avro_schema); + auto str = to_string(std::move(res).value()); + BOOST_REQUIRE_EQUAL(str, minified_avro_schema); } BOOST_AUTO_TEST_CASE(test_make_schema_definition_failure) { diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index 4bc2230bd1399..e8c4cdf01909e 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -11,6 +11,8 @@ #include "types.h" +#include "util.h" + #include #include #include @@ -47,7 +49,8 @@ std::ostream& operator<<( os, "type: {}, definition: {}, references: {}", to_string_view(def.type()), - def.raw(), + // TODO BP: Prevent this linearization + to_string(def.shared_raw()), def.refs()); return os; } @@ -59,7 +62,8 @@ std::ostream& operator<<( os, "type: {}, definition: {}, references: {}", to_string_view(def.type()), - def.raw(), + // TODO BP: Prevent this linearization + to_string(def.shared_raw()), def.refs()); return os; } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 7b0ad638d0b33..0e20be93ef953 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -13,6 +13,7 @@ #include "base/outcome.h" #include "base/seastarx.h" +#include 
"json/iobuf_writer.h" #include "kafka/protocol/errors.h" #include "model/metadata.h" #include "strings/string_switch.h" @@ -117,7 +118,13 @@ template class typed_schema_definition { public: using tag = Tag; - using raw_string = named_type; + struct raw_string : named_type { + raw_string() = default; + explicit raw_string(iobuf&& buf) noexcept + : named_type{std::move(buf)} {} + explicit raw_string(std::string_view sv) + : named_type{iobuf::from(sv)} {} + }; using references = std::vector; typed_schema_definition() = default; @@ -131,13 +138,13 @@ class typed_schema_definition { template typed_schema_definition(T&& def, schema_type type) - : _def{ss::sstring{std::forward(def)}} + : _def{std::forward(def)} , _type{type} , _refs{} {} template typed_schema_definition(T&& def, schema_type type, references refs) - : _def{ss::sstring{std::forward(def)}} + : _def{std::forward(def)} , _type{type} , _refs{std::move(refs)} {} @@ -152,9 +159,9 @@ class typed_schema_definition { const raw_string& raw() const& { return _def; } raw_string raw() && { return std::move(_def); } - raw_string shared_raw() const& { - // temporarily implemented with copy before the type is changed - return _def; + raw_string shared_raw() const { + auto& buf = const_cast(_def()); + return raw_string{buf.share(0, buf.size_bytes())}; } const references& refs() const& { return _refs; } @@ -164,7 +171,9 @@ class typed_schema_definition { return {shared_raw(), type(), refs()}; } - typed_schema_definition copy() const { return {_def, type(), refs()}; } + typed_schema_definition copy() const { + return {raw_string{_def().copy()}, type(), refs()}; + } auto destructure() && { return make_tuple(std::move(_def), _type, std::move(_refs)); @@ -563,3 +572,15 @@ struct fmt::formatter { // e : format for error_reporting char presentation{'l'}; }; + +namespace json { + +template +void rjson_serialize( + json::iobuf_writer& w, + const pandaproxy::schema_registry::canonical_schema_definition::raw_string& + def) { + 
w.String(def()); +} + +} // namespace json diff --git a/src/v/pandaproxy/schema_registry/util.h b/src/v/pandaproxy/schema_registry/util.h index 1882fed6faf68..9955b6cbfe6fd 100644 --- a/src/v/pandaproxy/schema_registry/util.h +++ b/src/v/pandaproxy/schema_registry/util.h @@ -12,8 +12,9 @@ #pragma once #include "base/seastarx.h" +#include "bytes/iobuf_parser.h" +#include "json/chunked_buffer.h" #include "json/document.h" -#include "json/stringbuffer.h" #include "json/writer.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/types.h" @@ -31,7 +32,7 @@ namespace pandaproxy::schema_registry { /// /// Returns error_code::schema_invalid on failure template -result +result make_schema_definition(std::string_view sv) { // Validate and minify // TODO (Ben): Minify. e.g.: @@ -46,17 +47,16 @@ make_schema_definition(std::string_view sv) { rapidjson::GetParseError_En(doc.GetParseError()), doc.GetErrorOffset())}; } - ::json::GenericStringBuffer str_buf; - str_buf.Reserve(sv.size()); - ::json::Writer<::json::GenericStringBuffer> w{str_buf}; + ::json::chunked_buffer buf; + ::json::Writer<::json::chunked_buffer> w{buf}; doc.Accept(w); - return unparsed_schema_definition::raw_string{ - ss::sstring{str_buf.GetString(), str_buf.GetSize()}}; + return canonical_schema_definition::raw_string{std::move(buf).as_iobuf()}; } template -ss::sstring to_string(typename typed_schema_definition::raw_string def) { - return def; +ss::sstring to_string(named_type def) { + iobuf_parser p{std::move(def)}; + return p.read_string(p.bytes_left()); } } // namespace pandaproxy::schema_registry diff --git a/src/v/wasm/tests/wasm_transform_test.cc b/src/v/wasm/tests/wasm_transform_test.cc index a41f61729203f..22d96d2acd095 100644 --- a/src/v/wasm/tests/wasm_transform_test.cc +++ b/src/v/wasm/tests/wasm_transform_test.cc @@ -10,6 +10,7 @@ */ #include "bytes/bytes.h" +#include "bytes/streambuf.h" #include "pandaproxy/schema_registry/types.h" #include "wasm/errc.h" #include 
"wasm/tests/wasm_fixture.h" @@ -69,7 +70,9 @@ std::string generate_example_avro_record( const pandaproxy::schema_registry::canonical_schema_definition& def) { // Generate a simple avro record that looks like this (as json): // {"a":5,"b":"foo"} - auto schema = avro::compileJsonSchemaFromString(def.raw()().c_str()); + iobuf_istream bis{def.shared_raw()}; + auto is = avro::istreamInputStream(bis.istream()); + auto schema = avro::compileJsonSchemaFromStream(*is); avro::GenericRecord r(schema.root()); r.setFieldAt(r.fieldIndex("a"), int64_t(4)); r.setFieldAt(r.fieldIndex("b"), std::string("foo"));