Skip to content

Commit

Permalink
schema_registry: Reduce copies of schema definitions
Browse files Browse the repository at this point in the history
Disable copy for types that hold a schema definition; replacing them
with an explicit `copy()` or `share()`.

Also introduce a `destructure` call to avoid `bugprone-use-after-move`

Fixes [CORE-1138]

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jul 16, 2024
1 parent a940e07 commit e189e05
Show file tree
Hide file tree
Showing 25 changed files with 333 additions and 258 deletions.
10 changes: 3 additions & 7 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,18 +454,14 @@ ss::future<collected_schema> collect_schema(
collected_schema collected,
ss::sstring name,
canonical_schema schema) {
for (auto& ref : schema.def().refs()) {
for (auto const& ref : schema.def().refs()) {
if (!collected.contains(ref.name)) {
auto ss = co_await store.get_subject_schema(
std::move(ref.sub), ref.version, include_deleted::no);
ref.sub, ref.version, include_deleted::no);
collected = co_await collect_schema(
store,
std::move(collected),
std::move(ref.name),
std::move(ss.schema));
store, std::move(collected), ref.name, std::move(ss.schema));
}
}
// NOLINTNEXTLINE(bugprone-use-after-move)
collected.insert(std::move(name), std::move(schema).def());
co_return std::move(collected);
}
Expand Down
13 changes: 8 additions & 5 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ post_subject(server::request_t rq, server::reply_t rp) {
}

auto sub_schema = co_await rq.service().schema_store().has_schema(
schema, inc_del);
std::move(schema), inc_del);

rp.rep->write_body(
"json",
Expand Down Expand Up @@ -460,7 +460,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
unparsed.id.value_or(invalid_schema_id),
is_deleted::no};

auto ids = co_await rq.service().schema_store().get_schema_version(schema);
auto ids = co_await rq.service().schema_store().get_schema_version(
schema.share());

schema_id schema_id{ids.id.value_or(invalid_schema_id)};
if (!ids.version.has_value()) {
Expand Down Expand Up @@ -647,9 +648,11 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) {

auto schema = co_await rq.service().schema_store().make_canonical_schema(
std::move(unparsed.def));
auto get_res = co_await get_or_load(rq, [&rq, &schema, version]() {
return rq.service().schema_store().is_compatible(version, schema);
});
auto get_res = co_await get_or_load(
rq, [&rq, schema{std::move(schema)}, version]() {
return rq.service().schema_store().is_compatible(
version, schema.share());
});

rp.rep->write_body(
"json",
Expand Down
26 changes: 13 additions & 13 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1859,22 +1859,22 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) {
std::move(refs)};
}

ss::future<canonical_schema>
make_canonical_json_schema(sharded_store& store, unparsed_schema def) {
ss::future<canonical_schema> make_canonical_json_schema(
sharded_store& store, unparsed_schema unparsed_schema) {
// TODO BP: More validation and normalisation
parse_json(def.def().raw()()).value(); // throws on error
auto raw_def = std::move(def).def();
auto schema = canonical_schema{
// NOLINTNEXTLINE(bugprone-use-after-move)
def.sub(),
canonical_schema_definition{// NOLINTNEXTLINE(bugprone-use-after-move)
std::move(raw_def).raw(),
def.type(),
// NOLINTNEXTLINE(bugprone-use-after-move)
std::move(raw_def).refs()}};
parse_json(unparsed_schema.def().shared_raw()()).value(); // throws on error
auto [sub, unparsed] = std::move(unparsed_schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();

canonical_schema schema{
std::move(sub),
canonical_schema_definition{
canonical_schema_definition::raw_string{std::move(def)()},
type,
std::move(refs)}};

// Ensure all references exist
co_await check_references(store, schema);
co_await check_references(store, schema.share());

co_return schema;
}
Expand Down
21 changes: 11 additions & 10 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ ss::future<const pb::FileDescriptor*> build_file_with_refs(
ss::future<const pb::FileDescriptor*> import_schema(
pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) {
try {
co_return co_await build_file_with_refs(dp, store, schema);
co_return co_await build_file_with_refs(dp, store, schema.share());
} catch (const exception& e) {
vlog(plog.warn, "Failed to decode schema: {}", e.what());
throw as_exception(invalid_schema(schema));
Expand Down Expand Up @@ -409,16 +409,17 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema) {

ss::future<canonical_schema>
make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) {
// NOLINTBEGIN(bugprone-use-after-move)
auto [sub, unparsed] = std::move(schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();
canonical_schema temp{
std::move(schema).sub(),
{canonical_schema_definition::raw_string{schema.def().raw()()},
schema.def().type(),
schema.def().refs()}};

auto validated = co_await validate_protobuf_schema(store, temp);
co_return canonical_schema{std::move(temp).sub(), std::move(validated)};
// NOLINTEND(bugprone-use-after-move)
sub,
{canonical_schema_definition::raw_string{std::move(def)()},
type,
std::move(refs)}};

co_return canonical_schema{
std::move(sub),
co_await validate_protobuf_schema(store, std::move(temp))};
}

namespace {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
#include "pandaproxy/schema_registry/requests/get_subject_versions_version.h"

#include "base/seastarx.h"
#include "pandaproxy/json/rjson_util.h"

#include <seastar/testing/thread_test_case.hh>

#include <type_traits>

namespace ppj = pandaproxy::json;
namespace pps = pandaproxy::schema_registry;

Expand All @@ -30,7 +29,9 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) {
const pps::subject sub{"imported-ref"};

pps::post_subject_versions_version_response response{
.schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}};
.schema{pps::subject{"imported-ref"}, schema_def.copy()},
.id{12},
.version{2}};

const ss::sstring expected{
R"(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,22 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
})"};
const pps::subject sub{"test_subject"};
const parse_result expected{
{sub, expected_schema_def}, std::nullopt, std::nullopt};
{sub, expected_schema_def.share()}, std::nullopt, std::nullopt};

auto result{ppj::impl::rjson_parse(
payload.data(), pps::post_subject_versions_request_handler{sub})};

// canonicalisation now requires a sharded_store, for now, minify.
// NOLINTBEGIN(bugprone-use-after-move)
auto [rsub, unparsed] = std::move(result.def).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();

result.def = {
std::move(result.def).sub(),
std::move(rsub),
pps::unparsed_schema_definition{
::json::minify(result.def.def().raw()()),
pps::unparsed_schema_definition::raw_string{
::json::minify(std::move(def)())},
pps::schema_type::avro,
std::move(result.def).def().refs()}};
// NOLINTEND(bugprone-use-after-move)
std::move(refs)}};

BOOST_REQUIRE_EQUAL(expected.def, result.def);
BOOST_REQUIRE_EQUAL(expected.id.has_value(), result.id.has_value());
Expand Down
28 changes: 16 additions & 12 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,39 +216,43 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(

// Check if store already contains this data: if
// so, we do no I/O and return the schema ID.
auto projected = co_await _store.project_ids(schema).handle_exception(
[](std::exception_ptr e) {
vlog(plog.debug, "write_subject_version: project_ids failed: {}", e);
return ss::make_exception_future<sharded_store::insert_result>(e);
});
auto projected
= co_await _store.project_ids(schema.share())
.handle_exception([](std::exception_ptr e) {
vlog(
plog.debug, "write_subject_version: project_ids failed: {}", e);
return ss::make_exception_future<sharded_store::insert_result>(e);
});

if (!projected.inserted) {
vlog(plog.debug, "write_subject_version: no-op");
co_return projected.id;
} else {
auto canonical = std::move(schema.schema);
auto sub = canonical.sub();
vlog(
plog.debug,
"seq_writer::write_subject_version project offset={} "
"subject={} "
"schema={} "
"version={}",
write_at,
schema.schema.sub(),
sub,
projected.id,
projected.version);

auto key = schema_key{
.seq{write_at},
.node{_node_id},
.sub{schema.schema.sub()},
.sub{sub},
.version{projected.version}};
auto value = canonical_schema_value{
.schema{schema.schema},
.schema{std::move(canonical)},
.version{projected.version},
.id{projected.id},
.deleted = is_deleted::no};

batch_builder rb(write_at, schema.schema.sub());
batch_builder rb(write_at, sub);
rb(std::move(key), std::move(value));

if (co_await produce_and_apply(write_at, std::move(rb).build())) {
Expand All @@ -261,9 +265,9 @@ ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
}

ss::future<schema_id> seq_writer::write_subject_version(subject_schema schema) {
return sequenced_write(
[schema{std::move(schema)}](model::offset write_at, seq_writer& seq) {
return seq.do_write_subject_version(schema, write_at);
co_return co_await sequenced_write(
[&schema](model::offset write_at, seq_writer& seq) {
return seq.do_write_subject_version(schema.share(), write_at);
});
}

Expand Down
50 changes: 27 additions & 23 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ ss::future<canonical_schema>
sharded_store::make_canonical_schema(unparsed_schema schema) {
switch (schema.type()) {
case schema_type::avro: {
auto [sub, unparsed] = std::move(schema).destructure();
co_return canonical_schema{
std::move(schema.sub()),
sanitize_avro_schema_definition(schema.def()).value()};
std::move(sub),
sanitize_avro_schema_definition(std::move(unparsed)).value()};
}
case schema_type::protobuf:
co_return co_await make_canonical_protobuf_schema(
Expand All @@ -94,14 +95,14 @@ sharded_store::make_canonical_schema(unparsed_schema schema) {
ss::future<> sharded_store::validate_schema(canonical_schema schema) {
switch (schema.type()) {
case schema_type::avro: {
co_await make_avro_schema_definition(*this, schema);
co_await make_avro_schema_definition(*this, std::move(schema));
co_return;
}
case schema_type::protobuf:
co_await validate_protobuf_schema(*this, std::move(schema));
co_return;
case schema_type::json:
co_await make_json_schema_definition((*this), schema);
co_await make_json_schema_definition((*this), std::move(schema));
co_return;
}
__builtin_unreachable();
Expand All @@ -113,21 +114,24 @@ sharded_store::make_valid_schema(canonical_schema schema) {
// See #3596 for details, especially if modifying it.
switch (schema.type()) {
case schema_type::avro: {
co_return co_await make_avro_schema_definition(*this, schema);
co_return co_await make_avro_schema_definition(
*this, std::move(schema));
}
case schema_type::protobuf: {
co_return co_await make_protobuf_schema_definition(*this, schema);
co_return co_await make_protobuf_schema_definition(
*this, std::move(schema));
}
case schema_type::json:
co_return co_await make_json_schema_definition(*this, schema);
co_return co_await make_json_schema_definition(
*this, std::move(schema));
}
throw as_exception(invalid_schema_type(schema.type()));
}

ss::future<sharded_store::has_schema_result>
sharded_store::get_schema_version(subject_schema schema) {
// Validate the schema (may throw)
co_await validate_schema(schema.schema);
co_await validate_schema(schema.schema.share());

// Determine if the definition already exists
auto map = [&schema](store& s) {
Expand Down Expand Up @@ -193,7 +197,7 @@ sharded_store::get_schema_version(subject_schema schema) {
// Check compatibility of the schema
if (!v_id.has_value() && !versions.empty()) {
auto compat = co_await is_compatible(
versions.back().version, schema.schema);
versions.back().version, schema.schema.share());
if (!compat) {
throw exception(
error_code::schema_incompatible,
Expand Down Expand Up @@ -237,8 +241,12 @@ ss::future<bool> sharded_store::upsert(
schema_id id,
schema_version version,
is_deleted deleted) {
auto canonical = co_await make_canonical_schema(schema);
co_return co_await upsert(marker, canonical, id, version, deleted);
co_return co_await upsert(
marker,
co_await make_canonical_schema(std::move(schema)),
id,
version,
deleted);
}

ss::future<bool> sharded_store::upsert(
Expand All @@ -247,15 +255,10 @@ ss::future<bool> sharded_store::upsert(
schema_id id,
schema_version version,
is_deleted deleted) {
// NOLINTNEXTLINE(bugprone-use-after-move)
co_await upsert_schema(id, std::move(schema).def());
auto [sub, def] = std::move(schema).destructure();
co_await upsert_schema(id, std::move(def));
co_return co_await upsert_subject(
marker,
// NOLINTNEXTLINE(bugprone-use-after-move)
std::move(schema).sub(),
version,
id,
deleted);
marker, std::move(sub), version, id, deleted);
}

ss::future<bool> sharded_store::has_schema(schema_id id) {
Expand All @@ -275,7 +278,7 @@ sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);

try {
co_await validate_schema(schema);
co_await validate_schema(schema.share());
} catch (const exception& e) {
throw as_exception(invalid_subject_schema(schema.sub()));
}
Expand Down Expand Up @@ -676,7 +679,7 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) {
ss::future<bool> sharded_store::is_compatible(
schema_version version, canonical_schema new_schema) {
// Lookup the version_ids
const auto& sub = new_schema.sub();
const auto sub = new_schema.sub();
const auto versions = co_await _store.invoke_on(
shard_for(sub), _smp_opts, [sub](auto& s) {
return s.get_version_ids(sub, include_deleted::no).value();
Expand Down Expand Up @@ -733,7 +736,7 @@ ss::future<bool> sharded_store::is_compatible(
ver_it = versions.begin();
}

auto new_valid = co_await make_valid_schema(new_schema);
auto new_valid = co_await make_valid_schema(std::move(new_schema));

auto is_compat = true;
for (; is_compat && ver_it != versions.end(); ++ver_it) {
Expand All @@ -743,7 +746,8 @@ ss::future<bool> sharded_store::is_compatible(

auto old_schema = co_await get_subject_schema(
sub, ver_it->version, include_deleted::no);
auto old_valid = co_await make_valid_schema(old_schema.schema);
auto old_valid = co_await make_valid_schema(
std::move(old_schema.schema));

if (
compat == compatibility_level::backward
Expand Down
Loading

0 comments on commit e189e05

Please sign in to comment.