Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] sr/avro: fix missing default compat check for null (v2) #24429

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,29 @@ avro_compatibility_result check_compatible(
*reader.leafAt(int(r_idx)),
*writer.leafAt(int(w_idx)),
fields_p / std::to_string(r_idx) / "type"));
} else if (
reader.defaultValueAt(int(r_idx)).type() == avro::AVRO_NULL) {
// if the reader's record schema has a field with no default
// value, and writer's schema does not have a field with the
// same name, an error is signalled.

// For union, the default must correspond to the first type.
// The default may be null.
const auto& r_leaf = reader.leafAt(int(r_idx));
if (
r_leaf->type() != avro::Type::AVRO_UNION
|| r_leaf->leafAt(0)->type() != avro::Type::AVRO_NULL) {
} else {
// if the reader's record schema has a field with no
// default value, and writer's schema does not have a
// field with the same name, an error is signalled.
// For union, the default must correspond to the first
// type.
const auto& def = reader.defaultValueAt(int(r_idx));

// Note: this code is overly restrictive for null-type
// fields with null defaults. This is because the Avro API
// is not expressive enough to differentiate the two.
// Union type field's default set to null:
// def=GenericDatum(Union(Null))
// Union type field's default missing:
// def=GenericDatum(Null)
// Null type field's default set to null:
// def=GenericDatum(Null)
// Null type field's default missing:
// def=GenericDatum(Null)
auto default_unset = !def.isUnion()
&& def.type() == avro::AVRO_NULL;

if (default_unset) {
compat_result.emplace<avro_incompatibility>(
fields_p / std::to_string(r_idx),
avro_incompatibility::Type::
Expand Down
183 changes: 161 additions & 22 deletions src/v/pandaproxy/schema_registry/test/compatibility_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_alias_resolution_stopgap) {

namespace {

const auto schema_old = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_old = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -392,13 +390,9 @@ const auto schema_old = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

const auto schema_new = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_new = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -464,9 +458,7 @@ const auto schema_new = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

using incompatibility = pps::avro_incompatibility;

Expand Down Expand Up @@ -534,16 +526,150 @@ const absl::flat_hash_set<incompatibility> backward_expected{
"expected: someEnum1 (alias resolution is not yet fully supported)"},
};

const auto compat_data = std::to_array<compat_test_data<incompatibility>>({
struct compat_test_case {
std::string reader;
std::string writer;
absl::flat_hash_set<incompatibility> expected;
};

const auto compat_data = std::to_array<compat_test_case>({
{
.reader=schema_old,
.writer=schema_new,
.expected=forward_expected,
},
{
schema_old.share(),
schema_new.share(),
forward_expected,
.reader=schema_new,
.writer=schema_old,
.expected=backward_expected,
},
{
schema_new.share(),
schema_old.share(),
backward_expected,
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["null", "string"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["string", "null"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader = R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["null", "string"],
"default": null
}
]
})",
.writer = R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected = {},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null"
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "string",
"default": "somevalue"
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={},
},{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null",
"default": null
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
// Note: this is overly restrictive for null-type fields with null defaults.
// This is because the Avro API is not expressive enough to differentiate the two.
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
});

Expand All @@ -555,13 +681,26 @@ std::string format_set(const absl::flat_hash_set<ss::sstring>& d) {

SEASTAR_THREAD_TEST_CASE(test_avro_compat_messages) {
for (const auto& cd : compat_data) {
auto compat = check_compatible_verbose(cd.reader, cd.writer);
auto compat = check_compatible_verbose(
pps::sanitize_avro_schema_definition(
{cd.reader, pps::schema_type::avro})
.value(),
pps::sanitize_avro_schema_definition(
{cd.writer, pps::schema_type::avro})
.value());

pps::raw_compatibility_result raw;
absl::c_for_each(cd.expected, [&raw](auto e) {
raw.emplace<incompatibility>(std::move(e));
});
auto exp_compat = std::move(raw)(pps::verbose::yes);

absl::flat_hash_set<ss::sstring> errs{
compat.messages.begin(), compat.messages.end()};
absl::flat_hash_set<ss::sstring> expected{
cd.expected.messages.begin(), cd.expected.messages.end()};
exp_compat.messages.begin(), exp_compat.messages.end()};

BOOST_CHECK(!compat.is_compat);
BOOST_CHECK_EQUAL(compat.is_compat, exp_compat.is_compat);
BOOST_CHECK_EQUAL(errs.size(), expected.size());
BOOST_REQUIRE_MESSAGE(
errs == expected,
Expand Down
Loading