Skip to content

Commit

Permalink
[CORE-4444] schema_registry: Switch raw_string to iobuf
Browse files Browse the repository at this point in the history
Also fixes [CORE-684] [CORE-4446] [CORE-4447]

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jul 16, 2024
1 parent aa569f8 commit e7fab4a
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 64 deletions.
57 changes: 37 additions & 20 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@

#include "pandaproxy/schema_registry/avro.h"

#include "bytes/streambuf.h"
#include "json/allocator.h"
#include "json/chunked_input_stream.h"
#include "json/document.h"
#include "json/encodings.h"
#include "json/stringbuffer.h"
#include "json/json.h"
#include "json/types.h"
#include "json/writer.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "strings/string_switch.h"

#include <seastar/core/coroutine.hh>
Expand All @@ -30,6 +31,7 @@
#include <avro/Compiler.hh>
#include <avro/Exception.hh>
#include <avro/GenericDatum.hh>
#include <avro/Stream.hh>
#include <avro/Types.hh>
#include <avro/ValidSchema.hh>
#include <boost/outcome/std_result.hpp>
Expand All @@ -43,6 +45,10 @@
#include <stack>
#include <string_view>

namespace pandaproxy::json {
using namespace ::json;
}

namespace pandaproxy::schema_registry {

namespace {
Expand Down Expand Up @@ -421,7 +427,10 @@ std::ostream& operator<<(std::ostream& os, const avro_schema_definition& def) {
}

canonical_schema_definition::raw_string avro_schema_definition::raw() const {
return canonical_schema_definition::raw_string{_impl.toJson(false)};
iobuf_ostream os;
_impl.toJson(os.ostream());
return canonical_schema_definition::raw_string{
json::minify(std::move(os).buf())};
}

ss::sstring avro_schema_definition::name() const {
Expand All @@ -436,17 +445,22 @@ class collected_schema {
bool insert(ss::sstring name, canonical_schema_definition def) {
bool inserted = _names.insert(std::move(name)).second;
if (inserted) {
_schemas.push_back(std::move(def).raw()());
_schemas.push_back(std::move(def).raw());
}
return inserted;
}
ss::sstring flatten() {
return fmt::format("{}", fmt::join(_schemas, "\n"));
canonical_schema_definition::raw_string flatten() && {
iobuf out;
for (auto& s : _schemas) {
out.append(std::move(s));
out.append("\n", 1);
}
return canonical_schema_definition::raw_string{std::move(out)};
}

private:
absl::flat_hash_set<ss::sstring> _names;
std::vector<ss::sstring> _schemas;
std::vector<canonical_schema_definition::raw_string> _schemas;
};

ss::future<collected_schema> collect_schema(
Expand All @@ -473,11 +487,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) {
auto name = schema.sub()();
auto schema_refs = schema.def().refs();
auto refs = co_await collect_schema(store, {}, name, std::move(schema));
auto def = refs.flatten();
iobuf_istream sis{std::move(refs).flatten()()};
auto is = avro::istreamInputStream(sis.istream());
co_return avro_schema_definition{
avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length()),
std::move(schema_refs)};
avro::compileJsonSchemaFromStream(*is), std::move(schema_refs)};
} catch (const avro::Exception& e) {
ex = e;
}
Expand All @@ -492,12 +505,12 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {
json::Document doc;
constexpr auto flags = rapidjson::kParseDefaultFlags
| rapidjson::kParseStopWhenDoneFlag;
const auto& raw = def.raw()();
if (raw.empty()) {
if (def.raw()().empty()) {
auto ec = error_code::schema_empty;
return error_info{ec, make_error_code(ec).message()};
}
doc.Parse<flags>(raw.data(), raw.size());
json::chunked_input_stream is{def.shared_raw()()};
doc.ParseStream<flags>(is);
if (doc.HasParseError()) {
return error_info{
error_code::schema_invalid,
Expand All @@ -509,21 +522,25 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {
sanitize_context ctx{.alloc = doc.GetAllocator()};
auto res = sanitize(doc, ctx);
if (res.has_error()) {
// TODO BP: Prevent this linearizaton
iobuf_parser p(std::move(def).raw()());
return error_info{
res.assume_error().code(),
fmt::format("{} {}", res.assume_error().message(), raw)};
fmt::format(
"{} {}",
res.assume_error().message(),
p.read_string(p.bytes_left()))};
}

json::StringBuffer str_buf;
str_buf.Reserve(raw.size());
json::Writer<json::StringBuffer> w{str_buf};
json::chunked_buffer buf;
json::Writer<json::chunked_buffer> w{buf};

if (!doc.Accept(w)) {
return error_info{error_code::schema_invalid, "Invalid schema"};
}

return canonical_schema_definition{
std::string_view{str_buf.GetString(), str_buf.GetSize()},
canonical_schema_definition::raw_string{std::move(buf).as_iobuf()},
schema_type::avro,
def.refs()};
}
Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ ss::future<ctx_server<service>::reply_t> get_subject_versions_version_schema(
auto get_res = co_await rq.service().schema_store().get_subject_schema(
sub, version, inc_del);

rp.rep->write_body("json", get_res.schema.def().raw()());
rp.rep->write_body(
"json", ppj::as_body_writer(std::move(get_res.schema).def().raw()()));
co_return rp;
}

Expand All @@ -543,7 +544,7 @@ get_subject_versions_version_referenced_by(
auto references = co_await rq.service().schema_store().referenced_by(
sub, version);

rp.rep->write_body("json", ppj::rjson_serialize(references));
rp.rep->write_body("json", ppj::rjson_serialize(std::move(references)));
co_return rp;
}

Expand Down
21 changes: 11 additions & 10 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "pandaproxy/schema_registry/json.h"

#include "json/chunked_buffer.h"
#include "json/chunked_input_stream.h"
#include "json/document.h"
#include "json/ostreamwrapper.h"
#include "json/schema.h"
Expand Down Expand Up @@ -47,11 +49,11 @@
namespace pandaproxy::schema_registry {

struct json_schema_definition::impl {
ss::sstring to_json() const {
json::StringBuffer buf;
json::Writer<json::StringBuffer> wrt(buf);
iobuf to_json() const {
json::chunked_buffer buf;
json::Writer<json::chunked_buffer> wrt(buf);
doc.Accept(wrt);
return {buf.GetString(), buf.GetLength()};
return std::move(buf).as_iobuf();
}

explicit impl(json::Document doc, std::string_view name)
Expand Down Expand Up @@ -638,8 +640,6 @@ constexpr std::string_view json_draft_7_metaschema = R"json(
}
)json";

result<json::Document> parse_json(std::string_view v);

ss::future<> check_references(sharded_store& store, canonical_schema schema) {
for (const auto& ref : schema.def().refs()) {
co_await store.is_subject_version_deleted(ref.sub, ref.version)
Expand Down Expand Up @@ -813,9 +813,9 @@ result<void> try_validate_json_schema(json::Document const& schema) {
return first_error.value();
}

result<json::Document> parse_json(std::string_view v) {
result<json::Document> parse_json(iobuf buf) {
// parse string in json document, check it's a valid json
auto schema_stream = rapidjson::MemoryStream{v.data(), v.size()};
auto schema_stream = json::chunked_input_stream{std::move(buf)};
auto schema = json::Document{};
if (schema.ParseStream(schema_stream).HasParseError()) {
// not a valid json document, return error
Expand Down Expand Up @@ -1851,7 +1851,8 @@ bool is_superset(json::Value const& older, json::Value const& newer) {

ss::future<json_schema_definition>
make_json_schema_definition(sharded_store&, canonical_schema schema) {
auto doc = parse_json(schema.def().raw()()).value(); // throws on error
auto doc
= parse_json(schema.def().shared_raw()()).value(); // throws on error
std::string_view name = schema.sub()();
auto refs = std::move(schema).def().refs();
co_return json_schema_definition{
Expand All @@ -1862,7 +1863,7 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) {
ss::future<canonical_schema> make_canonical_json_schema(
sharded_store& store, unparsed_schema unparsed_schema) {
// TODO BP: More validation and normalisation
parse_json(unparsed_schema.def().shared_raw()()).value(); // throws on error
parse_json(unparsed_schema.def().shared_raw()).value(); // throws on error
auto [sub, unparsed] = std::move(unparsed_schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();

Expand Down
19 changes: 9 additions & 10 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "pandaproxy/schema_registry/protobuf.h"

#include "base/vlog.h"
#include "bytes/streambuf.h"
#include "kafka/protocol/errors.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/errors.h"
Expand Down Expand Up @@ -201,8 +202,8 @@ class dp_error_collector final : public pb::DescriptorPool::ErrorCollector {
class schema_def_input_stream : public pb::io::ZeroCopyInputStream {
public:
explicit schema_def_input_stream(const canonical_schema_definition& def)
: _str(def.raw())
, _impl{_str().data(), static_cast<int>(_str().size())} {}
: _is{def.shared_raw()}
, _impl{&_is.istream()} {}

bool Next(const void** data, int* size) override {
return _impl.Next(data, size);
Expand All @@ -212,8 +213,8 @@ class schema_def_input_stream : public pb::io::ZeroCopyInputStream {
int64_t ByteCount() const override { return _impl.ByteCount(); }

private:
canonical_schema_definition::raw_string _str;
pb::io::ArrayInputStream _impl;
iobuf_istream _is;
pb::io::IstreamInputStream _impl;
};

class parser {
Expand All @@ -231,13 +232,9 @@ class parser {
// Attempt parse a .proto file
if (!_parser.Parse(&t, &_fdp)) {
// base64 decode the schema
std::string_view b64_def{
schema.def().raw()().data(), schema.def().raw()().size()};
auto bytes_def = base64_to_bytes(b64_def);

iobuf_istream is{base64_to_iobuf(schema.def().raw()())};
// Attempt parse as an encoded FileDescriptorProto.pb
if (!_fdp.ParseFromArray(
bytes_def.data(), static_cast<int>(bytes_def.size()))) {
if (!_fdp.ParseFromIstream(&is.istream())) {
throw as_exception(error_collector.error());
}
}
Expand Down Expand Up @@ -326,6 +323,7 @@ struct protobuf_schema_definition::impl {
* messages
*/
ss::sstring debug_string() const {
// TODO BP: Prevent this linearization
auto s = fd->DebugString();

// reordering not required if no package or no dependencies
Expand Down Expand Up @@ -353,6 +351,7 @@ struct protobuf_schema_definition::impl {
auto imports = trim(sv.substr(imports_pos, imports_len));
auto footer = trim(sv.substr(package_pos + package.length()));

// TODO BP: Prevent this linearization
return ssx::sformat(
"{}\n{}\n\n{}\n\n{}\n", header, package, imports, footer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ class post_subject_versions_request_handler
auto sv = std::string_view{str, len};
switch (_state) {
case state::schema: {
iobuf buf;
buf.append(sv.data(), sv.size());
_schema.def = unparsed_schema_definition::raw_string{
ss::sstring{sv}};
std::move(buf)};
_state = state::record;
return true;
}
Expand Down
1 change: 0 additions & 1 deletion src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#pragma once

#include "base/vlog.h"
#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/json.h"
#include "json/types.h"
Expand Down
3 changes: 2 additions & 1 deletion src/v/pandaproxy/schema_registry/test/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ BOOST_AUTO_TEST_CASE(test_make_schema_definition) {
auto res = pps::make_schema_definition<json::UTF8<>>(example_avro_schema);

BOOST_REQUIRE(res);
BOOST_REQUIRE_EQUAL(res.value()(), minified_avro_schema);
auto str = to_string(std::move(res).value());
BOOST_REQUIRE_EQUAL(str, minified_avro_schema);
}

BOOST_AUTO_TEST_CASE(test_make_schema_definition_failure) {
Expand Down
8 changes: 6 additions & 2 deletions src/v/pandaproxy/schema_registry/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "types.h"

#include "util.h"

#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
Expand Down Expand Up @@ -47,7 +49,8 @@ std::ostream& operator<<(
os,
"type: {}, definition: {}, references: {}",
to_string_view(def.type()),
def.raw(),
// TODO BP: Prevent this linearization
to_string(def.shared_raw()),
def.refs());
return os;
}
Expand All @@ -59,7 +62,8 @@ std::ostream& operator<<(
os,
"type: {}, definition: {}, references: {}",
to_string_view(def.type()),
def.raw(),
// TODO BP: Prevent this linearization
to_string(def.shared_raw()),
def.refs());
return os;
}
Expand Down
Loading

0 comments on commit e7fab4a

Please sign in to comment.