diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 10e085a705127..a6e48b39a7952 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -454,18 +454,14 @@ ss::future collect_schema( collected_schema collected, ss::sstring name, canonical_schema schema) { - for (auto& ref : schema.def().refs()) { + for (auto const& ref : schema.def().refs()) { if (!collected.contains(ref.name)) { auto ss = co_await store.get_subject_schema( - std::move(ref.sub), ref.version, include_deleted::no); + ref.sub, ref.version, include_deleted::no); collected = co_await collect_schema( - store, - std::move(collected), - std::move(ref.name), - std::move(ss.schema)); + store, std::move(collected), ref.name, std::move(ss.schema)); } } - // NOLINTNEXTLINE(bugprone-use-after-move) collected.insert(std::move(name), std::move(schema).def()); co_return std::move(collected); } diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index b3d10cbaa3f22..e8630291fa271 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -430,7 +430,7 @@ post_subject(server::request_t rq, server::reply_t rp) { } auto sub_schema = co_await rq.service().schema_store().has_schema( - schema, inc_del); + std::move(schema), inc_del); rp.rep->write_body( "json", @@ -460,7 +460,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { unparsed.id.value_or(invalid_schema_id), is_deleted::no}; - auto ids = co_await rq.service().schema_store().get_schema_version(schema); + auto ids = co_await rq.service().schema_store().get_schema_version( + schema.share()); schema_id schema_id{ids.id.value_or(invalid_schema_id)}; if (!ids.version.has_value()) { @@ -647,9 +648,11 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { auto schema = co_await rq.service().schema_store().make_canonical_schema( std::move(unparsed.def)); - auto get_res = co_await get_or_load(rq, [&rq, &schema, version]() { - return rq.service().schema_store().is_compatible(version, schema); - }); + auto get_res = co_await get_or_load( + rq, [&rq, schema{std::move(schema)}, version]() { + return rq.service().schema_store().is_compatible( + version, schema.share()); + }); rp.rep->write_body( "json", diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 08190711018cd..4a70d79b5fc8e 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -1859,22 +1859,22 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) { std::move(refs)}; } -ss::future -make_canonical_json_schema(sharded_store& store, unparsed_schema def) { +ss::future make_canonical_json_schema( + sharded_store& store, unparsed_schema unparsed_schema) { // TODO BP: More validation and normalisation - parse_json(def.def().raw()()).value(); // throws on error - auto raw_def = std::move(def).def(); - auto schema = canonical_schema{ - // NOLINTNEXTLINE(bugprone-use-after-move) - def.sub(), - canonical_schema_definition{// NOLINTNEXTLINE(bugprone-use-after-move) - std::move(raw_def).raw(), - def.type(), - // NOLINTNEXTLINE(bugprone-use-after-move) - std::move(raw_def).refs()}}; + parse_json(unparsed_schema.def().shared_raw()()).value(); // throws on error + auto [sub, unparsed] = std::move(unparsed_schema).destructure(); + auto [def, type, refs] = std::move(unparsed).destructure(); + + canonical_schema schema{ + std::move(sub), + canonical_schema_definition{ + canonical_schema_definition::raw_string{std::move(def)()}, + type, + std::move(refs)}}; // Ensure all references exist - co_await check_references(store, schema); + co_await check_references(store, schema.share()); co_return schema; } diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index c492cc82b46b2..e534afe058914 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -297,7 +297,7 @@ ss::future build_file_with_refs( ss::future import_schema( pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) { try { - co_return co_await build_file_with_refs(dp, store, schema); + co_return co_await build_file_with_refs(dp, store, schema.share()); } catch (const exception& e) { vlog(plog.warn, "Failed to decode schema: {}", e.what()); throw as_exception(invalid_schema(schema)); @@ -409,16 +409,17 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema) { ss::future make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) { - // NOLINTBEGIN(bugprone-use-after-move) + auto [sub, unparsed] = std::move(schema).destructure(); + auto [def, type, refs] = std::move(unparsed).destructure(); canonical_schema temp{ - std::move(schema).sub(), - {canonical_schema_definition::raw_string{schema.def().raw()()}, - schema.def().type(), - schema.def().refs()}}; - - auto validated = co_await validate_protobuf_schema(store, temp); - co_return canonical_schema{std::move(temp).sub(), std::move(validated)}; - // NOLINTEND(bugprone-use-after-move) + sub, + {canonical_schema_definition::raw_string{std::move(def)()}, + type, + std::move(refs)}}; + + co_return canonical_schema{ + std::move(sub), + co_await validate_protobuf_schema(store, std::move(temp))}; } namespace { diff --git a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc index 228e57fbad063..ef269e28ad649 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc @@ -10,11 +10,10 @@ #include "pandaproxy/schema_registry/requests/get_subject_versions_version.h" #include "base/seastarx.h" +#include "pandaproxy/json/rjson_util.h" #include -#include - namespace ppj = pandaproxy::json; namespace pps = pandaproxy::schema_registry; @@ -30,7 +29,9 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) { const pps::subject sub{"imported-ref"}; pps::post_subject_versions_version_response response{ - .schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}}; + .schema{pps::subject{"imported-ref"}, schema_def.copy()}, + .id{12}, + .version{2}}; const ss::sstring expected{ R"( diff --git a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc index a0befd798c81c..645e35866ce47 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc @@ -50,20 +50,22 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { })"}; const pps::subject sub{"test_subject"}; const parse_result expected{ - {sub, expected_schema_def}, std::nullopt, std::nullopt}; + {sub, expected_schema_def.share()}, std::nullopt, std::nullopt}; auto result{ppj::impl::rjson_parse( payload.data(), pps::post_subject_versions_request_handler{sub})}; // canonicalisation now requires a sharded_store, for now, minify. - // NOLINTBEGIN(bugprone-use-after-move) + auto [rsub, unparsed] = std::move(result.def).destructure(); + auto [def, type, refs] = std::move(unparsed).destructure(); + result.def = { - std::move(result.def).sub(), + std::move(rsub), pps::unparsed_schema_definition{ - ::json::minify(result.def.def().raw()()), + pps::unparsed_schema_definition::raw_string{ + ::json::minify(std::move(def)())}, pps::schema_type::avro, - std::move(result.def).def().refs()}}; - // NOLINTEND(bugprone-use-after-move) + std::move(refs)}}; BOOST_REQUIRE_EQUAL(expected.def, result.def); BOOST_REQUIRE_EQUAL(expected.id.has_value(), result.id.has_value()); diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 7905e31c868de..00c2997b0fea0 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -216,16 +216,20 @@ ss::future> seq_writer::do_write_subject_version( // Check if store already contains this data: if // so, we do no I/O and return the schema ID. - auto projected = co_await _store.project_ids(schema).handle_exception( - [](std::exception_ptr e) { - vlog(plog.debug, "write_subject_version: project_ids failed: {}", e); - return ss::make_exception_future(e); - }); + auto projected + = co_await _store.project_ids(schema.share()) + .handle_exception([](std::exception_ptr e) { + vlog( + plog.debug, "write_subject_version: project_ids failed: {}", e); + return ss::make_exception_future(e); + }); if (!projected.inserted) { vlog(plog.debug, "write_subject_version: no-op"); co_return projected.id; } else { + auto canonical = std::move(schema.schema); + auto sub = canonical.sub(); vlog( plog.debug, "seq_writer::write_subject_version project offset={} " @@ -233,22 +237,22 @@ ss::future> seq_writer::do_write_subject_version( "schema={} " "version={}", write_at, - schema.schema.sub(), + sub, projected.id, projected.version); auto key = schema_key{ .seq{write_at}, .node{_node_id}, - .sub{schema.schema.sub()}, + .sub{sub}, .version{projected.version}}; auto value = canonical_schema_value{ - .schema{schema.schema}, + .schema{std::move(canonical)}, .version{projected.version}, .id{projected.id}, .deleted = is_deleted::no}; - batch_builder rb(write_at, schema.schema.sub()); + batch_builder rb(write_at, sub); rb(std::move(key), std::move(value)); if (co_await produce_and_apply(write_at, std::move(rb).build())) { @@ -261,9 +265,9 @@ ss::future> seq_writer::do_write_subject_version( } ss::future seq_writer::write_subject_version(subject_schema schema) { - return sequenced_write( - [schema{std::move(schema)}](model::offset write_at, seq_writer& seq) { - return seq.do_write_subject_version(schema, write_at); + co_return co_await sequenced_write( + [&schema](model::offset write_at, seq_writer& seq) { + return seq.do_write_subject_version(schema.share(), write_at); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index bac63bbea83b3..f1eb7d729998c 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -78,9 +78,10 @@ ss::future sharded_store::make_canonical_schema(unparsed_schema schema) { switch (schema.type()) { case schema_type::avro: { + auto [sub, unparsed] = std::move(schema).destructure(); co_return canonical_schema{ - std::move(schema.sub()), - sanitize_avro_schema_definition(schema.def()).value()}; + std::move(sub), + sanitize_avro_schema_definition(std::move(unparsed)).value()}; } case schema_type::protobuf: co_return co_await make_canonical_protobuf_schema( @@ -94,14 +95,14 @@ sharded_store::make_canonical_schema(unparsed_schema schema) { ss::future<> sharded_store::validate_schema(canonical_schema schema) { switch (schema.type()) { case schema_type::avro: { - co_await make_avro_schema_definition(*this, schema); + co_await make_avro_schema_definition(*this, std::move(schema)); co_return; } case schema_type::protobuf: co_await validate_protobuf_schema(*this, std::move(schema)); co_return; case schema_type::json: - co_await make_json_schema_definition((*this), schema); + co_await make_json_schema_definition((*this), std::move(schema)); co_return; } __builtin_unreachable(); @@ -113,13 +114,16 @@ sharded_store::make_valid_schema(canonical_schema schema) { // See #3596 for details, especially if modifying it. switch (schema.type()) { case schema_type::avro: { - co_return co_await make_avro_schema_definition(*this, schema); + co_return co_await make_avro_schema_definition( + *this, std::move(schema)); } case schema_type::protobuf: { - co_return co_await make_protobuf_schema_definition(*this, schema); + co_return co_await make_protobuf_schema_definition( + *this, std::move(schema)); } case schema_type::json: - co_return co_await make_json_schema_definition(*this, schema); + co_return co_await make_json_schema_definition( + *this, std::move(schema)); } throw as_exception(invalid_schema_type(schema.type())); } @@ -127,7 +131,7 @@ sharded_store::make_valid_schema(canonical_schema schema) { ss::future sharded_store::get_schema_version(subject_schema schema) { // Validate the schema (may throw) - co_await validate_schema(schema.schema); + co_await validate_schema(schema.schema.share()); // Determine if the definition already exists auto map = [&schema](store& s) { @@ -193,7 +197,7 @@ sharded_store::get_schema_version(subject_schema schema) { // Check compatibility of the schema if (!v_id.has_value() && !versions.empty()) { auto compat = co_await is_compatible( - versions.back().version, schema.schema); + versions.back().version, schema.schema.share()); if (!compat) { throw exception( error_code::schema_incompatible, @@ -237,8 +241,12 @@ ss::future sharded_store::upsert( schema_id id, schema_version version, is_deleted deleted) { - auto canonical = co_await make_canonical_schema(schema); - co_return co_await upsert(marker, canonical, id, version, deleted); + co_return co_await upsert( + marker, + co_await make_canonical_schema(std::move(schema)), + id, + version, + deleted); } ss::future sharded_store::upsert( @@ -247,15 +255,10 @@ ss::future sharded_store::upsert( schema_id id, schema_version version, is_deleted deleted) { - // NOLINTNEXTLINE(bugprone-use-after-move) - co_await upsert_schema(id, std::move(schema).def()); + auto [sub, def] = std::move(schema).destructure(); + co_await upsert_schema(id, std::move(def)); co_return co_await upsert_subject( - marker, - // NOLINTNEXTLINE(bugprone-use-after-move) - std::move(schema).sub(), - version, - id, - deleted); + marker, std::move(sub), version, id, deleted); } ss::future sharded_store::has_schema(schema_id id) { @@ -275,7 +278,7 @@ sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) { auto versions = co_await get_versions(schema.sub(), inc_del); try { - co_await validate_schema(schema); + co_await validate_schema(schema.share()); } catch (const exception& e) { throw as_exception(invalid_subject_schema(schema.sub())); } @@ -676,7 +679,7 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) { ss::future sharded_store::is_compatible( schema_version version, canonical_schema new_schema) { // Lookup the version_ids - const auto& sub = new_schema.sub(); + const auto sub = new_schema.sub(); const auto versions = co_await _store.invoke_on( shard_for(sub), _smp_opts, [sub](auto& s) { return s.get_version_ids(sub, include_deleted::no).value(); @@ -733,7 +736,7 @@ ss::future sharded_store::is_compatible( ver_it = versions.begin(); } - auto new_valid = co_await make_valid_schema(new_schema); + auto new_valid = co_await make_valid_schema(std::move(new_schema)); auto is_compat = true; for (; is_compat && ver_it != versions.end(); ++ver_it) { @@ -743,7 +746,8 @@ ss::future sharded_store::is_compatible( auto old_schema = co_await get_subject_schema( sub, ver_it->version, include_deleted::no); - auto old_valid = co_await make_valid_schema(old_schema.schema); + auto old_valid = co_await make_valid_schema( + std::move(old_schema.schema)); if ( compat == compatibility_level::backward diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 31693652c1f3d..2ce3cb343689e 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -76,9 +76,9 @@ class store { /// /// return the schema_version and schema_id, and whether it's new. insert_result insert(canonical_schema schema) { - auto id = insert_schema(std::move(schema).def()).id; - // NOLINTNEXTLINE(bugprone-use-after-move) - auto [version, inserted] = insert_subject(std::move(schema).sub(), id); + auto [sub, def] = std::move(schema).destructure(); + auto id = insert_schema(std::move(def)).id; + auto [version, inserted] = insert_subject(std::move(sub), id); return {version, id, inserted}; } @@ -89,7 +89,7 @@ class store { if (it == _schemas.end()) { return not_found(id); } - return {it->second.definition}; + return {it->second.definition.share()}; } ///\brief Return the id of the schema, if it already exists. diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc index 79decc2cc31c0..78e2dc4e6ee6e 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc @@ -24,10 +24,10 @@ bool check_compatible( pps::sharded_store s; return check_compatible( pps::make_avro_schema_definition( - s, {pps::subject("r"), {r.raw(), pps::schema_type::avro}}) + s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) .get(), pps::make_avro_schema_definition( - s, {pps::subject("w"), {w.raw(), pps::schema_type::avro}}) + s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) .get()); } @@ -239,10 +239,11 @@ SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition) { R"({"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string","default":"foo"}]})", pps::schema_type::avro}; pps::sharded_store s; - auto valid - = pps::make_avro_schema_definition( - s, {pps::subject("s2"), {schema2.raw(), pps::schema_type::avro}}) - .get(); + auto valid = pps::make_avro_schema_definition( + s, + {pps::subject("s2"), + {schema2.shared_raw(), pps::schema_type::avro}}) + .get(); static_assert( std:: is_same_v, pps::avro_schema_definition>, @@ -267,7 +268,8 @@ SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition_custom_attributes) { auto valid = pps::make_avro_schema_definition( s, {pps::subject("s2"), - {avro_metadata_schema.raw(), pps::schema_type::avro}}) + {avro_metadata_schema.shared_raw(), + pps::schema_type::avro}}) .get(); static_assert( std:: diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc index ac5e5ceb79e3d..5ca0449d2a61d 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc @@ -45,7 +45,7 @@ struct simple_sharded_store { std::nullopt, version, pps::seq_marker_key_type::schema}, - schema, + schema.share(), id, version, pps::is_deleted::no) @@ -80,20 +80,24 @@ bool check_compatible( SEASTAR_THREAD_TEST_CASE(test_protobuf_simple) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple"}, simple.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EQUAL(valid_simple.name({0}).value(), "Simple"); } SEASTAR_THREAD_TEST_CASE(test_protobuf_nested) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"nested"}, nested}; + auto schema1 = pps::canonical_schema{ + pps::subject{"nested"}, nested.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_nested - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_nested = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EQUAL(valid_nested.name({0}).value(), "A0"); BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 2}).value(), "A1.B0.C2"); BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 4}).value(), "A1.B0.C4"); @@ -103,10 +107,11 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) { simple_sharded_store store; // imported depends on simple, which han't been inserted - auto schema1 = pps::canonical_schema{pps::subject{"imported"}, imported}; + auto schema1 = pps::canonical_schema{ + pps::subject{"imported"}, imported.share()}; store.insert(schema1, pps::schema_version{1}); BOOST_REQUIRE_EXCEPTION( - pps::make_protobuf_schema_definition(store.store, schema1).get(), + pps::make_protobuf_schema_definition(store.store, schema1.share()).get(), pps::exception, [](const pps::exception& ex) { return ex.code() == pps::error_code::schema_invalid; @@ -116,16 +121,18 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) { SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported"}, imported_no_ref}; + pps::subject{"imported"}, imported_no_ref.share()}; store.insert(schema1, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); BOOST_REQUIRE_EXCEPTION( - pps::make_protobuf_schema_definition(store.store, schema2).get(), + pps::make_protobuf_schema_definition(store.store, schema2.share()).get(), pps::exception, [](const pps::exception& ex) { return ex.code() == pps::error_code::schema_invalid; @@ -135,43 +142,51 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) { SEASTAR_THREAD_TEST_CASE(test_protobuf_referenced) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple.proto"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-again.proto"}, imported_again}; + pps::subject{"imported-again.proto"}, imported_again.share()}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); store.insert(schema3, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); - auto valid_imported - = pps::make_protobuf_schema_definition(store.store, schema2).get(); - auto valid_imported_again - = pps::make_protobuf_schema_definition(store.store, schema3).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); + auto valid_imported = pps::make_protobuf_schema_definition( + store.store, schema2.share()) + .get(); + auto valid_imported_again = pps::make_protobuf_schema_definition( + store.store, schema3.share()) + .get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_recursive_reference) { simple_sharded_store store; - auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; + auto schema1 = pps::canonical_schema{ + pps::subject{"simple.proto"}, simple.share()}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-twice.proto"}, imported_twice}; + pps::subject{"imported-twice.proto"}, imported_twice.share()}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); store.insert(schema3, pps::schema_version{1}); - auto valid_simple - = pps::make_protobuf_schema_definition(store.store, schema1).get(); - auto valid_imported - = pps::make_protobuf_schema_definition(store.store, schema2).get(); - auto valid_imported_again - = pps::make_protobuf_schema_definition(store.store, schema3).get(); + auto valid_simple = pps::make_protobuf_schema_definition( + store.store, schema1.share()) + .get(); + auto valid_imported = pps::make_protobuf_schema_definition( + store.store, schema2.share()) + .get(); + auto valid_imported_again = pps::make_protobuf_schema_definition( + store.store, schema3.share()) + .get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_well_known) { @@ -266,7 +281,7 @@ message well_known_types { store.insert(schema, pps::schema_version{1}); auto valid_empty - = pps::make_protobuf_schema_definition(store.store, schema).get(); + = pps::make_protobuf_schema_definition(store.store, schema.share()).get(); } SEASTAR_THREAD_TEST_CASE(test_protobuf_compatibility_empty) { diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc index 25f5ebc594fcd..e5a881d758b88 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc @@ -33,56 +33,46 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) { auto sub = pps::subject{"sub"}; s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema1}}, + {sub, schema1.share()}, pps::schema_id{1}, pps::schema_version{1}, pps::is_deleted::no) .get(); // add a defaulted field - BOOST_REQUIRE(s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema2}}) - .get()); + BOOST_REQUIRE( + s.is_compatible(pps::schema_version{1}, {sub, schema2.share()}).get()); s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema2}}, + {sub, schema2.share()}, pps::schema_id{2}, pps::schema_version{2}, pps::is_deleted::no) .get(); // Test non-defaulted field - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{1}, {sub, schema3.share()}).get()); // Insert schema with non-defaulted field s.upsert( dummy_marker, - {sub, pps::canonical_schema_definition{schema2}}, + {sub, schema2.share()}, pps::schema_id{2}, pps::schema_version{2}, pps::is_deleted::no) .get(); // Test Remove defaulted field to previous - BOOST_REQUIRE(s.is_compatible( - pps::schema_version{2}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + s.is_compatible(pps::schema_version{2}, {sub, schema3.share()}).get()); // Test Remove defaulted field to first - should fail - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{1}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{1}, {sub, schema3.share()}).get()); s.set_compatibility(pps::compatibility_level::backward_transitive).get(); // Test transitive defaulted field to previous - should fail - BOOST_REQUIRE(!s.is_compatible( - pps::schema_version{2}, - {sub, pps::canonical_schema_definition{schema3}}) - .get()); + BOOST_REQUIRE( + !s.is_compatible(pps::schema_version{2}, {sub, schema3.share()}).get()); } diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 8d351efdff19c..f750703af741d 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -114,7 +114,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto good_schema_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get()); auto s_res = s.get_subject_schema( @@ -124,7 +125,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto good_schema_ref_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version1, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version1, id1}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version1, id1}); BOOST_REQUIRE_NO_THROW(c(good_schema_ref_1.copy()).get()); auto s_ref_res = s.get_subject_schema( @@ -141,7 +143,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { auto bad_schema_magic = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic2}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_THROW(c(bad_schema_magic.copy()).get(), pps::exception); BOOST_REQUIRE( @@ -234,7 +237,8 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) { // Insert the schema at seq 0 auto good_schema_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic1}, - pps::canonical_schema_value{{subject0, string_def0}, version0, id0}); + pps::canonical_schema_value{ + {subject0, string_def0.share()}, version0, id0}); BOOST_REQUIRE_NO_THROW(c(good_schema_1.copy()).get()); // Roll the segment // Soft delete the version (at seq 1) diff --git a/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc b/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc index 9a1fb9cfc563d..37a31b7d12325 100644 --- a/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/sanitize_avro.cc @@ -17,7 +17,7 @@ namespace pp = pandaproxy; namespace pps = pp::schema_registry; -pps::unparsed_schema_definition not_minimal{ +const pps::unparsed_schema_definition not_minimal{ R"({ "type": "record", "name": "myrecord", @@ -25,73 +25,73 @@ pps::unparsed_schema_definition not_minimal{ })", pps::schema_type::avro}; -pps::canonical_schema_definition not_minimal_sanitized{ +const pps::canonical_schema_definition not_minimal_sanitized{ R"({"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition leading_dot{ +const pps::unparsed_schema_definition leading_dot{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"fields":[{"name":"f1","type":["null","string"]}],"name":".r1","type":"record"}]},{"name":"two","type":["null",".r1"]}]})", pps::schema_type::avro}; -pps::canonical_schema_definition leading_dot_sanitized{ +const pps::canonical_schema_definition leading_dot_sanitized{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"type":"record","name":"r1","fields":[{"name":"f1","type":["null","string"]}]}]},{"name":"two","type":["null","r1"]}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition leading_dot_ns{ +const pps::unparsed_schema_definition leading_dot_ns{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"fields":[{"name":"f1","type":["null","string"]}],"name":".ns.r1","type":"record"}]},{"name":"two","type":["null",".ns.r1"]}]})", pps::schema_type::avro}; -pps::canonical_schema_definition leading_dot_ns_sanitized{ +const pps::canonical_schema_definition leading_dot_ns_sanitized{ R"({"type":"record","name":"record","fields":[{"name":"one","type":["null",{"type":"record","name":"r1","namespace":".ns","fields":[{"name":"f1","type":["null","string"]}]}]},{"name":"two","type":["null",".ns.r1"]}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition record_not_sorted{ +const pps::unparsed_schema_definition record_not_sorted{ R"({"name":"sort_record","type":"record","aliases":["alias"],"fields":[{"type":"string","name":"one"}],"namespace":"ns","doc":"doc"})", pps::schema_type::avro}; -pps::canonical_schema_definition record_sorted_sanitized{ +const pps::canonical_schema_definition record_sorted_sanitized{ R"({"type":"record","name":"sort_record","namespace":"ns","doc":"doc","fields":[{"name":"one","type":"string"}],"aliases":["alias"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition enum_not_sorted{ +const pps::unparsed_schema_definition enum_not_sorted{ R"({"name":"ns.sort_enum","type":"enum","aliases":["alias"],"symbols":["one", "two", "three"],"default":"two","doc":"doc"})", pps::schema_type::avro}; -pps::canonical_schema_definition enum_sorted_sanitized{ +const pps::canonical_schema_definition enum_sorted_sanitized{ R"({"type":"enum","name":"sort_enum","namespace":"ns","doc":"doc","symbols":["one","two","three"],"default":"two","aliases":["alias"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition array_not_sorted{ +const pps::unparsed_schema_definition array_not_sorted{ R"({"type": "array", "default": [], "items" : "string"})", pps::schema_type::avro}; -pps::canonical_schema_definition array_sorted_sanitized{ +const pps::canonical_schema_definition array_sorted_sanitized{ R"({"type":"array","items":"string","default":[]})", pps::schema_type::avro}; -pps::unparsed_schema_definition map_not_sorted{ +const pps::unparsed_schema_definition map_not_sorted{ R"({"type": "map", "default": {}, "values" : "string"})", pps::schema_type::avro}; -pps::canonical_schema_definition map_sorted_sanitized{ +const pps::canonical_schema_definition map_sorted_sanitized{ R"({"type":"map","values":"string","default":{}})", pps::schema_type::avro}; -pps::unparsed_schema_definition fixed_not_sorted{ +const pps::unparsed_schema_definition fixed_not_sorted{ R"({"size":16, "type": "fixed", "aliases":["fixed"], "name":"ns.sorted_fixed"})", pps::schema_type::avro}; -pps::canonical_schema_definition fixed_sorted_sanitized{ +const pps::canonical_schema_definition fixed_sorted_sanitized{ R"({"type":"fixed","name":"sorted_fixed","namespace":"ns","size":16,"aliases":["fixed"]})", pps::schema_type::avro}; -pps::unparsed_schema_definition record_of_obj_unsanitized{ +const pps::unparsed_schema_definition record_of_obj_unsanitized{ R"({"name":"sort_record_of_obj","type":"record","fields":[{"type":{"type":"string","connect.parameters":{"tidb_type":"TEXT"}},"default":"","name":"field"}]})", pps::schema_type::avro}; -pps::canonical_schema_definition record_of_obj_sanitized{ +const pps::canonical_schema_definition record_of_obj_sanitized{ R"({"type":"record","name":"sort_record_of_obj","fields":[{"name":"field","type":{"type":"string","connect.parameters":{"tidb_type":"TEXT"}},"default":""}]})", pps::schema_type::avro}; -pps::unparsed_schema_definition namespace_nested_same_unsanitized{ +const pps::unparsed_schema_definition namespace_nested_same_unsanitized{ R"({ "type": "record", "name": "Example", @@ -184,7 +184,7 @@ pps::unparsed_schema_definition namespace_nested_same_unsanitized{ })", pps::schema_type::avro}; -pps::canonical_schema_definition namespace_nested_same_sanitized{ +const pps::canonical_schema_definition namespace_nested_same_sanitized{ ::json::minify( R"({ "type": "record", @@ -280,61 +280,63 @@ pps::canonical_schema_definition namespace_nested_same_sanitized{ BOOST_AUTO_TEST_CASE(test_sanitize_avro_minify) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(not_minimal).value(), + pps::sanitize_avro_schema_definition(not_minimal.share()).value(), not_minimal_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_name) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(leading_dot).value(), + pps::sanitize_avro_schema_definition(leading_dot.share()).value(), leading_dot_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_name_ns) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(leading_dot_ns).value(), + pps::sanitize_avro_schema_definition(leading_dot_ns.share()).value(), leading_dot_ns_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_record_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(record_not_sorted).value(), + pps::sanitize_avro_schema_definition(record_not_sorted.share()).value(), record_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_enum_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(enum_not_sorted).value(), + pps::sanitize_avro_schema_definition(enum_not_sorted.share()).value(), enum_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_array_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(array_not_sorted).value(), + pps::sanitize_avro_schema_definition(array_not_sorted.share()).value(), array_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_map_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(map_not_sorted).value(), + pps::sanitize_avro_schema_definition(map_not_sorted.share()).value(), map_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_avro_fixed_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(fixed_not_sorted).value(), + pps::sanitize_avro_schema_definition(fixed_not_sorted.share()).value(), fixed_sorted_sanitized); } BOOST_AUTO_TEST_CASE(test_sanitize_record_of_obj_sorting) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(record_of_obj_unsanitized).value(), + pps::sanitize_avro_schema_definition(record_of_obj_unsanitized.share()) + .value(), record_of_obj_sanitized); } BOOST_AUTO_TEST_CASE(test_namespace_nested_same) { BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(namespace_nested_same_unsanitized) + pps::sanitize_avro_schema_definition( + namespace_nested_same_unsanitized.share()) .value(), namespace_nested_same_sanitized); } @@ -344,9 +346,12 @@ pps::canonical_schema_definition debezium_schema{ pps::schema_type::avro}; BOOST_AUTO_TEST_CASE(test_sanitize_avro_debzium) { - auto unparsed = pandaproxy::schema_registry::unparsed_schema_definition{ - debezium_schema.raw()(), debezium_schema.type()}; + auto unparsed = pps::unparsed_schema_definition{ + pps::unparsed_schema_definition::raw_string{ + debezium_schema.shared_raw()()}, + debezium_schema.type()}; BOOST_REQUIRE_EQUAL( - pps::sanitize_avro_schema_definition(unparsed).value(), debezium_schema); + pps::sanitize_avro_schema_definition(unparsed.share()).value(), + debezium_schema); } diff --git a/src/v/pandaproxy/schema_registry/test/sharded_store.cc b/src/v/pandaproxy/schema_registry/test/sharded_store.cc index 7621cc1e00779..5ec2d277da914 100644 --- a/src/v/pandaproxy/schema_registry/test/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/test/sharded_store.cc @@ -32,12 +32,12 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { // Insert simple auto referenced_schema = pps::canonical_schema{ - pps::subject{"simple.proto"}, simple}; + pps::subject{"simple.proto"}, simple.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - referenced_schema, + referenced_schema.share(), pps::schema_id{1}, ver1, pps::is_deleted::no) @@ -45,13 +45,13 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { // Insert referenced auto importing_schema = pps::canonical_schema{ - pps::subject{"imported.proto"}, imported}; + pps::subject{"imported.proto"}, imported.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - importing_schema, + importing_schema.share(), pps::schema_id{2}, ver1, pps::is_deleted::no) @@ -79,7 +79,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - importing_schema, + importing_schema.share(), pps::schema_id{2}, ver1, pps::is_deleted::yes) @@ -116,18 +116,18 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_find_unordered) { // Insert an unsorted schema "onto the topic". auto referenced_schema = pps::canonical_schema{ - pps::subject{"simple.proto"}, simple}; + pps::subject{"simple.proto"}, simple.share()}; store .upsert( pps::seq_marker{ std::nullopt, std::nullopt, ver1, pps::seq_marker_key_type::schema}, - array_unsanitized, + array_unsanitized.share(), pps::schema_id{1}, ver1, pps::is_deleted::no) .get(); - auto res = store.has_schema(array_sanitized).get(); + auto res = store.has_schema(array_sanitized.share()).get(); BOOST_REQUIRE_EQUAL(res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(res.version, ver1); } diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index 74f424cfc2670..b551d773940a8 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -38,31 +38,31 @@ BOOST_AUTO_TEST_CASE(test_store_insert) { pps::store s; // First insert, expect id{1}, version{1} - auto ins_res = s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert duplicate, expect id{1}, versions{1} - ins_res = s.insert({subject0, string_def0}); + ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert duplicate, with spaces, expect id{1}, versions{1} - ins_res = s.insert({subject0, string_def1}); + ins_res = s.insert({subject0, string_def1.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert on different subject, expect id{1}, version{1} - ins_res = s.insert({subject1, string_def0}); + ins_res = s.insert({subject1, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert different schema, expect id{2}, version{2} - ins_res = s.insert({subject0, int_def0}); + ins_res = s.insert({subject0, int_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{2}); @@ -90,7 +90,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_in_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -98,7 +98,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_in_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{1}, pps::schema_version{1}, @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_reverse_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{1}, pps::schema_version{1}, @@ -129,7 +129,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_reverse_order) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -152,7 +152,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { BOOST_REQUIRE(upsert( s, subject0, - string_def0, + string_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -161,7 +161,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { BOOST_REQUIRE(!upsert( s, subject0, - int_def0, + int_def0.share(), pps::schema_type::avro, pps::schema_id{0}, pps::schema_version{0}, @@ -177,7 +177,7 @@ BOOST_AUTO_TEST_CASE(test_store_upsert_override) { auto s_res = s.get_subject_schema( subject0, pps::schema_version{0}, pps::include_deleted::no); BOOST_REQUIRE(s_res.has_value()); - BOOST_REQUIRE(s_res.value().schema.def() == int_def0); + BOOST_REQUIRE(s_res.value().schema.def() == int_def0.share()); } BOOST_AUTO_TEST_CASE(test_store_get_schema) { @@ -189,7 +189,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema) { BOOST_REQUIRE(err.code() == pps::error_code::schema_id_not_found); // First insert, expect id{1} - auto ins_res = s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -198,7 +198,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema) { BOOST_REQUIRE(res.has_value()); auto def = std::move(res).assume_value(); - BOOST_REQUIRE_EQUAL(def, string_def0); + BOOST_REQUIRE_EQUAL(def, string_def0.share()); } BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { @@ -208,7 +208,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { // First insert, expect id{1} auto ins_res = s.insert( - {subject0, pps::canonical_schema_definition(schema1)}); + {subject0, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -222,7 +222,8 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { BOOST_REQUIRE(versions.empty()); // Second insert, expect id{2} - ins_res = s.insert({subject0, pps::canonical_schema_definition(schema2)}); + ins_res = s.insert( + {subject0, pps::canonical_schema_definition(schema2.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{2}); @@ -259,7 +260,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) { // First insert, expect id{1} auto ins_res = s.insert( - {subject0, pps::canonical_schema_definition(schema1)}); + {subject0, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -270,13 +271,15 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subjects) { BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); // Second insert, same schema, expect id{1} - ins_res = s.insert({subject1, pps::canonical_schema_definition(schema1)}); + ins_res = s.insert( + {subject1, pps::canonical_schema_definition(schema1.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); // Insert yet another schema associated with a different subject - ins_res = s.insert({subject2, pps::canonical_schema_definition(schema2)}); + ins_res = s.insert( + {subject2, pps::canonical_schema_definition(schema2.share())}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{2}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -326,7 +329,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE(err.code() == pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - auto ins_res = s.insert({subject0, string_def0}); + auto ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(ins_res.version, pps::schema_version{1}); @@ -339,10 +342,10 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE_EQUAL(val.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(val.version, pps::schema_version{1}); BOOST_REQUIRE_EQUAL(val.deleted, pps::is_deleted::no); - BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0); + BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0.share()); // Second insert, expect id{1}, version{1} - ins_res = s.insert({subject0, string_def0}); + ins_res = s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(!ins_res.inserted); BOOST_REQUIRE_EQUAL(ins_res.id, pps::schema_id{1}); @@ -354,7 +357,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_subject_schema) { BOOST_REQUIRE_EQUAL(val.id, pps::schema_id{1}); BOOST_REQUIRE_EQUAL(val.version, pps::schema_version{1}); BOOST_REQUIRE_EQUAL(val.deleted, pps::is_deleted::no); - BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0); + BOOST_REQUIRE_EQUAL(val.schema.def(), string_def0.share()); // Request bad version res = s.get_subject_schema( @@ -368,7 +371,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { pps::store s; // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); auto versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -376,7 +379,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { BOOST_REQUIRE_EQUAL(versions.value().front(), pps::schema_version{1}); // Insert duplicate, expect id{1}, versions{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -384,7 +387,7 @@ BOOST_AUTO_TEST_CASE(test_store_get_versions) { BOOST_REQUIRE_EQUAL(versions.value().front(), pps::schema_version{1}); // Insert different schema, expect id{2}, version{2} - s.insert({subject0, int_def0}); + s.insert({subject0, int_def0.share()}); versions = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(versions.has_value()); @@ -404,13 +407,13 @@ BOOST_AUTO_TEST_CASE(test_store_get_subjects) { BOOST_REQUIRE(subjects.empty()); // First insert - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); subjects = s.get_subjects(pps::include_deleted::no); BOOST_REQUIRE_EQUAL(subjects.size(), 1); BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); // second insert - s.insert({subject1, string_def0}); + s.insert({subject1, string_def0.share()}); subjects = s.get_subjects(pps::include_deleted::no); BOOST_REQUIRE(subjects.size() == 2); BOOST_REQUIRE_EQUAL(absl::c_count_if(subjects, is_equal(subject0)), 1); @@ -476,7 +479,7 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) { pps::compatibility_level::backward}; pps::store s; BOOST_REQUIRE(s.get_compatibility().value() == global_expected); - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); auto sub_expected = pps::compatibility_level::backward; BOOST_REQUIRE( @@ -514,7 +517,7 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat_fallback) { pps::compatibility_level expected{pps::compatibility_level::backward}; pps::store s; - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); BOOST_REQUIRE(s.get_compatibility(subject0, fallback).value() == expected); expected = pps::compatibility_level::forward; @@ -562,8 +565,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); auto v_res = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(v_res.has_value()); @@ -672,8 +675,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_version) { pps::error_code::subject_not_found); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); auto v_res = s.get_versions(subject0, pps::include_deleted::no); BOOST_REQUIRE(v_res.has_value()); @@ -743,9 +746,9 @@ BOOST_AUTO_TEST_CASE(test_store_subject_version_latest) { s.set_compatibility(pps::compatibility_level::none).value(); // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); + s.insert({subject0, string_def0.share()}); // First insert, expect id{2}, version{2} - s.insert({subject0, int_def0}); + s.insert({subject0, int_def0.share()}); // Test latest auto latest = s.get_subject_version_id( @@ -791,8 +794,8 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_after_delete_version) { pps::seq_marker dummy_marker; // First insert, expect id{1}, version{1} - s.insert({subject0, string_def0}); - s.insert({subject0, int_def0}); + s.insert({subject0, string_def0.share()}); + s.insert({subject0, int_def0.share()}); // delete version 1 s.upsert_subject( diff --git a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc index 8ace1c941f929..4ca02886770d9 100644 --- a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc +++ b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc @@ -192,7 +192,7 @@ struct test_references_data { pps::unparsed_schema schema; pps::error_info result; }; - std::vector _schemas; + std::array _schemas; }; const auto referenced = pps::unparsed_schema{ @@ -231,7 +231,7 @@ const auto referencer = pps::unparsed_schema{ const auto referencer_wrong_sub = pps::unparsed_schema{ referencer.sub(), pps::unparsed_schema_definition{ - referencer.def().raw(), + referencer.def().shared_raw(), referencer.def().type(), {pps::schema_reference{ .name = "example.com/referenced.json", @@ -240,13 +240,13 @@ const auto referencer_wrong_sub = pps::unparsed_schema{ const std::array test_reference_cases = { // Referece correct subject - test_references_data{{{referenced, {}}, {referencer, {}}}}, + test_references_data{{{{referenced.share(), {}}, {referencer.share(), {}}}}}, // Reference wrong subject test_references_data{ - {{referenced, {}}, - {referencer_wrong_sub, - {pps::error_code::schema_empty, - R"(Invalid schema {subject=referencer,version=0,id=-1,schemaType=JSON,references=[{name='example.com/referenced.json', subject='wrong_sub', version=1}],metadata=null,ruleSet=null,schema={ + {{{referenced.share(), {}}, + {referencer_wrong_sub.share(), + {pps::error_code::schema_empty, + R"(Invalid schema {subject=referencer,version=0,id=-1,schemaType=JSON,references=[{name='example.com/referenced.json', subject='wrong_sub', version=1}],metadata=null,ruleSet=null,schema={ "description": "A schema that references the base schema", "type": "object", "properties": { @@ -254,7 +254,7 @@ const std::array test_reference_cases = { "$ref": "example.com/referenced.json" } } -}} with refs [{name='example.com/referenced.json', subject='wrong_sub', version=1}] of type JSON, details: No schema reference found for subject "wrong_sub" and version 1)"}}}}}; +}} with refs [{name='example.com/referenced.json', subject='wrong_sub', version=1}] of type JSON, details: No schema reference found for subject "wrong_sub" and version 1)"}}}}}}; SEASTAR_THREAD_TEST_CASE(test_json_schema_references) { for (const auto& test : test_reference_cases) { @@ -265,7 +265,7 @@ SEASTAR_THREAD_TEST_CASE(test_json_schema_references) { pps::schema_version ver{0}; pps::canonical_schema canonical{}; auto make_canonical = [&]() { - canonical = f.store.make_canonical_schema(schema).get(); + canonical = f.store.make_canonical_schema(schema.share()).get(); }; if (result.code() == pps::error_code{}) { @@ -280,7 +280,11 @@ SEASTAR_THREAD_TEST_CASE(test_json_schema_references) { } f.store .upsert( - pps::seq_marker{}, canonical, ++id, ++ver, pps::is_deleted::no) + pps::seq_marker{}, + canonical.share(), + ++id, + ++ver, + pps::is_deleted::no) .get(); } } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 6248dfed8ec7a..7b0ad638d0b33 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -120,6 +120,15 @@ class typed_schema_definition { using raw_string = named_type; using references = std::vector; + typed_schema_definition() = default; + typed_schema_definition(typed_schema_definition&&) noexcept = default; + typed_schema_definition(const typed_schema_definition&) = delete; + typed_schema_definition& operator=(typed_schema_definition&&) noexcept + = default; + typed_schema_definition& operator=(const typed_schema_definition& other) + = delete; + ~typed_schema_definition() noexcept = default; + template typed_schema_definition(T&& def, schema_type type) : _def{ss::sstring{std::forward(def)}} @@ -143,10 +152,24 @@ class typed_schema_definition { const raw_string& raw() const& { return _def; } raw_string raw() && { return std::move(_def); } + raw_string shared_raw() const& { + // temporarily implemented with copy before the type is changed + return _def; + } const references& refs() const& { return _refs; } references refs() && { return std::move(_refs); } + typed_schema_definition share() const { + return {shared_raw(), type(), refs()}; + } + + typed_schema_definition copy() const { return {_def, type(), refs()}; } + + auto destructure() && { + return make_tuple(std::move(_def), _type, std::move(_refs)); + } + private: raw_string _def; schema_type _type{schema_type::avro}; @@ -417,6 +440,13 @@ class typed_schema { const schema_definition& def() const& { return _def; } schema_definition def() && { return std::move(_def); } + typed_schema share() const { return {sub(), def().share()}; } + typed_schema copy() const { return {sub(), def().copy()}; } + + auto destructure() && { + return make_tuple(std::move(_sub), std::move(_def)); + } + private: subject _sub{invalid_subject}; schema_definition _def{"", schema_type::avro}; @@ -431,6 +461,9 @@ struct subject_schema { schema_version version{invalid_schema_version}; schema_id id{invalid_schema_id}; is_deleted deleted{false}; + subject_schema share() const { + return {schema.share(), version, id, deleted}; + } }; enum class compatibility_level { diff --git a/src/v/pandaproxy/schema_registry/util.h b/src/v/pandaproxy/schema_registry/util.h index bac6fae1bdff7..1882fed6faf68 100644 --- a/src/v/pandaproxy/schema_registry/util.h +++ b/src/v/pandaproxy/schema_registry/util.h @@ -54,4 +54,9 @@ make_schema_definition(std::string_view sv) { ss::sstring{str_buf.GetString(), str_buf.GetSize()}}; } +template +ss::sstring to_string(typename typed_schema_definition::raw_string def) { + return def; +} + } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/validation.cc b/src/v/pandaproxy/schema_registry/validation.cc index f87c3dfe770f9..438faea03d4e5 100644 --- a/src/v/pandaproxy/schema_registry/validation.cc +++ b/src/v/pandaproxy/schema_registry/validation.cc @@ -103,17 +103,19 @@ std::vector get_proto_offsets(iobuf_parser& p) { ss::future> get_record_name( pandaproxy::schema_registry::sharded_store& store, subject_name_strategy sns, - const canonical_schema_definition& schema, + canonical_schema_definition schema, std::optional>& offsets) { + vlog(plog.warn, "get_record_name: sns: {}, schema: {}", sns, schema); if (sns == subject_name_strategy::topic_name) { // Result is succesfully nothing co_return ""; } - switch (schema.type()) { + auto schema_type = schema.type(); + switch (schema_type) { case schema_type::avro: { auto s = co_await make_avro_schema_definition( - store, {subject("r"), {schema.raw(), schema.type()}}); + store, {subject("r"), {std::move(schema).raw(), schema_type}}); co_return s().root()->name().fullname(); } break; case schema_type::protobuf: { @@ -121,7 +123,7 @@ ss::future> get_record_name( co_return std::nullopt; } auto s = co_await make_protobuf_schema_definition( - store, {subject("r"), {schema.raw(), schema.type()}}); + store, {subject("r"), {std::move(schema).raw(), schema_type}}); auto r = s.name(*offsets); if (!r) { co_return std::nullopt; @@ -268,7 +270,7 @@ class schema_id_validator::impl { } auto record_name = co_await get_record_name( - *_api->_store, sns, *schema, proto_offsets); + *_api->_store, sns, *std::move(schema), proto_offsets); if (!record_name) { vlog( plog.debug, diff --git a/src/v/wasm/schema_registry.cc b/src/v/wasm/schema_registry.cc index 27af71ea0a6c6..191ac299149f7 100644 --- a/src/v/wasm/schema_registry.cc +++ b/src/v/wasm/schema_registry.cc @@ -49,7 +49,7 @@ class schema_registry_impl : public schema_registry { create_schema(ppsr::unparsed_schema schema) override { auto [reader, writer] = co_await service(); co_await writer->read_sync(); - auto parsed = co_await reader->make_canonical_schema(schema); + auto parsed = co_await reader->make_canonical_schema(std::move(schema)); co_return co_await writer->write_subject_version( {.schema = std::move(parsed)}); } diff --git a/src/v/wasm/schema_registry_module.cc b/src/v/wasm/schema_registry_module.cc index 382def3fc38e9..bbd41e65a7f5c 100644 --- a/src/v/wasm/schema_registry_module.cc +++ b/src/v/wasm/schema_registry_module.cc @@ -198,9 +198,8 @@ ss::future schema_registry_module::create_subject_schema( ffi::reader r(buf); using namespace pandaproxy::schema_registry; try { - auto unparsed = read_encoded_schema_def(&r); *out_schema_id = co_await _sr->create_schema( - unparsed_schema(sub, unparsed)); + unparsed_schema(sub, read_encoded_schema_def(&r))); } catch (const std::exception& ex) { vlog(wasm_log.warn, "error registering subject schema: {}", ex); co_return SCHEMA_REGISTRY_ERROR; diff --git a/src/v/wasm/tests/wasm_fixture.cc b/src/v/wasm/tests/wasm_fixture.cc index 28dd8251ca9bf..ff98fa9f2d823 100644 --- a/src/v/wasm/tests/wasm_fixture.cc +++ b/src/v/wasm/tests/wasm_fixture.cc @@ -56,7 +56,7 @@ class fake_schema_registry : public wasm::schema_registry { get_schema_definition(ppsr::schema_id id) const override { for (const auto& s : _schemas) { if (s.id == id) { - co_return s.schema.def(); + co_return s.schema.def().share(); } } throw std::runtime_error("unknown schema id"); @@ -75,9 +75,9 @@ class fake_schema_registry : public wasm::schema_registry { if (found && found->version > s.version) { continue; } - found = s; + found.emplace(s.share()); } - co_return found.value(); + co_return std::move(found).value(); } ss::future @@ -95,13 +95,15 @@ class fake_schema_registry : public wasm::schema_registry { } } // TODO: validate references too + auto [sub, unparsed_def] = std::move(unparsed).destructure(); + auto [def, type, refs] = std::move(unparsed_def).destructure(); _schemas.push_back({ .schema = ppsr::canonical_schema( - unparsed.sub(), + std::move(sub), ppsr::canonical_schema_definition( - unparsed.def().raw(), - unparsed.def().type(), - unparsed.def().refs())), + ppsr::canonical_schema_definition::raw_string{std::move(def)()}, + type, + std::move(refs))), .version = version + 1, .id = ppsr::schema_id(int32_t(_schemas.size() + 1)), .deleted = ppsr::is_deleted::no, @@ -109,7 +111,7 @@ class fake_schema_registry : public wasm::schema_registry { co_return _schemas.back().id; } - std::vector get_all() { return _schemas; } + const std::vector& get_all() { return _schemas; } private: std::vector _schemas; @@ -230,7 +232,7 @@ model::record_batch WasmTestFixture::make_tiny_batch(iobuf record_value) { b.add_raw_kv(model::test::make_iobuf(), std::move(record_value)); return std::move(b).build(); } -std::vector +const std::vector& WasmTestFixture::registered_schemas() const { return _sr->get_all(); } diff --git a/src/v/wasm/tests/wasm_fixture.h b/src/v/wasm/tests/wasm_fixture.h index 49dd7089d9475..40add9072f1bb 100644 --- a/src/v/wasm/tests/wasm_fixture.h +++ b/src/v/wasm/tests/wasm_fixture.h @@ -60,7 +60,7 @@ class WasmTestFixture : public ::testing::Test { wasm::engine* engine() { return _engine.get(); } - std::vector + const std::vector& registered_schemas() const; std::vector log_lines() const { return _log_lines; } diff --git a/src/v/wasm/tests/wasm_transform_test.cc b/src/v/wasm/tests/wasm_transform_test.cc index 421cf32e4df78..a41f61729203f 100644 --- a/src/v/wasm/tests/wasm_transform_test.cc +++ b/src/v/wasm/tests/wasm_transform_test.cc @@ -87,7 +87,7 @@ std::string generate_example_avro_record( TEST_F(WasmTestFixture, SchemaRegistry) { // Test an example schema registry encoded avro value -> JSON transform load_wasm("schema-registry.wasm"); - auto schemas = registered_schemas(); + const auto& schemas = registered_schemas(); ASSERT_EQ(schemas.size(), 1); ASSERT_EQ(schemas[0].id, 1); iobuf record_value;