diff --git a/src/v/bytes/include/bytes/iobuf.h b/src/v/bytes/include/bytes/iobuf.h index 6d5c3184a8a7..91769c185b31 100644 --- a/src/v/bytes/include/bytes/iobuf.h +++ b/src/v/bytes/include/bytes/iobuf.h @@ -76,6 +76,12 @@ class iobuf { using byte_iterator = details::io_byte_iterator; using placeholder = details::io_placeholder; + static iobuf from(std::string_view view) { + iobuf i; + i.append(view.data(), view.size()); + return i; + } + // NOLINTNEXTLINE iobuf() noexcept { // nothing allocates memory, but boost intrusive list is not marked as diff --git a/src/v/bytes/include/bytes/streambuf.h b/src/v/bytes/include/bytes/streambuf.h index 31be5e11669b..eecfb5431fcf 100644 --- a/src/v/bytes/include/bytes/streambuf.h +++ b/src/v/bytes/include/bytes/streambuf.h @@ -91,3 +91,43 @@ class iobuf_ostreambuf final : public std::streambuf { private: iobuf* _buf; }; + +///\brief Wrap a std::istream around an iobuf +/// +/// iobuf buf; +/// iobuf_istream is(std::move(buf)); +/// std::string out; +/// is.istream() >> out; +class iobuf_istream { +public: + explicit iobuf_istream(iobuf buf) + : _buf(std::move(buf)) + , _isb(_buf) + , _sis{&_isb} {} + std::istream& istream() { return _sis; } + +private: + iobuf _buf; + iobuf_istreambuf _isb; + std::istream _sis; +}; + +///\brief Wrap a std::ostream around an iobuf +/// +/// iobuf_ostream os; +/// os.ostream() << "Hello World"; +/// iobuf buf = std::move(os).buf(); +class iobuf_ostream { +public: + iobuf_ostream() + : _buf() + , _osb(_buf) + , _sos{&_osb} {} + std::ostream& ostream() { return _sos; } + iobuf buf() && { return std::move(_buf); } + +private: + iobuf _buf; + iobuf_ostreambuf _osb; + std::ostream _sos; +}; diff --git a/src/v/json/chunked_buffer.h b/src/v/json/chunked_buffer.h index 0b6d382497b8..3c9154594f0d 100644 --- a/src/v/json/chunked_buffer.h +++ b/src/v/json/chunked_buffer.h @@ -14,6 +14,13 @@ namespace json { +template< + typename OutputStream, + typename SourceEncoding, + typename TargetEncoding, + unsigned writeFlags> +class generic_iobuf_writer; + namespace impl { /** @@ -37,15 +44,32 @@ struct generic_chunked_buffer { //! Get the length of string in Ch in the string buffer. size_t GetLength() const { return _impl.size_bytes() / sizeof(Ch); } - void Reserve(size_t s) { _impl.reserve(s); } + void Reserve(size_t s) { _impl.reserve_memory(s); } void Clear() { _impl.clear(); } /**@}*/ + /** + * Append a fragment to this chunked_buffer. This takes ownership of the + * fragment and is a zero-copy operation. + */ + void append(std::unique_ptr frag) { + _impl.append(std::move(frag)); + } + + /** + * Return the underlying iobuf, this is destructive and zero-copy. + */ iobuf as_iobuf() && { return std::move(_impl); } private: + template< + typename OutputStream, + typename SourceEncoding, + typename TargetEncoding, + unsigned writeFlags> + friend class json::generic_iobuf_writer; iobuf _impl; }; diff --git a/src/v/json/iobuf_writer.h b/src/v/json/iobuf_writer.h new file mode 100644 index 000000000000..fbcbc503ddef --- /dev/null +++ b/src/v/json/iobuf_writer.h @@ -0,0 +1,117 @@ +// Copyright 2024 Redpanda Data, Inc. 
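The iobuf_istream and iobuf_ostream wrappers added to bytes/streambuf.h above let std::iostream-based code read from and write to an iobuf without copying through an intermediate std::string. A minimal round-trip sketch under that assumption (the helper name roundtrip_via_iobuf is illustrative, not part of this patch):

#include "bytes/iobuf.h"
#include "bytes/streambuf.h"

#include <string>

// Write through std::ostream, then read the same bytes back through
// std::istream; both streams are backed by the same iobuf fragments.
inline std::string roundtrip_via_iobuf() {
    iobuf_ostream os;
    os.ostream() << "hello " << 42;
    iobuf buf = std::move(os).buf();

    iobuf_istream is(std::move(buf));
    std::string word;
    int number = 0;
    is.istream() >> word >> number; // word == "hello", number == 42
    return word + ' ' + std::to_string(number);
}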
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" +#include "json/chunked_buffer.h" +#include "json/writer.h" + +#include + +namespace json { + +///\brief a json::Writer that can accept an iobuf as a String payload. +template< + typename OutputStream, + typename SourceEncoding = json::UTF8<>, + typename TargetEncoding = json::UTF8<>, + unsigned writeFlags = rapidjson::kWriteDefaultFlags> +class generic_iobuf_writer + : public Writer { + using Base + = Writer; + +public: + explicit generic_iobuf_writer(OutputStream& os) + : Base{os} {} + + using Base::String; + bool String(const iobuf& buf) { + constexpr bool buffer_is_chunked + = std::same_as; + if constexpr (buffer_is_chunked) { + return write_chunked_string(buf); + } else { + iobuf_const_parser p{buf}; + auto str = p.read_string(p.bytes_left()); + return this->String(str.data(), str.size(), true); + } + } + +private: + bool write_chunked_string(const iobuf& buf) { + const auto last_frag = [this]() { + return std::prev(this->os_->_impl.end()); + }; + using Ch = Base::Ch; + this->Prefix(rapidjson::kStringType); + const auto beg = buf.begin(); + const auto end = buf.end(); + const auto last = std::prev(end); + Ch stashed{}; + Ch* stash_pos{}; + // Base::WriteString is used to JSON encode the string, and requires a + // contiguous range (pointer, len), so we pass it each fragment. + // + // Unfortunately it also encloses the encoded fragment with double + // quotes: + // R"("A string made of ""fragments will need ""fixing")" + // + // This algorithm efficiently removes the extra quotes without + // additional copying: + // For each encoded fragment that is written (except the last one): + // 1. Trim the suffix quote + // 2. Stash the final character, and where it is to be written + // 3. Drop the final character + // For each encoded fragment that is written (except the first one): + // 4. Restore the stashed character over the prefix-quote + for (auto i = beg; i != end; ++i) { + if (!Base::WriteString(i->get(), i->size())) { + return false; + } + if (i != beg) { + // 4. Restore the stashed character over the prefix-quote + *stash_pos = stashed; + } + if (i != last) { + // 1. Trim the suffix quote + this->os_->_impl.trim_back(1); + + // 2. Stash the final character, ... + auto last = last_frag(); + stashed = *std::prev(last->get_current()); + // 3. Drop the final character + this->os_->_impl.trim_back(1); + + // Ensure a stable address to restore the stashed character + if (last != last_frag()) { + this->os_->_impl.reserve_memory(1); + } + // 2. ...and where it is to be written. 
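+ // (Illustrative: with two fragments "ab" and "cd", the encoded writes are
+ // "ab" and "cd", each wrapped in quotes; 'b' is stashed after the first
+ // write's suffix quote is trimmed, then restored over the leading quote of
+ // the second write, stitching the output into "abcd" without extra copies.)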
+ stash_pos = last_frag()->get_current(); + } + } + return this->EndValue(true); + } +}; + +template< + typename OutputStream, + typename SourceEncoding = json::UTF8<>, + typename TargetEncoding = json::UTF8<>, + unsigned writeFlags = rapidjson::kWriteDefaultFlags> +using iobuf_writer = generic_iobuf_writer< + OutputStream, + SourceEncoding, + TargetEncoding, + writeFlags>; + +} // namespace json diff --git a/src/v/json/json.cc b/src/v/json/json.cc index 8ca4ec5bcbe6..69ee33de7920 100644 --- a/src/v/json/json.cc +++ b/src/v/json/json.cc @@ -10,6 +10,7 @@ #include "json/json.h" #include "json/chunked_buffer.h" +#include "json/chunked_input_stream.h" #include "json/stringbuffer.h" namespace json { @@ -128,6 +129,15 @@ ss::sstring minify(std::string_view json) { return ss::sstring(out.GetString(), out.GetSize()); } +iobuf minify(iobuf json) { + json::Reader r; + json::chunked_input_stream in(std::move(json)); + json::chunked_buffer out; + json::Writer w{out}; + r.Parse(in, w); + return std::move(out).as_iobuf(); +} + ss::sstring prettify(std::string_view json) { json::Reader r; json::StringStream in(json.data()); diff --git a/src/v/json/json.h b/src/v/json/json.h index b70f8d1961b4..23b3f7e85934 100644 --- a/src/v/json/json.h +++ b/src/v/json/json.h @@ -11,6 +11,7 @@ #pragma once +#include "bytes/iobuf.h" #include "json/_include_first.h" #include "json/prettywriter.h" #include "json/reader.h" @@ -132,6 +133,7 @@ void rjson_serialize( } ss::sstring minify(std::string_view json); +iobuf minify(iobuf json); ss::sstring prettify(std::string_view json); diff --git a/src/v/json/tests/json_serialization_test.cc b/src/v/json/tests/json_serialization_test.cc index 31aee1f7d34c..9f8bc668be2d 100644 --- a/src/v/json/tests/json_serialization_test.cc +++ b/src/v/json/tests/json_serialization_test.cc @@ -8,7 +8,11 @@ // by the Apache License, Version 2.0 #include "base/seastarx.h" +#include "bytes/iobuf_parser.h" +#include "json/chunked_buffer.h" +#include "json/chunked_input_stream.h" #include "json/document.h" +#include "json/iobuf_writer.h" #include "json/json.h" #include "json/stringbuffer.h" #include "json/writer.h" @@ -134,3 +138,56 @@ SEASTAR_THREAD_TEST_CASE(json_serialization_test) { BOOST_TEST(res_doc["obj"].IsObject()); } + +static constexpr std::string_view input_string{ + R"(The quick brown fox jumps over the lazy dog)"}; + +static constexpr auto make_chunked_str = []() { + constexpr auto half = input_string.size() / 2; + iobuf in; + in.append_fragments(iobuf::from(input_string.substr(0, half))); + in.append_fragments(iobuf::from(input_string.substr(half))); + BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 2); + return in; +}; + +static constexpr auto make_chunked_json = []() { + iobuf in; + in.append_fragments(iobuf::from("\"")); + in.append_fragments(make_chunked_str()); + in.append_fragments(iobuf::from("\"")); + BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 4); + return in; +}; + +SEASTAR_THREAD_TEST_CASE(json_chunked_input_stream_test) { + { + json::chunked_input_stream is{make_chunked_json()}; + json::Document doc; + doc.ParseStream(is); + BOOST_REQUIRE(!doc.HasParseError()); + + BOOST_REQUIRE(doc.IsString()); + auto out_str = std::string_view{doc.GetString(), doc.GetStringLength()}; + BOOST_REQUIRE_EQUAL(out_str, input_string); + } +} + +SEASTAR_THREAD_TEST_CASE(json_iobuf_writer_test) { + constexpr auto to_string = [](const iobuf& buf) { + iobuf_const_parser p{std::move(buf)}; + auto b = p.read_bytes(p.bytes_left()); + return std::string{b.begin(), b.end()}; + }; + 
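    // Illustrative sketch (not part of the original test): the new
    // json::minify(iobuf) overload parses through a chunked_input_stream and
    // re-emits through a chunked_buffer, so the JSON is never linearized.
    {
        iobuf pretty = iobuf::from(R"({ "k" : [ 1, 2, 3 ] })");
        iobuf compact = json::minify(std::move(pretty));
        BOOST_CHECK_EQUAL(to_string(compact), R"({"k":[1,2,3]})");
    }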
+ { + json::chunked_buffer out; + json::iobuf_writer os{out}; + auto buf = make_chunked_str(); + os.String(buf); + auto out_buf = std::move(out).as_iobuf(); + auto expected = make_chunked_json(); + BOOST_CHECK_EQUAL(out_buf, expected); + BOOST_CHECK_EQUAL(to_string(out_buf), to_string(expected)); + } +} diff --git a/src/v/pandaproxy/json/iobuf.h b/src/v/pandaproxy/json/iobuf.h index b3068b3f4251..b20f31ce1df1 100644 --- a/src/v/pandaproxy/json/iobuf.h +++ b/src/v/pandaproxy/json/iobuf.h @@ -12,9 +12,8 @@ #pragma once #include "bytes/iobuf.h" -#include "bytes/iobuf_parser.h" +#include "json/iobuf_writer.h" #include "json/reader.h" -#include "json/stream.h" #include "json/writer.h" #include "pandaproxy/json/rjson_util.h" #include "utils/base64.h" @@ -65,7 +64,7 @@ class rjson_serialize_impl { : _fmt(fmt) {} template - bool operator()(::json::Writer& w, iobuf buf) { + bool operator()(::json::iobuf_writer& w, iobuf buf) { switch (_fmt) { case serialization_format::none: [[fallthrough]]; @@ -81,7 +80,7 @@ class rjson_serialize_impl { } template - bool encode_base64(::json::Writer& w, iobuf buf) { + bool encode_base64(::json::iobuf_writer& w, iobuf buf) { if (buf.empty()) { return w.Null(); } @@ -94,11 +93,8 @@ class rjson_serialize_impl { if (buf.empty()) { return w.Null(); } - iobuf_parser p{std::move(buf)}; - auto str = p.read_string(p.bytes_left()); - static_assert(str.padding(), "StringStream requires null termination"); + ::json::chunked_input_stream ss{std::move(buf)}; ::json::Reader reader; - ::json::StringStream ss{str.c_str()}; return reader.Parse(ss, w); }; diff --git a/src/v/pandaproxy/json/requests/fetch.h b/src/v/pandaproxy/json/requests/fetch.h index c8abd9c76fa4..b8d6ab745f52 100644 --- a/src/v/pandaproxy/json/requests/fetch.h +++ b/src/v/pandaproxy/json/requests/fetch.h @@ -39,7 +39,7 @@ class rjson_serialize_impl { , _base_offset(base_offset) {} template - bool operator()(::json::Writer& w, model::record record) { + bool operator()(::json::iobuf_writer& w, model::record record) { auto offset = _base_offset() + record.offset_delta(); w.StartObject(); @@ -93,7 +93,8 @@ class rjson_serialize_impl { : _fmt(fmt) {} template - bool operator()(::json::Writer& w, kafka::fetch_response&& res) { + bool + operator()(::json::iobuf_writer& w, kafka::fetch_response&& res) { // Eager check for errors for (auto& v : res) { if (v.partition_response->error_code != kafka::error_code::none) { diff --git a/src/v/pandaproxy/json/requests/test/fetch.cc b/src/v/pandaproxy/json/requests/test/fetch.cc index b32b43c680c0..c4a927aee85f 100644 --- a/src/v/pandaproxy/json/requests/test/fetch.cc +++ b/src/v/pandaproxy/json/requests/test/fetch.cc @@ -67,7 +67,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_fetch_empty) { auto fmt = ppj::serialization_format::binary_v2; ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> w(str_buf); + ::json::iobuf_writer<::json::StringBuffer> w(str_buf); ppj::rjson_serialize_fmt(fmt)(w, std::move(res)); auto expected = R"([])"; @@ -85,7 +85,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_fetch_one) { auto fmt = ppj::serialization_format::binary_v2; ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> w(str_buf); + ::json::iobuf_writer<::json::StringBuffer> w(str_buf); ppj::rjson_serialize_fmt(fmt)(w, std::move(res)); auto expected diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index a3d3358f2078..3ff8ff2c998d 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -14,11 +14,11 @@ 
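The rjson_serialize_impl specialization for iobuf above now takes a ::json::iobuf_writer and parses JSON payloads through a chunked_input_stream, so request and response bodies are no longer linearized into null-terminated strings. A hedged sketch of the resulting call pattern, mirroring the existing tests (ppj is the pandaproxy::json alias those tests use; the variable names are illustrative):

// Emit an iobuf payload as JSON through an iobuf_writer over a chunked_buffer.
iobuf payload = iobuf::from("raw bytes");
::json::chunked_buffer out;
::json::iobuf_writer<::json::chunked_buffer> w(out);
ppj::rjson_serialize_fmt(ppj::serialization_format::binary_v2)(
  w, std::move(payload));
iobuf serialized = std::move(out).as_iobuf();
// With binary_v2 the bytes are base64 encoded, so serialized should hold the
// quoted JSON string "cmF3IGJ5dGVz".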
#include "bytes/iostream.h" #include "json/chunked_buffer.h" #include "json/chunked_input_stream.h" +#include "json/iobuf_writer.h" #include "json/json.h" #include "json/reader.h" #include "json/stream.h" #include "json/stringbuffer.h" -#include "json/writer.h" #include "pandaproxy/json/exceptions.h" #include "pandaproxy/json/types.h" @@ -28,6 +28,7 @@ #include #include +#include namespace pandaproxy::json { @@ -40,7 +41,7 @@ class rjson_serialize_impl; template Buffer rjson_serialize_buf(T&& v) { Buffer buf; - ::json::Writer wrt{buf}; + ::json::iobuf_writer wrt{buf}; using ::json::rjson_serialize; using ::pandaproxy::json::rjson_serialize; @@ -90,7 +91,7 @@ struct rjson_serialize_fmt_impl { std::forward(t)); } template - bool operator()(::json::Writer& w, T&& t) { + bool operator()(::json::iobuf_writer& w, T&& t) { return rjson_serialize_impl>{fmt}( w, std::forward(t)); } @@ -155,7 +156,7 @@ rjson_parse(std::unique_ptr req, Handler handler) { return ss::make_ready_future( ss::stop_iteration::yes); } - buf.append(std::move(tmp_buf)); + buf.append(std::make_unique(std::move(tmp_buf))); return ss::make_ready_future( ss::stop_iteration::no); }); diff --git a/src/v/pandaproxy/json/test/iobuf.cc b/src/v/pandaproxy/json/test/iobuf.cc index 8713238b3365..f0bf34d45155 100644 --- a/src/v/pandaproxy/json/test/iobuf.cc +++ b/src/v/pandaproxy/json/test/iobuf.cc @@ -10,8 +10,8 @@ #include "pandaproxy/json/iobuf.h" #include "bytes/iobuf_parser.h" +#include "json/iobuf_writer.h" #include "json/stringbuffer.h" -#include "json/writer.h" #include "pandaproxy/json/rjson_util.h" #include @@ -42,7 +42,7 @@ SEASTAR_THREAD_TEST_CASE(test_iobuf_serialize_binary) { in_buf.append(input.data(), input.size()); ::json::StringBuffer out_buf; - ::json::Writer<::json::StringBuffer> w(out_buf); + ::json::iobuf_writer<::json::StringBuffer> w(out_buf); ppj::rjson_serialize_fmt(ppj::serialization_format::binary_v2)( w, std::move(in_buf)); ss::sstring output{out_buf.GetString(), out_buf.GetSize()}; diff --git a/src/v/pandaproxy/rest/handlers.cc b/src/v/pandaproxy/rest/handlers.cc index 0f1f956468c8..bc0a1b9ab8d4 100644 --- a/src/v/pandaproxy/rest/handlers.cc +++ b/src/v/pandaproxy/rest/handlers.cc @@ -145,7 +145,7 @@ get_topics_records(server::request_t rq, server::reply_t rp) { .fetch_partition(std::move(tp), offset, max_bytes, timeout) .then([res_fmt](kafka::fetch_response res) { ::json::chunked_buffer buf; - ::json::Writer<::json::chunked_buffer> w(buf); + ::json::iobuf_writer<::json::chunked_buffer> w(buf); ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res)); return buf; @@ -391,7 +391,7 @@ consumer_fetch(server::request_t rq, server::reply_t rp) { return client.consumer_fetch(group_id, name, timeout, max_bytes) .then([res_fmt, rp{std::move(rp)}](auto res) mutable { ::json::chunked_buffer buf; - ::json::Writer<::json::chunked_buffer> w(buf); + ::json::iobuf_writer<::json::chunked_buffer> w(buf); ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res)); diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 10e085a70512..6e9193187a26 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -11,15 +11,16 @@ #include "pandaproxy/schema_registry/avro.h" +#include "bytes/streambuf.h" #include "json/allocator.h" +#include "json/chunked_input_stream.h" #include "json/document.h" -#include "json/encodings.h" -#include "json/stringbuffer.h" +#include "json/json.h" #include "json/types.h" -#include "json/writer.h" #include 
"pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/sharded_store.h" +#include "pandaproxy/schema_registry/types.h" #include "strings/string_switch.h" #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +45,10 @@ #include #include +namespace pandaproxy::json { +using namespace ::json; +} + namespace pandaproxy::schema_registry { namespace { @@ -421,7 +427,10 @@ std::ostream& operator<<(std::ostream& os, const avro_schema_definition& def) { } canonical_schema_definition::raw_string avro_schema_definition::raw() const { - return canonical_schema_definition::raw_string{_impl.toJson(false)}; + iobuf_ostream os; + _impl.toJson(os.ostream()); + return canonical_schema_definition::raw_string{ + json::minify(std::move(os).buf())}; } ss::sstring avro_schema_definition::name() const { @@ -436,17 +445,22 @@ class collected_schema { bool insert(ss::sstring name, canonical_schema_definition def) { bool inserted = _names.insert(std::move(name)).second; if (inserted) { - _schemas.push_back(std::move(def).raw()()); + _schemas.push_back(std::move(def).raw()); } return inserted; } - ss::sstring flatten() { - return fmt::format("{}", fmt::join(_schemas, "\n")); + canonical_schema_definition::raw_string flatten() && { + iobuf out; + for (auto& s : _schemas) { + out.append(std::move(s)); + out.append("\n", 1); + } + return canonical_schema_definition::raw_string{std::move(out)}; } private: absl::flat_hash_set _names; - std::vector _schemas; + std::vector _schemas; }; ss::future collect_schema( @@ -454,18 +468,14 @@ ss::future collect_schema( collected_schema collected, ss::sstring name, canonical_schema schema) { - for (auto& ref : schema.def().refs()) { + for (auto const& ref : schema.def().refs()) { if (!collected.contains(ref.name)) { auto ss = co_await store.get_subject_schema( - std::move(ref.sub), ref.version, include_deleted::no); + ref.sub, ref.version, include_deleted::no); collected = co_await collect_schema( - store, - std::move(collected), - std::move(ref.name), - std::move(ss.schema)); + store, std::move(collected), ref.name, std::move(ss.schema)); } } - // NOLINTNEXTLINE(bugprone-use-after-move) collected.insert(std::move(name), std::move(schema).def()); co_return std::move(collected); } @@ -477,11 +487,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) { auto name = schema.sub()(); auto schema_refs = schema.def().refs(); auto refs = co_await collect_schema(store, {}, name, std::move(schema)); - auto def = refs.flatten(); + iobuf_istream sis{std::move(refs).flatten()()}; + auto is = avro::istreamInputStream(sis.istream()); co_return avro_schema_definition{ - avro::compileJsonSchemaFromMemory( - reinterpret_cast(def.data()), def.length()), - std::move(schema_refs)}; + avro::compileJsonSchemaFromStream(*is), std::move(schema_refs)}; } catch (const avro::Exception& e) { ex = e; } @@ -496,12 +505,12 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { json::Document doc; constexpr auto flags = rapidjson::kParseDefaultFlags | rapidjson::kParseStopWhenDoneFlag; - const auto& raw = def.raw()(); - if (raw.empty()) { + if (def.raw()().empty()) { auto ec = error_code::schema_empty; return error_info{ec, make_error_code(ec).message()}; } - doc.Parse(raw.data(), raw.size()); + json::chunked_input_stream is{def.shared_raw()()}; + doc.ParseStream(is); if (doc.HasParseError()) { return error_info{ error_code::schema_invalid, @@ -513,21 +522,25 @@ 
sanitize_avro_schema_definition(unparsed_schema_definition def) { sanitize_context ctx{.alloc = doc.GetAllocator()}; auto res = sanitize(doc, ctx); if (res.has_error()) { + // TODO BP: Prevent this linearizaton + iobuf_parser p(std::move(def).raw()()); return error_info{ res.assume_error().code(), - fmt::format("{} {}", res.assume_error().message(), raw)}; + fmt::format( + "{} {}", + res.assume_error().message(), + p.read_string(p.bytes_left()))}; } - json::StringBuffer str_buf; - str_buf.Reserve(raw.size()); - json::Writer w{str_buf}; + json::chunked_buffer buf; + json::Writer w{buf}; if (!doc.Accept(w)) { return error_info{error_code::schema_invalid, "Invalid schema"}; } return canonical_schema_definition{ - std::string_view{str_buf.GetString(), str_buf.GetSize()}, + canonical_schema_definition::raw_string{std::move(buf).as_iobuf()}, schema_type::avro, def.refs()}; } diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index bea8d14fcdb4..6075f178d143 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -430,7 +430,7 @@ post_subject(server::request_t rq, server::reply_t rp) { } auto sub_schema = co_await rq.service().schema_store().has_schema( - schema, inc_del); + std::move(schema), inc_del); rp.rep->write_body( "json", @@ -460,7 +460,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { unparsed.id.value_or(invalid_schema_id), is_deleted::no}; - auto ids = co_await rq.service().schema_store().get_schema_version(schema); + auto ids = co_await rq.service().schema_store().get_schema_version( + schema.share()); schema_id schema_id{ids.id.value_or(invalid_schema_id)}; if (!ids.version.has_value()) { @@ -523,7 +524,8 @@ ss::future::reply_t> get_subject_versions_version_schema( auto get_res = co_await rq.service().schema_store().get_subject_schema( sub, version, inc_del); - rp.rep->write_body("json", get_res.schema.def().raw()()); + rp.rep->write_body( + "json", ppj::as_body_writer(std::move(get_res.schema).def().raw()())); co_return rp; } @@ -542,7 +544,7 @@ get_subject_versions_version_referenced_by( auto references = co_await rq.service().schema_store().referenced_by( sub, version); - rp.rep->write_body("json", ppj::rjson_serialize(references)); + rp.rep->write_body("json", ppj::rjson_serialize(std::move(references))); co_return rp; } @@ -647,9 +649,11 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { auto schema = co_await rq.service().schema_store().make_canonical_schema( std::move(unparsed.def)); - auto get_res = co_await get_or_load(rq, [&rq, &schema, version]() { - return rq.service().schema_store().is_compatible(version, schema); - }); + auto get_res = co_await get_or_load( + rq, [&rq, schema{std::move(schema)}, version]() { + return rq.service().schema_store().is_compatible( + version, schema.share()); + }); rp.rep->write_body( "json", diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index c492cc82b46b..c0a55b8ed27b 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -12,6 +12,7 @@ #include "pandaproxy/schema_registry/protobuf.h" #include "base/vlog.h" +#include "bytes/streambuf.h" #include "kafka/protocol/errors.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/errors.h" @@ -201,8 +202,8 @@ class dp_error_collector final : public pb::DescriptorPool::ErrorCollector { class schema_def_input_stream 
: public pb::io::ZeroCopyInputStream { public: explicit schema_def_input_stream(const canonical_schema_definition& def) - : _str(def.raw()) - , _impl{_str().data(), static_cast(_str().size())} {} + : _is{def.shared_raw()} + , _impl{&_is.istream()} {} bool Next(const void** data, int* size) override { return _impl.Next(data, size); @@ -212,8 +213,8 @@ class schema_def_input_stream : public pb::io::ZeroCopyInputStream { int64_t ByteCount() const override { return _impl.ByteCount(); } private: - canonical_schema_definition::raw_string _str; - pb::io::ArrayInputStream _impl; + iobuf_istream _is; + pb::io::IstreamInputStream _impl; }; class parser { @@ -231,13 +232,9 @@ class parser { // Attempt parse a .proto file if (!_parser.Parse(&t, &_fdp)) { // base64 decode the schema - std::string_view b64_def{ - schema.def().raw()().data(), schema.def().raw()().size()}; - auto bytes_def = base64_to_bytes(b64_def); - + iobuf_istream is{base64_to_iobuf(schema.def().raw()())}; // Attempt parse as an encoded FileDescriptorProto.pb - if (!_fdp.ParseFromArray( - bytes_def.data(), static_cast(bytes_def.size()))) { + if (!_fdp.ParseFromIstream(&is.istream())) { throw as_exception(error_collector.error()); } } @@ -297,7 +294,7 @@ ss::future build_file_with_refs( ss::future import_schema( pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) { try { - co_return co_await build_file_with_refs(dp, store, schema); + co_return co_await build_file_with_refs(dp, store, schema.share()); } catch (const exception& e) { vlog(plog.warn, "Failed to decode schema: {}", e.what()); throw as_exception(invalid_schema(schema)); @@ -326,6 +323,7 @@ struct protobuf_schema_definition::impl { * messages */ ss::sstring debug_string() const { + // TODO BP: Prevent this linearization auto s = fd->DebugString(); // reordering not required if no package or no dependencies @@ -353,6 +351,7 @@ struct protobuf_schema_definition::impl { auto imports = trim(sv.substr(imports_pos, imports_len)); auto footer = trim(sv.substr(package_pos + package.length())); + // TODO BP: Prevent this linearization return ssx::sformat( "{}\n{}\n\n{}\n\n{}\n", header, package, imports, footer); } @@ -409,16 +408,17 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema) { ss::future make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) { - // NOLINTBEGIN(bugprone-use-after-move) + auto [sub, unparsed] = std::move(schema).destructure(); + auto [def, type, refs] = std::move(unparsed).destructure(); canonical_schema temp{ - std::move(schema).sub(), - {canonical_schema_definition::raw_string{schema.def().raw()()}, - schema.def().type(), - schema.def().refs()}}; - - auto validated = co_await validate_protobuf_schema(store, temp); - co_return canonical_schema{std::move(temp).sub(), std::move(validated)}; - // NOLINTEND(bugprone-use-after-move) + sub, + {canonical_schema_definition::raw_string{std::move(def)()}, + type, + std::move(refs)}}; + + co_return canonical_schema{ + std::move(sub), + co_await validate_protobuf_schema(store, std::move(temp))}; } namespace { diff --git a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h index c6d75a685464..36ee3f8b6d45 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h +++ b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h @@ -11,6 +11,7 @@ #pragma once +#include "json/iobuf_writer.h" #include "pandaproxy/json/rjson_util.h" #include 
"pandaproxy/schema_registry/types.h" @@ -22,7 +23,7 @@ struct get_schemas_ids_id_response { template void rjson_serialize( - ::json::Writer& w, const get_schemas_ids_id_response& res) { + ::json::iobuf_writer& w, const get_schemas_ids_id_response& res) { w.StartObject(); if (res.definition.type() != schema_type::avro) { w.Key("schemaType"); diff --git a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h index 0b6b03d5cf7c..852ecc5ba91d 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h +++ b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h @@ -11,6 +11,7 @@ #pragma once +#include "json/iobuf_writer.h" #include "pandaproxy/json/rjson_util.h" #include "pandaproxy/schema_registry/types.h" @@ -23,8 +24,8 @@ struct post_subject_versions_version_response { }; template -inline void rjson_serialize( - ::json::Writer& w, +void rjson_serialize( + ::json::iobuf_writer& w, const post_subject_versions_version_response& res) { w.StartObject(); w.Key("subject"); diff --git a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h index c83427b00275..455cdfaf3814 100644 --- a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h +++ b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h @@ -141,8 +141,10 @@ class post_subject_versions_request_handler auto sv = std::string_view{str, len}; switch (_state) { case state::schema: { + iobuf buf; + buf.append(sv.data(), sv.size()); _schema.def = unparsed_schema_definition::raw_string{ - ss::sstring{sv}}; + std::move(buf)}; _state = state::record; return true; } diff --git a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc index 228e57fbad06..ef269e28ad64 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc @@ -10,11 +10,10 @@ #include "pandaproxy/schema_registry/requests/get_subject_versions_version.h" #include "base/seastarx.h" +#include "pandaproxy/json/rjson_util.h" #include -#include - namespace ppj = pandaproxy::json; namespace pps = pandaproxy::schema_registry; @@ -30,7 +29,9 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) { const pps::subject sub{"imported-ref"}; pps::post_subject_versions_version_response response{ - .schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}}; + .schema{pps::subject{"imported-ref"}, schema_def.copy()}, + .id{12}, + .version{2}}; const ss::sstring expected{ R"( diff --git a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc index a0befd798c81..645e35866ce4 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc @@ -50,20 +50,22 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { })"}; const pps::subject sub{"test_subject"}; const parse_result expected{ - {sub, expected_schema_def}, std::nullopt, std::nullopt}; + {sub, expected_schema_def.share()}, std::nullopt, std::nullopt}; auto result{ppj::impl::rjson_parse( payload.data(), pps::post_subject_versions_request_handler{sub})}; // 
canonicalisation now requires a sharded_store, for now, minify. - // NOLINTBEGIN(bugprone-use-after-move) + auto [rsub, unparsed] = std::move(result.def).destructure(); + auto [def, type, refs] = std::move(unparsed).destructure(); + result.def = { - std::move(result.def).sub(), + std::move(rsub), pps::unparsed_schema_definition{ - ::json::minify(result.def.def().raw()()), + pps::unparsed_schema_definition::raw_string{ + ::json::minify(std::move(def)())}, pps::schema_type::avro, - std::move(result.def).def().refs()}}; - // NOLINTEND(bugprone-use-after-move) + std::move(refs)}}; BOOST_REQUIRE_EQUAL(expected.def, result.def); BOOST_REQUIRE_EQUAL(expected.id.has_value(), result.id.has_value()); diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 7fb68c39ad91..00c2997b0fea 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -170,7 +170,7 @@ ss::future seq_writer::produce_and_apply( model::schema_registry_internal_tp, batch.copy()); if (res.error_code != kafka::error_code::none) { - throw kafka::exception(res.error_code, *res.error_message); + throw kafka::exception(res.error_code, res.error_message.value_or("")); } auto success = write_at.value_or(res.base_offset) == res.base_offset; @@ -216,16 +216,20 @@ ss::future> seq_writer::do_write_subject_version( // Check if store already contains this data: if // so, we do no I/O and return the schema ID. - auto projected = co_await _store.project_ids(schema).handle_exception( - [](std::exception_ptr e) { - vlog(plog.debug, "write_subject_version: project_ids failed: {}", e); - return ss::make_exception_future(e); - }); + auto projected + = co_await _store.project_ids(schema.share()) + .handle_exception([](std::exception_ptr e) { + vlog( + plog.debug, "write_subject_version: project_ids failed: {}", e); + return ss::make_exception_future(e); + }); if (!projected.inserted) { vlog(plog.debug, "write_subject_version: no-op"); co_return projected.id; } else { + auto canonical = std::move(schema.schema); + auto sub = canonical.sub(); vlog( plog.debug, "seq_writer::write_subject_version project offset={} " @@ -233,22 +237,22 @@ ss::future> seq_writer::do_write_subject_version( "schema={} " "version={}", write_at, - schema.schema.sub(), + sub, projected.id, projected.version); auto key = schema_key{ .seq{write_at}, .node{_node_id}, - .sub{schema.schema.sub()}, + .sub{sub}, .version{projected.version}}; auto value = canonical_schema_value{ - .schema{schema.schema}, + .schema{std::move(canonical)}, .version{projected.version}, .id{projected.id}, .deleted = is_deleted::no}; - batch_builder rb(write_at, schema.schema.sub()); + batch_builder rb(write_at, sub); rb(std::move(key), std::move(value)); if (co_await produce_and_apply(write_at, std::move(rb).build())) { @@ -261,9 +265,9 @@ ss::future> seq_writer::do_write_subject_version( } ss::future seq_writer::write_subject_version(subject_schema schema) { - return sequenced_write( - [schema{std::move(schema)}](model::offset write_at, seq_writer& seq) { - return seq.do_write_subject_version(schema, write_at); + co_return co_await sequenced_write( + [&schema](model::offset write_at, seq_writer& seq) { + return seq.do_write_subject_version(schema.share(), write_at); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 84252cfb9b8e..a3c662203c92 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ 
b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -77,9 +77,10 @@ ss::future sharded_store::make_canonical_schema(unparsed_schema schema) { switch (schema.type()) { case schema_type::avro: { + auto [sub, unparsed] = std::move(schema).destructure(); co_return canonical_schema{ - std::move(schema.sub()), - sanitize_avro_schema_definition(schema.def()).value()}; + std::move(sub), + sanitize_avro_schema_definition(std::move(unparsed)).value()}; } case schema_type::protobuf: co_return co_await make_canonical_protobuf_schema( @@ -93,7 +94,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema) { ss::future<> sharded_store::validate_schema(canonical_schema schema) { switch (schema.type()) { case schema_type::avro: { - co_await make_avro_schema_definition(*this, schema); + co_await make_avro_schema_definition(*this, std::move(schema)); co_return; } case schema_type::protobuf: @@ -111,10 +112,12 @@ sharded_store::make_valid_schema(canonical_schema schema) { // See #3596 for details, especially if modifying it. switch (schema.type()) { case schema_type::avro: { - co_return co_await make_avro_schema_definition(*this, schema); + co_return co_await make_avro_schema_definition( + *this, std::move(schema)); } case schema_type::protobuf: { - co_return co_await make_protobuf_schema_definition(*this, schema); + co_return co_await make_protobuf_schema_definition( + *this, std::move(schema)); } case schema_type::json: break; @@ -125,7 +128,7 @@ sharded_store::make_valid_schema(canonical_schema schema) { ss::future sharded_store::get_schema_version(subject_schema schema) { // Validate the schema (may throw) - co_await validate_schema(schema.schema); + co_await validate_schema(schema.schema.share()); // Determine if the definition already exists auto map = [&schema](store& s) { @@ -191,7 +194,7 @@ sharded_store::get_schema_version(subject_schema schema) { // Check compatibility of the schema if (!v_id.has_value() && !versions.empty()) { auto compat = co_await is_compatible( - versions.back().version, schema.schema); + versions.back().version, schema.schema.share()); if (!compat) { throw exception( error_code::schema_incompatible, @@ -235,8 +238,12 @@ ss::future sharded_store::upsert( schema_id id, schema_version version, is_deleted deleted) { - auto canonical = co_await make_canonical_schema(schema); - co_return co_await upsert(marker, canonical, id, version, deleted); + co_return co_await upsert( + marker, + co_await make_canonical_schema(std::move(schema)), + id, + version, + deleted); } ss::future sharded_store::upsert( @@ -245,15 +252,10 @@ ss::future sharded_store::upsert( schema_id id, schema_version version, is_deleted deleted) { - // NOLINTNEXTLINE(bugprone-use-after-move) - co_await upsert_schema(id, std::move(schema).def()); + auto [sub, def] = std::move(schema).destructure(); + co_await upsert_schema(id, std::move(def)); co_return co_await upsert_subject( - marker, - // NOLINTNEXTLINE(bugprone-use-after-move) - std::move(schema).sub(), - version, - id, - deleted); + marker, std::move(sub), version, id, deleted); } ss::future sharded_store::has_schema(schema_id id) { @@ -273,7 +275,7 @@ sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) { auto versions = co_await get_versions(schema.sub(), inc_del); try { - co_await validate_schema(schema); + co_await validate_schema(schema.share()); } catch (const exception& e) { throw as_exception(invalid_subject_schema(schema.sub())); } @@ -674,7 +676,7 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) { 
ss::future sharded_store::is_compatible( schema_version version, canonical_schema new_schema) { // Lookup the version_ids - const auto& sub = new_schema.sub(); + const auto sub = new_schema.sub(); const auto versions = co_await _store.invoke_on( shard_for(sub), _smp_opts, [sub](auto& s) { return s.get_version_ids(sub, include_deleted::no).value(); @@ -725,7 +727,7 @@ ss::future sharded_store::is_compatible( ver_it = versions.begin(); } - auto new_valid = co_await make_valid_schema(new_schema); + auto new_valid = co_await make_valid_schema(std::move(new_schema)); auto is_compat = true; for (; is_compat && ver_it != versions.end(); ++ver_it) { @@ -735,7 +737,8 @@ ss::future sharded_store::is_compatible( auto old_schema = co_await get_subject_schema( sub, ver_it->version, include_deleted::no); - auto old_valid = co_await make_valid_schema(old_schema.schema); + auto old_valid = co_await make_valid_schema( + std::move(old_schema.schema)); if ( compat == compatibility_level::backward diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 1f3a3e23e9f8..2a6d3c71c0be 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -12,7 +12,7 @@ #pragma once #include "base/vlog.h" -#include "bytes/iobuf_parser.h" +#include "json/iobuf_writer.h" #include "json/json.h" #include "json/types.h" #include "json/writer.h" @@ -324,7 +324,8 @@ using unparsed_schema_value = schema_value; using canonical_schema_value = schema_value; template -void rjson_serialize(::json::Writer& w, const schema_value& val) { +void rjson_serialize( + ::json::iobuf_writer& w, const schema_value& val) { w.StartObject(); w.Key("subject"); ::json::rjson_serialize(w, val.schema.sub()); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index dd0d29526a36..aa3c51ad0124 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -76,9 +76,9 @@ class store { /// /// return the schema_version and schema_id, and whether it's new. insert_result insert(canonical_schema schema) { - auto id = insert_schema(std::move(schema).def()).id; - // NOLINTNEXTLINE(bugprone-use-after-move) - auto [version, inserted] = insert_subject(std::move(schema).sub(), id); + auto [sub, def] = std::move(schema).destructure(); + auto id = insert_schema(std::move(def)).id; + auto [version, inserted] = insert_subject(std::move(sub), id); return {version, id, inserted}; } @@ -89,7 +89,7 @@ class store { if (it == _schemas.end()) { return not_found(id); } - return {it->second.definition}; + return {it->second.definition.share()}; } ///\brief Return the id of the schema, if it already exists. 
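A pattern worth calling out across the store changes above: schema definitions are now backed by an iobuf, which is move-only, so the former implicit copies become explicit share() calls (zero-copy sharing of the underlying fragments) and destructure() splits a schema into its parts instead of moving from the same object twice. A hedged sketch of the calling convention (pps is the namespace alias used in the tests; the function itself is illustrative, not part of this patch):

// Pass one schema to two by-value consumers without copying the payload.
ss::future<> check_then_validate(
  pps::sharded_store& store, pps::canonical_schema schema) {
    // The first consumer receives a shared view of the same fragments.
    co_await store.validate_schema(schema.share());
    // The last consumer takes ownership: no deep copy, no use-after-move.
    co_await store.make_valid_schema(std::move(schema));
}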
diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc index 79decc2cc31c..78e2dc4e6ee6 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc @@ -24,10 +24,10 @@ bool check_compatible( pps::sharded_store s; return check_compatible( pps::make_avro_schema_definition( - s, {pps::subject("r"), {r.raw(), pps::schema_type::avro}}) + s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) .get(), pps::make_avro_schema_definition( - s, {pps::subject("w"), {w.raw(), pps::schema_type::avro}}) + s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) .get()); } @@ -239,10 +239,11 @@ SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition) { R"({"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string","default":"foo"}]})", pps::schema_type::avro}; pps::sharded_store s; - auto valid - = pps::make_avro_schema_definition( - s, {pps::subject("s2"), {schema2.raw(), pps::schema_type::avro}}) - .get(); + auto valid = pps::make_avro_schema_definition( + s, + {pps::subject("s2"), + {schema2.shared_raw(), pps::schema_type::avro}}) + .get(); static_assert( std:: is_same_v, pps::avro_schema_definition>, @@ -267,7 +268,8 @@ SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition_custom_attributes) { auto valid = pps::make_avro_schema_definition( s, {pps::subject("s2"), - {avro_metadata_schema.raw(), pps::schema_type::avro}}) + {avro_metadata_schema.shared_raw(), + pps::schema_type::avro}}) .get(); static_assert( std:: diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc index ac5e5ceb79e3..5ca0449d2a61 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc @@ -45,7 +45,7 @@ struct simple_sharded_store { std::nullopt, version, pps::seq_marker_key_type::schema}, - schema, + schema.share(), id, version, pps::is_deleted::no) @@ -80,20 +80,24 @@ bool check_compatible( SEASTAR_THREAD_TEST_CASE(test_protobuf_simple) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple"}, simple.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EQUAL(valid_simple.name({0}).value(), "Simple"); } SEASTAR_THREAD_TEST_CASE(test_protobuf_nested) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"nested"}, nested}; + auto schema1 = pps::canonical_schema{ + pps::subject{"nested"}, nested.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_nested - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_nested = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EQUAL(valid_nested.name({0}).value(), "A0"); BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 2}).value(), "A1.B0.C2"); BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 4}).value(), "A1.B0.C4"); @@ -103,10 +107,11 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) { simple_sharded_store store; // imported depends on simple, which han't been inserted - 
auto schema1 = pps::canonical_schema{pps::subject{"imported"}, imported}; + auto schema1 = pps::canonical_schema{ + pps::subject{"imported"}, imported.share()}; store.insert(schema1, pps::schema_version{1}); BOOST_REQUIRE_EXCEPTION( - pps::make_protobuf_schema_definition(store.store, schema1).get(), + pps::make_protobuf_schema_definition(store.store, schema1.share()).get(), pps::exception, [](const pps::exception& ex) { return ex.code() == pps::error_code::schema_invalid; @@ -116,16 +121,18 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) { SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported"}, imported_no_ref}; + pps::subject{"imported"}, imported_no_ref.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EXCEPTION( - pps::make_protobuf_schema_definition(store.store, schema2).get(), + pps::make_protobuf_schema_definition(store.store, schema2.share()).get(), pps::exception, [](const pps::exception& ex) { return ex.code() == pps::error_code::schema_invalid; @@ -135,43 +142,51 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) { SEASTAR_THREAD_TEST_CASE(test_protobuf_referenced) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple.proto"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-again.proto"}, imported_again}; + pps::subject{"imported-again.proto"}, imported_again.share()}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); store.insert(schema3, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); - auto valid_imported - = pps::make_protobuf_schema_definition(store.store, schema2).get(); - auto valid_imported_again - = pps::make_protobuf_schema_definition(store.store, schema3).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); + auto valid_imported = pps::make_protobuf_schema_definition( + store.store, schema2.share()) + .get(); + auto valid_imported_again = pps::make_protobuf_schema_definition( + store.store, schema3.share()) + .get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_recursive_reference) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple.proto"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-twice.proto"}, imported_twice}; + pps::subject{"imported-twice.proto"}, imported_twice.share()}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); store.insert(schema3, pps::schema_version{1}); - auto valid_simple - = 
pps::make_protobuf_schema_definition(store.store, schema1).get(); - auto valid_imported - = pps::make_protobuf_schema_definition(store.store, schema2).get(); - auto valid_imported_again - = pps::make_protobuf_schema_definition(store.store, schema3).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); + auto valid_imported = pps::make_protobuf_schema_definition( + store.store, schema2.share()) + .get(); + auto valid_imported_again = pps::make_protobuf_schema_definition( + store.store, schema3.share()) + .get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_well_known) { @@ -266,7 +281,7 @@ message well_known_types { store.insert(schema, pps::schema_version{1}); auto valid_empty - = pps::make_protobuf_schema_definition(store.store, schema).get(); + = pps::make_protobuf_schema_definition(store.store, schema.share()).get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_compatibility_empty) { diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc index 25f5ebc594fc..e5a881d758b8 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc @@ -33,56 +33,46 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) { auto sub = pps::subject{"sub"}; s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema1}}, + {sub, schema1.share()}, pps::schema_id{1}, pps::schema_version{1}, pps::is_deleted::no) .get(); // add a defaulted field - BOOST_REQUIRE(s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema2}}) - .get()); + BOOST_REQUIRE( + s.is_compatible(pps::schema_version{1}, {sub, schema2.share()}).get()); s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema2}}, + {sub, schema2.share()}, pps::schema_id{2}, pps::schema_version{2}, pps::is_deleted::no) .get(); // Test non-defaulted field - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{1}, {sub, schema3.share()}).get()); // Insert schema with non-defaulted field s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema2}}, + {sub, schema2.share()}, pps::schema_id{2}, pps::schema_version{2}, pps::is_deleted::no) .get(); // Test Remove defaulted field to previous - BOOST_REQUIRE(s.is_compatible( - pps::schema_version{2}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + s.is_compatible(pps::schema_version{2}, {sub, schema3.share()}).get()); // Test Remove defaulted field to first - should fail - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{1}, {sub, schema3.share()}).get()); s.set_compatibility(pps::compatibility_level::backward_transitive).get(); // Test transitive defaulted field to previous - should fail - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{2}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{2}, {sub, schema3.share()}).get()); } diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 8d351efdff19..f750703af741 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ 
b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -114,7 +114,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto good_schema_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get()); auto s_res = s.get_subject_schema( @@ -124,7 +125,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto good_schema_ref_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version1, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version1, id1}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version1, id1}); BOOST_REQUIRE_NO_THROW(c(good_schema_ref_1.copy()).get()); auto s_ref_res = s.get_subject_schema( @@ -141,7 +143,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto bad_schema_magic = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic2}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_THROW(c(bad_schema_magic.copy()).get(), pps::exception); BOOST_REQUIRE( @@ -234,7 +237,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) { // Insert the schema at seq 0 auto good_schema_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get()); // Roll the segment // Soft delete the version (at seq 1) diff --git a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc index 54f41088e669..5ecd5cd430f7 100644 --- a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc +++ b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc @@ -24,7 +24,7 @@ struct request { }; template -void rjson_serialize(::json::Writer& w, const request& r) { +void rjson_serialize(::json::iobuf_writer& w, const request& r) { w.StartObject(); w.Key("schema"); ::json::rjson_serialize(w, r.schema.def().raw()); diff --git a/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc b/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc index 9a1fb9cfc563..37a31b7d1232 100644 --- a/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc @@ -17,7 +17,7 @@ namespace pp = pandaproxy; namespace pps = pp::schema_registry; -pps::unparsed_schema_definition not_minimal{ +const pps::unparsed_schema_definition not_minimal{ R"({ "type": "record", "name": "myrecord", @@ -25,73 +25,73 @@ pps::unparsed_schema_definition not_minimal{ })", pps::schema_type::avro}; -pps::canonical_schema_definition not_minimal_sanitized{ +const pps::canonical_schema_definition not_minimal_sanitized{ R"({"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition leading_dot{ +const pps::unparsed_schema_definition leading_dot{ 
R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"fields":[{"name":"f1","type":["null","string"]}],"name":".r1","type":"record"}]},{"name":"two","type":["null",".r1"]}]})", pps::schema_type::avro}; -pps::canonical_schema_definition leading_dot_sanitized{ +const pps::canonical_schema_definition leading_dot_sanitized{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"type":"record","name":"r1","fields":[{"name":"f1","type":["null","string"]}]}]},{"name":"two","type":["null","r1"]}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition leading_dot_ns{ +const pps::unparsed_schema_definition leading_dot_ns{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"fields":[{"name":"f1","type":["null","string"]}],"name":".ns.r1","type":"record"}]},{"name":"two","type":["null",".ns.r1"]}]})", pps::schema_type::avro}; -pps::canonical_schema_definition leading_dot_ns_sanitized{ +const pps::canonical_schema_definition leading_dot_ns_sanitized{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"type":"record","name":"r1","namespace":".ns","fields":[{"name":"f1","type":["null","string"]}]}]},{"name":"two","type":["null",".ns.r1"]}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition record_not_sorted{ +const pps::unparsed_schema_definition record_not_sorted{ R"({"name":"sort_record","type":"record","aliases":["alias"],"fields":[{"type":"string","name":"one"}],"namespace":"ns","doc":"doc"})", pps::schema_type::avro}; -pps::canonical_schema_definition record_sorted_sanitized{ +const pps::canonical_schema_definition record_sorted_sanitized{ R"({"type":"record","name":"sort_record","namespace":"ns","doc":"doc","fields":[{"name":"one","type":"string"}],"aliases":["alias"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition enum_not_sorted{ +const pps::unparsed_schema_definition enum_not_sorted{ R"({"name":"ns.sort_enum","type":"enum","aliases":["alias"],"symbols":["one", "two", "three"],"default":"two","doc":"doc"})", pps::schema_type::avro}; -pps::canonical_schema_definition enum_sorted_sanitized{ +const pps::canonical_schema_definition enum_sorted_sanitized{ R"({"type":"enum","name":"sort_enum","namespace":"ns","doc":"doc","symbols":["one","two","three"],"default":"two","aliases":["alias"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition array_not_sorted{ +const pps::unparsed_schema_definition array_not_sorted{ R"({"type": "array", "default": [], "items" : "string"})", pps::schema_type::avro}; -pps::canonical_schema_definition array_sorted_sanitized{ +const pps::canonical_schema_definition array_sorted_sanitized{ R"({"type":"array","items":"string","default":[]})", pps::schema_type::avro}; -pps::unparsed_schema_definition map_not_sorted{ +const pps::unparsed_schema_definition map_not_sorted{ R"({"type": "map", "default": {}, "values" : "string"})", pps::schema_type::avro}; -pps::canonical_schema_definition map_sorted_sanitized{ +const pps::canonical_schema_definition map_sorted_sanitized{ R"({"type":"map","values":"string","default":{}})", pps::schema_type::avro}; -pps::unparsed_schema_definition fixed_not_sorted{ +const pps::unparsed_schema_definition fixed_not_sorted{ R"({"size":16, "type": "fixed", "aliases":["fixed"], "name":"ns.sorted_fixed"})", pps::schema_type::avro}; -pps::canonical_schema_definition fixed_sorted_sanitized{ +const pps::canonical_schema_definition fixed_sorted_sanitized{ 
R"({"type":"fixed","name":"sorted_fixed","namespace":"ns","size":16,"aliases":["fixed"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition record_of_obj_unsanitized{ +const pps::unparsed_schema_definition record_of_obj_unsanitized{ R"({"name":"sort_record_of_obj","type":"record","fields":[{"type":{"type":"string","connect.parameters":{"tidb_type":"TEXT"}},"default":"","name":"field"}]})", pps::schema_type::avro}; -pps::canonical_schema_definition record_of_obj_sanitized{ +const pps::canonical_schema_definition record_of_obj_sanitized{ R"({"type":"record","name":"sort_record_of_obj","fields":[{"name":"field","type":{"type":"string","connect.parameters":{"tidb_type":"TEXT"}},"default":""}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition namespace_nested_same_unsanitized{ +const pps::unparsed_schema_definition namespace_nested_same_unsanitized{ R"({ "type": "record", "name": "Example", @@ -184,7 +184,7 @@ pps::unparsed_schema_definition namespace_nested_same_unsanitized{ })", pps::schema_type::avro}; -pps::canonical_schema_definition namespace_nested_same_sanitized{ +const pps::canonical_schema_definition namespace_nested_same_sanitized{ ::json::minify( R"({ "type": "record", @@ -280,61 +280,63 @@ pps::canonical_schema_definition namespace_nested_same_sanitized{ BOOST_AUTO_TEST_CASE(test_sanitize_avro_minify) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(not_minimal).value(), + pps::sanitize_avro_schema_definition(not_minimal.share()).value(), not_minimal_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_name) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(leading_dot).value(), + pps::sanitize_avro_schema_definition(leading_dot.share()).value(), leading_dot_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_name_ns) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(leading_dot_ns).value(), + pps::sanitize_avro_schema_definition(leading_dot_ns.share()).value(), leading_dot_ns_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_record_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(record_not_sorted).value(), + pps::sanitize_avro_schema_definition(record_not_sorted.share()).value(), record_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_enum_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(enum_not_sorted).value(), + pps::sanitize_avro_schema_definition(enum_not_sorted.share()).value(), enum_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_array_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(array_not_sorted).value(), + pps::sanitize_avro_schema_definition(array_not_sorted.share()).value(), array_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_map_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(map_not_sorted).value(), + pps::sanitize_avro_schema_definition(map_not_sorted.share()).value(), map_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_fixed_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(fixed_not_sorted).value(), + pps::sanitize_avro_schema_definition(fixed_not_sorted.share()).value(), fixed_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_record_of_obj_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(record_of_obj_unsanitized).value(), + pps::sanitize_avro_schema_definition(record_of_obj_unsanitized.share()) + .value(), record_of_obj_sanitized); } BOOST_AUTO_TEST_CASE(test_namespace_nested_same) { BOOST_REQUIRE_EQUAL( - 
pps::sanitize_avro_schema_definition(namespace_nested_same_unsanitized) + pps::sanitize_avro_schema_definition( + namespace_nested_same_unsanitized.share()) .value(), namespace_nested_same_sanitized); } @@ -344,9 +346,12 @@ pps::canonical_schema_definition debezium_schema{ pps::schema_type::avro}; BOOST_AUTO_TEST_CASE(test_sanitize_avro_debzium) { - auto unparsed = pandaproxy::schema_registry::unparsed_schema_definition{ - debezium_schema.raw()(), debezium_schema.type()}; + auto unparsed = pps::unparsed_schema_definition{ + pps::unparsed_schema_definition::raw_string{ + debezium_schema.shared_raw()()}, + debezium_schema.type()}; BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(unparsed).value(), debezium_schema); + pps::sanitize_avro_schema_definition(unparsed.share()).value(), + debezium_schema); } diff --git a/src/v/pandaproxy/schema_registry/test/sharded_store.cc b/src/v/pandaproxy/schema_registry/test/sharded_store.cc index 7621cc1e0077..5ec2d277da91 100644 --- a/src/v/pandaproxy/schema_registry/test/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/test/sharded_store.cc @@ -32,12 +32,12 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { // Insert simple auto referenced_schema = pps::canonical_schema{ - pps::subject{"simple.proto"}, simple}; + pps::subject{"simple.proto"}, simple.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - referenced_schema, + referenced_schema.share(), pps::schema_id{1}, ver1, pps::is_deleted::no) @@ -45,13 +45,13 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { // Insert referenced auto importing_schema = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - importing_schema, + importing_schema.share(), pps::schema_id{2}, ver1, pps::is_deleted::no) @@ -79,7 +79,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - importing_schema, + importing_schema.share(), pps::schema_id{2}, ver1, pps::is_deleted::yes) @@ -116,18 +116,18 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_find_unordered) { // Insert an unsorted schema "onto the topic". 
auto referenced_schema = pps::canonical_schema{ - pps::subject{"simple.proto"}, simple}; + pps::subject{"simple.proto"}, simple.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - array_unsanitized, + array_unsanitized.share(), pps::schema_id{1}, ver1, pps::is_deleted::no) .get(); - auto res = store.has_schema(array_sanitized).get(); + auto res = store.has_schema(array_sanitized.share()).get(); BOOST_REQUIRE_EQUAL(res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(res.version, ver1); } diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index 74f424cfc267..b551d773940a 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -38,31 +38,31 @@ BOOST_AUTO_TEST_CASE(test_store_insert) { pps::store s; // First insert, expect id{1}, version{1} - auto ins_res = s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert duplicate, expect id{1}, versions{1} - ins_res = s.insert({subject0, string_def0}); + ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert duplicate, with spaces, expect id{1}, versions{1} - ins_res = s.insert({subject0, string_def1}); + ins_res = s.insert({subject0, string_def1.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert on different subject, expect id{1}, version{1} - ins_res = s.insert({subject1, string_def0}); + ins_res = s.insert({subject1, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert different schema, expect id{2}, version{2} - ins_res = s.insert({subject0, int_def0}); + ins_res = s.insert({subject0, int_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{2}); @@ -90,7 +90,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_in_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -98,7 +98,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_in_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{1}, pps::schema_version{1}, @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_reverse_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{1}, pps::schema_version{1}, @@ -129,7 +129,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_reverse_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -152,7 +152,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -161,7 +161,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { BOOST_REQUIRE(!upsert( s, 
subject0, - int_def0, + int_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -177,7 +177,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { auto s_res = s.get_subject_schema( subject0, pps::schema_version{0}, pps::include_deleted::no); BOOST_REQUIRE(s_res.has_value()); - BOOST_REQUIRE(s_res.value().schema.def() == int_def0); + BOOST_REQUIRE(s_res.value().schema.def() == int_def0.share()); } BOOST_AUTO_TEST_CASE(test_store_get_schema) { @@ -189,7 +189,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema) { BOOST_REQUIRE(err.code() == pps::error_code::schema_id_not_found); // First insert, expect id{1} - auto ins_res = s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -198,7 +198,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema) { BOOST_REQUIRE(res.has_value()); auto def = std::move(res).assume_value(); - BOOST_REQUIRE_EQUAL(def, string_def0); + BOOST_REQUIRE_EQUAL(def, string_def0.share()); } BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { @@ -208,7 +208,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { // First insert, expect id{1} auto ins_res = s.insert( - {subject0, pps::canonical_schema_definition(schema1)}); + {subject0, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -222,7 +222,8 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { BOOST_REQUIRE(versions.empty()); // Second insert, expect id{2} - ins_res = s.insert({subject0, pps::canonical_schema_definition(schema2)}); + ins_res = s.insert( + {subject0, pps::canonical_schema_definition(schema2.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{2}); @@ -259,7 +260,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) { // First insert, expect id{1} auto ins_res = s.insert( - {subject0, pps::canonical_schema_definition(schema1)}); + {subject0, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -270,13 +271,15 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) { BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); // Second insert, same schema, expect id{1} - ins_res = s.insert({subject1, pps::canonical_schema_definition(schema1)}); + ins_res = s.insert( + {subject1, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert yet another schema associated with a different subject - ins_res = s.insert({subject2, pps::canonical_schema_definition(schema2)}); + ins_res = s.insert( + {subject2, pps::canonical_schema_definition(schema2.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -326,7 +329,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE(err.code() == pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - auto ins_res = 
s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -339,10 +342,10 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE_EQUAL(val.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(val.version, pps::schema_version{1}); BOOST_REQUIRE_EQUAL(val.deleted, pps::is_deleted::no); - BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0); + BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0.share()); // Second insert, expect id{1}, version{1} - ins_res = s.insert({subject0, string_def0}); + ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); @@ -354,7 +357,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE_EQUAL(val.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(val.version, pps::schema_version{1}); BOOST_REQUIRE_EQUAL(val.deleted, pps::is_deleted::no); - BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0); + BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0.share()); // Request bad version res = s.get_subject_schema( @@ -368,7 +371,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { pps::store s; // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); auto versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -376,7 +379,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { BOOST_REQUIRE_EQUAL(versions.value().front(), pps::schema_version{1}); // Insert duplicate, expect id{1}, versions{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -384,7 +387,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { BOOST_REQUIRE_EQUAL(versions.value().front(), pps::schema_version{1}); // Insert different schema, expect id{2}, version{2} - s.insert({subject0, int_def0}); + s.insert({subject0, int_def0.share()}); versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -404,13 +407,13 @@ BOOST_AUTO_TEST_CASE(test_store_get_subjects) { BOOST_REQUIRE(subjects.empty()); // First insert - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); subjects = s.get_subjects(pps::include_deleted::no); BOOST_REQUIRE_EQUAL(subjects.size(), 1); BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); // second insert - s.insert({subject1, string_def0}); + s.insert({subject1, string_def0.share()}); subjects = s.get_subjects(pps::include_deleted::no); BOOST_REQUIRE(subjects.size() == 2); BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); @@ -476,7 +479,7 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) { pps::compatibility_level::backward}; pps::store s; BOOST_REQUIRE(s.get_compatibility().value() == global_expected); - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); auto sub_expected = pps::compatibility_level::backward; BOOST_REQUIRE( @@ -514,7 +517,7 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat_fallback) { pps::compatibility_level expected{pps::compatibility_level::backward}; pps::store s; - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(s.get_compatibility(subject0, fallback).value() == 
expected); expected = pps::compatibility_level::forward; @@ -562,8 +565,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); auto v_res = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(v_res.has_value()); @@ -672,8 +675,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_version) { pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); auto v_res = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(v_res.has_value()); @@ -743,9 +746,9 @@ BOOST_AUTO_TEST_CASE(test_store_subject_version_latest) { s.set_compatibility(pps::compatibility_level::none).value(); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); // First insert, expect id{2}, version{2} - s.insert({subject0, int_def0}); + s.insert({subject0, int_def0.share()}); // Test latest auto latest = s.get_subject_version_id( @@ -791,8 +794,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_after_delete_version) { pps::seq_marker dummy_marker; // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); // delete version 1 s.upsert_subject( diff --git a/src/v/pandaproxy/schema_registry/test/util.cc b/src/v/pandaproxy/schema_registry/test/util.cc index d5bbc282833b..f50297e631ba 100644 --- a/src/v/pandaproxy/schema_registry/test/util.cc +++ b/src/v/pandaproxy/schema_registry/test/util.cc @@ -44,7 +44,8 @@ BOOST_AUTO_TEST_CASE(test_make_schema_definition) { auto res = pps::make_schema_definition>(example_avro_schema); BOOST_REQUIRE(res); - BOOST_REQUIRE_EQUAL(res.value()(), minified_avro_schema); + auto str = to_string(std::move(res).value()); + BOOST_REQUIRE_EQUAL(str, minified_avro_schema); } BOOST_AUTO_TEST_CASE(test_make_schema_definition_failure) { diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index a53d52b2bf3d..22c7bbe1ee9d 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -11,6 +11,8 @@ #include "types.h" +#include "util.h" + #include #include #include @@ -47,7 +49,8 @@ std::ostream& operator<<( os, "type: {}, definition: {}, references: {}", to_string_view(def.type()), - def.raw(), + // TODO BP: Prevent this linearization + to_string(def.shared_raw()), def.refs()); return os; } @@ -59,7 +62,8 @@ std::ostream& operator<<( os, "type: {}, definition: {}, references: {}", to_string_view(def.type()), - def.raw(), + // TODO BP: Prevent this linearization + to_string(def.shared_raw()), def.refs()); return os; } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index f1655311288a..b243be50003a 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -13,6 +13,7 @@ #include "base/outcome.h" #include "base/seastarx.h" +#include "json/iobuf_writer.h" #include "kafka/protocol/errors.h" #include "model/metadata.h" #include "strings/string_switch.h" @@ -117,18 +118,33 @@ template class 
typed_schema_definition { public: using tag = Tag; - using raw_string = named_type; + struct raw_string : named_type { + raw_string() = default; + explicit raw_string(iobuf&& buf) noexcept + : named_type{std::move(buf)} {} + explicit raw_string(std::string_view sv) + : named_type{iobuf::from(sv)} {} + }; using references = std::vector; + typed_schema_definition() = default; + typed_schema_definition(typed_schema_definition&&) noexcept = default; + typed_schema_definition(const typed_schema_definition&) = delete; + typed_schema_definition& operator=(typed_schema_definition&&) noexcept + = default; + typed_schema_definition& operator=(const typed_schema_definition& other) + = delete; + ~typed_schema_definition() noexcept = default; + template typed_schema_definition(T&& def, schema_type type) - : _def{ss::sstring{std::forward(def)}} + : _def{std::forward(def)} , _type{type} , _refs{} {} template typed_schema_definition(T&& def, schema_type type, references refs) - : _def{ss::sstring{std::forward(def)}} + : _def{std::forward(def)} , _type{type} , _refs{std::move(refs)} {} @@ -143,10 +159,26 @@ class typed_schema_definition { const raw_string& raw() const& { return _def; } raw_string raw() && { return std::move(_def); } + raw_string shared_raw() const { + auto& buf = const_cast(_def()); + return raw_string{buf.share(0, buf.size_bytes())}; + } const references& refs() const& { return _refs; } references refs() && { return std::move(_refs); } + typed_schema_definition share() const { + return {shared_raw(), type(), refs()}; + } + + typed_schema_definition copy() const { + return {raw_string{_def().copy()}, type(), refs()}; + } + + auto destructure() && { + return make_tuple(std::move(_def), _type, std::move(_refs)); + } + private: raw_string _def; schema_type _type{schema_type::avro}; @@ -379,6 +411,13 @@ class typed_schema { const schema_definition& def() const& { return _def; } schema_definition def() && { return std::move(_def); } + typed_schema share() const { return {sub(), def().share()}; } + typed_schema copy() const { return {sub(), def().copy()}; } + + auto destructure() && { + return make_tuple(std::move(_sub), std::move(_def)); + } + private: subject _sub{invalid_subject}; schema_definition _def{"", schema_type::avro}; @@ -393,6 +432,9 @@ struct subject_schema { schema_version version{invalid_schema_version}; schema_id id{invalid_schema_id}; is_deleted deleted{false}; + subject_schema share() const { + return {schema.share(), version, id, deleted}; + } }; enum class compatibility_level { @@ -451,3 +493,15 @@ from_string_view(std::string_view sv) { } } // namespace pandaproxy::schema_registry + +namespace json { + +template +void rjson_serialize( + json::iobuf_writer& w, + const pandaproxy::schema_registry::canonical_schema_definition::raw_string& + def) { + w.String(def()); +} + +} // namespace json diff --git a/src/v/pandaproxy/schema_registry/util.h b/src/v/pandaproxy/schema_registry/util.h index bac6fae1bdff..9955b6cbfe6f 100644 --- a/src/v/pandaproxy/schema_registry/util.h +++ b/src/v/pandaproxy/schema_registry/util.h @@ -12,8 +12,9 @@ #pragma once #include "base/seastarx.h" +#include "bytes/iobuf_parser.h" +#include "json/chunked_buffer.h" #include "json/document.h" -#include "json/stringbuffer.h" #include "json/writer.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/types.h" @@ -31,7 +32,7 @@ namespace pandaproxy::schema_registry { /// /// Returns error_code::schema_invalid on failure template -result +result 
make_schema_definition(std::string_view sv) { // Validate and minify // TODO (Ben): Minify. e.g.: @@ -46,12 +47,16 @@ make_schema_definition(std::string_view sv) { rapidjson::GetParseError_En(doc.GetParseError()), doc.GetErrorOffset())}; } - ::json::GenericStringBuffer str_buf; - str_buf.Reserve(sv.size()); - ::json::Writer<::json::GenericStringBuffer> w{str_buf}; + ::json::chunked_buffer buf; + ::json::Writer<::json::chunked_buffer> w{buf}; doc.Accept(w); - return unparsed_schema_definition::raw_string{ - ss::sstring{str_buf.GetString(), str_buf.GetSize()}}; + return canonical_schema_definition::raw_string{std::move(buf).as_iobuf()}; +} + +template +ss::sstring to_string(named_type def) { + iobuf_parser p{std::move(def)}; + return p.read_string(p.bytes_left()); } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/validation.cc b/src/v/pandaproxy/schema_registry/validation.cc index f87c3dfe770f..438faea03d4e 100644 --- a/src/v/pandaproxy/schema_registry/validation.cc +++ b/src/v/pandaproxy/schema_registry/validation.cc @@ -103,17 +103,19 @@ std::vector get_proto_offsets(iobuf_parser& p) { ss::future> get_record_name( pandaproxy::schema_registry::sharded_store& store, subject_name_strategy sns, - const canonical_schema_definition& schema, + canonical_schema_definition schema, std::optional>& offsets) { + vlog(plog.warn, "get_record_name: sns: {}, schema: {}", sns, schema); if (sns == subject_name_strategy::topic_name) { // Result is succesfully nothing co_return ""; } - switch (schema.type()) { + auto schema_type = schema.type(); + switch (schema_type) { case schema_type::avro: { auto s = co_await make_avro_schema_definition( - store, {subject("r"), {schema.raw(), schema.type()}}); + store, {subject("r"), {std::move(schema).raw(), schema_type}}); co_return s().root()->name().fullname(); } break; case schema_type::protobuf: { @@ -121,7 +123,7 @@ ss::future> get_record_name( co_return std::nullopt; } auto s = co_await make_protobuf_schema_definition( - store, {subject("r"), {schema.raw(), schema.type()}}); + store, {subject("r"), {std::move(schema).raw(), schema_type}}); auto r = s.name(*offsets); if (!r) { co_return std::nullopt; @@ -268,7 +270,7 @@ class schema_id_validator::impl { } auto record_name = co_await get_record_name( - *_api->_store, sns, *schema, proto_offsets); + *_api->_store, sns, *std::move(schema), proto_offsets); if (!record_name) { vlog( plog.debug, diff --git a/src/v/utils/base64.cc b/src/v/utils/base64.cc index 256b425d2830..ba7face5cf70 100644 --- a/src/v/utils/base64.cc +++ b/src/v/utils/base64.cc @@ -110,3 +110,22 @@ ss::sstring iobuf_to_base64(const iobuf& input) { output.resize(written); return output; } + +iobuf base64_to_iobuf(const iobuf& buf) { + base64_state state{}; + base64_stream_decode_init(&state, 0); + iobuf out; + for (const details::io_fragment& frag : buf) { + iobuf::fragment out_frag{frag.size()}; + size_t written{}; + if ( + 1 + != base64_stream_decode( + &state, frag.get(), frag.size(), out_frag.get_write(), &written)) { + throw base64_decoder_exception{}; + } + out_frag.reserve(written); + out.append(std::move(out_frag).release()); + } + return out; +} diff --git a/src/v/utils/base64.h b/src/v/utils/base64.h index edd78e28a386..7a7df277c221 100644 --- a/src/v/utils/base64.h +++ b/src/v/utils/base64.h @@ -29,5 +29,8 @@ ss::sstring bytes_to_base64(bytes_view); // base64 -> string ss::sstring base64_to_string(std::string_view); +// base64 -> iobuf +iobuf base64_to_iobuf(const iobuf&); + // base64 <-> iobuf 
ss::sstring iobuf_to_base64(const iobuf&); diff --git a/src/v/utils/tests/base64_test.cc b/src/v/utils/tests/base64_test.cc index 14285b7aa494..1f2be4773968 100644 --- a/src/v/utils/tests/base64_test.cc +++ b/src/v/utils/tests/base64_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "bytes/iobuf_parser.h" #include "bytes/random.h" #include "random/generators.h" #include "utils/base64.h" @@ -49,3 +50,17 @@ BOOST_AUTO_TEST_CASE(iobuf_type) { auto decoded = base64_to_bytes(encoded); BOOST_REQUIRE_EQUAL(decoded, iobuf_to_bytes(buf)); } + +BOOST_AUTO_TEST_CASE(test_base64_to_iobuf) { + const std::string_view a_string = "dGhpcyBpcyBhIHN0cmluZw=="; + iobuf buf; + const size_t half = a_string.size() / 2; + buf.append_fragments(iobuf::from(a_string.substr(0, half))); + buf.append_fragments(iobuf::from(a_string.substr(half))); + BOOST_REQUIRE_EQUAL(std::distance(buf.begin(), buf.end()), 2); + + auto decoded = base64_to_iobuf(buf); + iobuf_parser p{std::move(decoded)}; + auto decoded_str = p.read_string(p.bytes_left()); + BOOST_REQUIRE_EQUAL(decoded_str, "this is a string"); +} diff --git a/src/v/wasm/schema_registry.cc b/src/v/wasm/schema_registry.cc index 27af71ea0a6c..191ac299149f 100644 --- a/src/v/wasm/schema_registry.cc +++ b/src/v/wasm/schema_registry.cc @@ -49,7 +49,7 @@ class schema_registry_impl : public schema_registry { create_schema(ppsr::unparsed_schema schema) override { auto [reader, writer] = co_await service(); co_await writer->read_sync(); - auto parsed = co_await reader->make_canonical_schema(schema); + auto parsed = co_await reader->make_canonical_schema(std::move(schema)); co_return co_await writer->write_subject_version( {.schema = std::move(parsed)}); } diff --git a/src/v/wasm/schema_registry_module.cc b/src/v/wasm/schema_registry_module.cc index 382def3fc38e..bbd41e65a7f5 100644 --- a/src/v/wasm/schema_registry_module.cc +++ b/src/v/wasm/schema_registry_module.cc @@ -198,9 +198,8 @@ ss::future schema_registry_module::create_subject_schema( ffi::reader r(buf); using namespace pandaproxy::schema_registry; try { - auto unparsed = read_encoded_schema_def(&r); *out_schema_id = co_await _sr->create_schema( - unparsed_schema(sub, unparsed)); + unparsed_schema(sub, read_encoded_schema_def(&r))); } catch (const std::exception& ex) { vlog(wasm_log.warn, "error registering subject schema: {}", ex); co_return SCHEMA_REGISTRY_ERROR; diff --git a/src/v/wasm/tests/wasm_fixture.cc b/src/v/wasm/tests/wasm_fixture.cc index 6ff23f74d90d..64f308b193a1 100644 --- a/src/v/wasm/tests/wasm_fixture.cc +++ b/src/v/wasm/tests/wasm_fixture.cc @@ -56,7 +56,7 @@ class fake_schema_registry : public wasm::schema_registry { get_schema_definition(ppsr::schema_id id) const override { for (const auto& s : _schemas) { if (s.id == id) { - co_return s.schema.def(); + co_return s.schema.def().share(); } } throw std::runtime_error("unknown schema id"); @@ -75,9 +75,9 @@ class fake_schema_registry : public wasm::schema_registry { if (found && found->version > s.version) { continue; } - found = s; + found.emplace(s.share()); } - co_return found.value(); + co_return std::move(found).value(); } ss::future @@ -95,13 +95,15 @@ class fake_schema_registry : public wasm::schema_registry { } } // TODO: validate references too + auto [sub, unparsed_def] = std::move(unparsed).destructure(); + auto [def, type, refs] = std::move(unparsed_def).destructure(); _schemas.push_back({ .schema = ppsr::canonical_schema( - unparsed.sub(), + 
std::move(sub), ppsr::canonical_schema_definition( - unparsed.def().raw(), - unparsed.def().type(), - unparsed.def().refs())), + ppsr::canonical_schema_definition::raw_string{std::move(def)()}, + type, + std::move(refs))), .version = version + 1, .id = ppsr::schema_id(int32_t(_schemas.size() + 1)), .deleted = ppsr::is_deleted::no, @@ -109,7 +111,7 @@ class fake_schema_registry : public wasm::schema_registry { co_return _schemas.back().id; } - std::vector get_all() { return _schemas; } + const std::vector& get_all() { return _schemas; } private: std::vector _schemas; @@ -229,7 +231,7 @@ model::record_batch WasmTestFixture::make_tiny_batch(iobuf record_value) { b.add_raw_kv(model::test::make_iobuf(), std::move(record_value)); return std::move(b).build(); } -std::vector +const std::vector& WasmTestFixture::registered_schemas() const { return _sr->get_all(); } diff --git a/src/v/wasm/tests/wasm_fixture.h b/src/v/wasm/tests/wasm_fixture.h index 49dd7089d947..40add9072f1b 100644 --- a/src/v/wasm/tests/wasm_fixture.h +++ b/src/v/wasm/tests/wasm_fixture.h @@ -60,7 +60,7 @@ class WasmTestFixture : public ::testing::Test { wasm::engine* engine() { return _engine.get(); } - std::vector + const std::vector& registered_schemas() const; std::vector log_lines() const { return _log_lines; } diff --git a/src/v/wasm/tests/wasm_transform_test.cc b/src/v/wasm/tests/wasm_transform_test.cc index 85ce8e58ca99..bea3d8fb6b0b 100644 --- a/src/v/wasm/tests/wasm_transform_test.cc +++ b/src/v/wasm/tests/wasm_transform_test.cc @@ -10,6 +10,7 @@ */ #include "bytes/bytes.h" +#include "bytes/streambuf.h" #include "pandaproxy/schema_registry/types.h" #include "wasm/errc.h" #include "wasm/tests/wasm_fixture.h" @@ -69,7 +70,9 @@ std::string generate_example_avro_record( const pandaproxy::schema_registry::canonical_schema_definition& def) { // Generate a simple avro record that looks like this (as json): // {"a":5,"b":"foo"} - auto schema = avro::compileJsonSchemaFromString(def.raw()().c_str()); + iobuf_istream bis{def.shared_raw()}; + auto is = avro::istreamInputStream(bis.istream()); + auto schema = avro::compileJsonSchemaFromStream(*is); avro::GenericRecord r(schema.root()); r.setFieldAt(r.fieldIndex("a"), int64_t(4)); r.setFieldAt(r.fieldIndex("b"), std::string("foo")); @@ -87,7 +90,7 @@ std::string generate_example_avro_record( TEST_F(WasmTestFixture, SchemaRegistry) { // Test an example schema registry encoded avro value -> JSON transform load_wasm("schema-registry.wasm"); - auto schemas = registered_schemas(); + const auto& schemas = registered_schemas(); ASSERT_EQ(schemas.size(), 1); ASSERT_EQ(schemas[0].id, 1); iobuf record_value;