Skip to content

Commit

Permalink
Merge pull request #24429 from vbotbuildovich/backport-pr-24032-v24.1…
Browse files Browse the repository at this point in the history
….x-806

[v24.1.x] sr/avro: fix missing default compat check for null (v2)
  • Loading branch information
pgellert authored Dec 4, 2024
2 parents 393225f + 41c3a3a commit f52ed38
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 34 deletions.
35 changes: 23 additions & 12 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,29 @@ avro_compatibility_result check_compatible(
*reader.leafAt(int(r_idx)),
*writer.leafAt(int(w_idx)),
fields_p / std::to_string(r_idx) / "type"));
} else if (
reader.defaultValueAt(int(r_idx)).type() == avro::AVRO_NULL) {
// if the reader's record schema has a field with no default
// value, and writer's schema does not have a field with the
// same name, an error is signalled.

// For union, the default must correspond to the first type.
// The default may be null.
const auto& r_leaf = reader.leafAt(int(r_idx));
if (
r_leaf->type() != avro::Type::AVRO_UNION
|| r_leaf->leafAt(0)->type() != avro::Type::AVRO_NULL) {
} else {
// if the reader's record schema has a field with no
// default value, and writer's schema does not have a
// field with the same name, an error is signalled.
// For union, the default must correspond to the first
// type.
const auto& def = reader.defaultValueAt(int(r_idx));

// Note: this code is overly restrictive for null-type
// fields with null defaults. This is because the Avro API
// is not expressive enough to differentiate the two.
// Union type field's default set to null:
// def=GenericDatum(Union(Null))
// Union type field's default missing:
// def=GenericDatum(Null)
// Null type field's default set to null:
// def=GenericDatum(Null)
// Null type field's default missing:
// def=GenericDatum(Null)
auto default_unset = !def.isUnion()
&& def.type() == avro::AVRO_NULL;

if (default_unset) {
compat_result.emplace<avro_incompatibility>(
fields_p / std::to_string(r_idx),
avro_incompatibility::Type::
Expand Down
183 changes: 161 additions & 22 deletions src/v/pandaproxy/schema_registry/test/compatibility_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_alias_resolution_stopgap) {

namespace {

const auto schema_old = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_old = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -392,13 +390,9 @@ const auto schema_old = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

const auto schema_new = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_new = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -464,9 +458,7 @@ const auto schema_new = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

using incompatibility = pps::avro_incompatibility;

Expand Down Expand Up @@ -534,16 +526,150 @@ const absl::flat_hash_set<incompatibility> backward_expected{
"expected: someEnum1 (alias resolution is not yet fully supported)"},
};

const auto compat_data = std::to_array<compat_test_data<incompatibility>>({
struct compat_test_case {
std::string reader;
std::string writer;
absl::flat_hash_set<incompatibility> expected;
};

const auto compat_data = std::to_array<compat_test_case>({
{
.reader=schema_old,
.writer=schema_new,
.expected=forward_expected,
},
{
schema_old.share(),
schema_new.share(),
forward_expected,
.reader=schema_new,
.writer=schema_old,
.expected=backward_expected,
},
{
schema_new.share(),
schema_old.share(),
backward_expected,
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["null", "string"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["string", "null"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader = R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["null", "string"],
"default": null
}
]
})",
.writer = R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected = {},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null"
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "string",
"default": "somevalue"
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={},
},{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null",
"default": null
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
// Note: this is overly restrictive for null-type fields with null defaults.
// This is because the Avro API is not expressive enough to differentiate the two.
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
});

Expand All @@ -555,13 +681,26 @@ std::string format_set(const absl::flat_hash_set<ss::sstring>& d) {

SEASTAR_THREAD_TEST_CASE(test_avro_compat_messages) {
for (const auto& cd : compat_data) {
auto compat = check_compatible_verbose(cd.reader, cd.writer);
auto compat = check_compatible_verbose(
pps::sanitize_avro_schema_definition(
{cd.reader, pps::schema_type::avro})
.value(),
pps::sanitize_avro_schema_definition(
{cd.writer, pps::schema_type::avro})
.value());

pps::raw_compatibility_result raw;
absl::c_for_each(cd.expected, [&raw](auto e) {
raw.emplace<incompatibility>(std::move(e));
});
auto exp_compat = std::move(raw)(pps::verbose::yes);

absl::flat_hash_set<ss::sstring> errs{
compat.messages.begin(), compat.messages.end()};
absl::flat_hash_set<ss::sstring> expected{
cd.expected.messages.begin(), cd.expected.messages.end()};
exp_compat.messages.begin(), exp_compat.messages.end()};

BOOST_CHECK(!compat.is_compat);
BOOST_CHECK_EQUAL(compat.is_compat, exp_compat.is_compat);
BOOST_CHECK_EQUAL(errs.size(), expected.size());
BOOST_REQUIRE_MESSAGE(
errs == expected,
Expand Down

0 comments on commit f52ed38

Please sign in to comment.