From 837cc6f65611c78e5521c1b0724d2581c122f113 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 15 Jul 2024 13:46:51 -0700 Subject: [PATCH 1/3] iceberg: add logical data types Adds initial logical types[1] that will be used to represent an Iceberg schema. These include "primitives" (basic types like int, float, string, etc) as well as complex types (types that are composed of other types, like list, map, struct). Complex types are represented in Iceberg as composed of "nested fields" which are types that are themselves composed of other types (primitives or complex types). This PR introduces the recursive definition of these types, with a basic equality operator and ostream operator (note: the ostream operator is not the JSON serialization; that will come in a following commit). The implementation of these types is similar in style to the Iceberg Rust library, though we are using std::variants instead of Rust enums, since C++ enums aren't expressive enough to express complex nested enums with members. --- src/v/iceberg/CMakeLists.txt | 6 +- src/v/iceberg/datatypes.cc | 263 +++++++++++++++++++++++ src/v/iceberg/datatypes.h | 132 ++++++++++++ src/v/iceberg/tests/CMakeLists.txt | 1 + src/v/iceberg/tests/datatypes_test.cc | 295 ++++++++++++++++++++++++++ 5 files changed, 696 insertions(+), 1 deletion(-) create mode 100644 src/v/iceberg/datatypes.cc create mode 100644 src/v/iceberg/datatypes.h create mode 100644 src/v/iceberg/tests/datatypes_test.cc diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 6eb193ab3fc6..68453ba77483 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -26,7 +26,11 @@ endforeach() v_cc_library( NAME iceberg - SRCS ${avro_hdrs} + SRCS + ${avro_hdrs} + datatypes.cc + DEPS + v::container ) add_subdirectory(tests) diff --git a/src/v/iceberg/datatypes.cc b/src/v/iceberg/datatypes.cc new file mode 100644 index 000000000000..ec3f20449b8b --- /dev/null +++ b/src/v/iceberg/datatypes.cc @@ -0,0 +1,263 @@ +// 
Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" + +#include + +namespace iceberg { + +struct primitive_type_comparison_visitor { + template + bool operator()(const T&, const U&) const { + return false; + } + bool operator()(const decimal_type& lhs, const decimal_type& rhs) const { + return lhs.precision == rhs.precision && lhs.scale == rhs.scale; + } + bool operator()(const fixed_type& lhs, const fixed_type& rhs) const { + return lhs.length == rhs.length; + } + template + bool operator()(const T&, const T&) const { + return true; + } +}; + +bool operator==(const primitive_type& lhs, const primitive_type& rhs) { + return std::visit(primitive_type_comparison_visitor{}, lhs, rhs); +} + +struct field_type_comparison_visitor { + template + bool operator()(const T&, const U&) const { + return false; + } + bool + operator()(const primitive_type& lhs, const primitive_type& rhs) const { + return lhs == rhs; + } + bool operator()(const struct_type& lhs, const struct_type& rhs) const { + return lhs == rhs; + } + bool operator()(const list_type& lhs, const list_type& rhs) const { + return lhs == rhs; + } + bool operator()(const map_type& lhs, const map_type& rhs) const { + return lhs == rhs; + } +}; + +bool operator==(const field_type& lhs, const field_type& rhs) { + return std::visit(field_type_comparison_visitor{}, lhs, rhs); +} +bool operator==(const nested_field& lhs, const nested_field& rhs) { + return lhs.id == rhs.id && lhs.required == rhs.required + && lhs.name == rhs.name && lhs.type == rhs.type; +} + +std::ostream& operator<<(std::ostream& o, const boolean_type&) { + o << "boolean"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const 
int_type&) { + o << "int"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const long_type&) { + o << "long"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const float_type&) { + o << "float"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const double_type&) { + o << "double"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const decimal_type& t) { + o << fmt::format("decimal({}, {})", t.precision, t.scale); + return o; +} + +std::ostream& operator<<(std::ostream& o, const date_type&) { + o << "date"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const time_type&) { + o << "time"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const timestamp_type&) { + o << "timestamp"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const timestamptz_type&) { + o << "timestamptz"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const string_type&) { + o << "string"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const uuid_type&) { + o << "uuid"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const fixed_type& t) { + // NOTE: square brackets to match how fixed type is serialized as JSON, + // though this matching isn't necessarily important for operator<<. 
+ o << fmt::format("fixed[{}]", t.length); + return o; +} + +std::ostream& operator<<(std::ostream& o, const binary_type&) { + o << "binary"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const struct_type&) { + o << "struct"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const list_type&) { + o << "list"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const map_type&) { + o << "map"; + return o; +} + +bool operator==(const struct_type& lhs, const struct_type& rhs) { + if (lhs.fields.size() != rhs.fields.size()) { + return false; + } + for (size_t i = 0; i < lhs.fields.size(); i++) { + bool has_lhs = lhs.fields[i] != nullptr; + bool has_rhs = rhs.fields[i] != nullptr; + if (has_lhs != has_rhs) { + return false; + } + if (has_lhs == false) { + continue; + } + if (*lhs.fields[i] != *rhs.fields[i]) { + return false; + } + } + return true; +} +bool operator==(const list_type& lhs, const list_type& rhs) { + bool has_lhs = lhs.element_field != nullptr; + bool has_rhs = rhs.element_field != nullptr; + if (has_lhs != has_rhs) { + return false; + } + if (!has_lhs) { + // Both nullptr. 
+ return true; + } + return *lhs.element_field == *rhs.element_field; +} +bool operator==(const map_type& lhs, const map_type& rhs) { + bool has_key_lhs = lhs.key_field != nullptr; + bool has_key_rhs = rhs.key_field != nullptr; + if (has_key_lhs != has_key_rhs) { + return false; + } + bool has_val_lhs = lhs.value_field != nullptr; + bool has_val_rhs = rhs.value_field != nullptr; + if (has_val_lhs != has_val_rhs) { + return false; + } + if (has_key_lhs && *lhs.key_field != *rhs.key_field) { + return false; + } + if (has_val_lhs && *lhs.value_field != *rhs.value_field) { + return false; + } + return true; +} + +namespace { +struct ostream_visitor { + explicit ostream_visitor(std::ostream& o) + : os(o) {} + std::ostream& os; + + template + void operator()(const T& v) const { + os << v; + } +}; +} // namespace + +std::ostream& operator<<(std::ostream& o, const primitive_type& t) { + std::visit(ostream_visitor{o}, t); + return o; +} + +std::ostream& operator<<(std::ostream& o, const field_type& t) { + std::visit(ostream_visitor{o}, t); + return o; +} + +list_type list_type::create( + int32_t element_id, field_required element_required, field_type element) { + // NOTE: the element field doesn't have a name. Functionally, the list type + // is represented as: + // - element-id + // - element-type + // - element-required + // Despite the missing name though, many Iceberg implementations represent + // the list with a nested_field. + return list_type{ + .element_field = nested_field::create( + element_id, "element", element_required, std::move(element))}; +} + +map_type map_type::create( + int32_t key_id, + field_type key_type, + int32_t val_id, + field_required val_req, + field_type val_type) { + // NOTE: the keys and values don't have names, and the key is always + // required. 
Functionally, a map type is represented as: + // - key-id + // - key-type + // - value-id + // - value-required + // - value-type + // Despite the missing names though, many Iceberg implementations represent + // the map with two nested_fields. + return map_type{ + .key_field = nested_field::create( + key_id, "key", field_required::yes, std::move(key_type)), + .value_field = nested_field::create( + val_id, "value", val_req, std::move(val_type)), + }; +} + +} // namespace iceberg diff --git a/src/v/iceberg/datatypes.h b/src/v/iceberg/datatypes.h new file mode 100644 index 000000000000..328e94e18736 --- /dev/null +++ b/src/v/iceberg/datatypes.h @@ -0,0 +1,132 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "base/seastarx.h" +#include "container/fragmented_vector.h" +#include "utils/named_type.h" + +#include +#include + +#include + +namespace iceberg { + +struct boolean_type {}; +struct int_type {}; +struct long_type {}; +struct float_type {}; +struct double_type {}; +struct decimal_type { + uint32_t precision; + uint32_t scale; +}; +struct date_type {}; +struct time_type {}; +struct timestamp_type {}; +struct timestamptz_type {}; +struct string_type {}; +struct uuid_type {}; +struct fixed_type { + uint64_t length; +}; +struct binary_type {}; +using primitive_type = std::variant< + boolean_type, + int_type, + long_type, + float_type, + double_type, + decimal_type, + date_type, + time_type, + timestamp_type, + timestamptz_type, + string_type, + uuid_type, + fixed_type, + binary_type>; +bool operator==(const primitive_type& lhs, const primitive_type& rhs); + +struct struct_type; +struct list_type; +struct map_type; +using field_type + = std::variant; +bool 
operator==(const field_type& lhs, const field_type& rhs); + +std::ostream& operator<<(std::ostream&, const boolean_type&); +std::ostream& operator<<(std::ostream&, const int_type&); +std::ostream& operator<<(std::ostream&, const long_type&); +std::ostream& operator<<(std::ostream&, const float_type&); +std::ostream& operator<<(std::ostream&, const double_type&); +std::ostream& operator<<(std::ostream&, const decimal_type&); +std::ostream& operator<<(std::ostream&, const date_type&); +std::ostream& operator<<(std::ostream&, const time_type&); +std::ostream& operator<<(std::ostream&, const timestamp_type&); +std::ostream& operator<<(std::ostream&, const timestamptz_type&); +std::ostream& operator<<(std::ostream&, const string_type&); +std::ostream& operator<<(std::ostream&, const uuid_type&); +std::ostream& operator<<(std::ostream&, const fixed_type&); +std::ostream& operator<<(std::ostream&, const binary_type&); +std::ostream& operator<<(std::ostream&, const struct_type&); +std::ostream& operator<<(std::ostream&, const list_type&); +std::ostream& operator<<(std::ostream&, const map_type&); +std::ostream& operator<<(std::ostream&, const primitive_type&); +std::ostream& operator<<(std::ostream&, const field_type&); + +struct nested_field; +using nested_field_ptr = std::unique_ptr; +using field_required = ss::bool_class; +struct struct_type { + chunked_vector fields; + friend bool operator==(const struct_type& lhs, const struct_type& rhs); +}; + +struct list_type { + nested_field_ptr element_field; + friend bool operator==(const list_type& lhs, const list_type& rhs); + + static list_type create( + int32_t element_id, field_required element_required, field_type element); +}; + +struct map_type { + nested_field_ptr key_field; + nested_field_ptr value_field; + friend bool operator==(const map_type& lhs, const map_type& rhs); + + static map_type create( + int32_t key_id, + field_type key_type, + int32_t val_id, + field_required val_req, + field_type val_type); +}; + 
+struct nested_field { + using id_t = named_type; + id_t id; + ss::sstring name; + field_required required; + field_type type; + std::optional doc; + // TODO: support initial-default and write-default optional literals. + + static nested_field_ptr + create(int32_t id, ss::sstring name, field_required req, field_type t) { + return std::make_unique( + id_t{id}, std::move(name), req, std::move(t), std::nullopt); + } + + friend bool operator==(const nested_field& lhs, const nested_field& rhs); +}; + +} // namespace iceberg diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index 309e47ca80ef..a1334d15becf 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -6,6 +6,7 @@ rp_test( USE_CWD BINARY_NAME iceberg SOURCES + datatypes_test.cc manifest_serialization_test.cc LIBRARIES Avro::avro diff --git a/src/v/iceberg/tests/datatypes_test.cc b/src/v/iceberg/tests/datatypes_test.cc new file mode 100644 index 000000000000..f48322642493 --- /dev/null +++ b/src/v/iceberg/tests/datatypes_test.cc @@ -0,0 +1,295 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" + +#include + +using namespace iceberg; + +chunked_vector all_types() { + chunked_vector all_types; + + // Primitive types. 
+ all_types.emplace_back(boolean_type{}); + all_types.emplace_back(int_type{}); + all_types.emplace_back(long_type{}); + all_types.emplace_back(float_type{}); + all_types.emplace_back(double_type{}); + all_types.emplace_back(decimal_type{.precision = 123, .scale = 456}); + all_types.emplace_back(date_type{}); + all_types.emplace_back(time_type{}); + all_types.emplace_back(timestamp_type{}); + all_types.emplace_back(timestamptz_type{}); + all_types.emplace_back(string_type{}); + all_types.emplace_back(uuid_type{}); + all_types.emplace_back(fixed_type{ + .length = 789, + }); + all_types.emplace_back(binary_type{}); + + // Complex types. + all_types.emplace_back( + list_type::create(1, field_required::yes, string_type{})); + chunked_vector struct_fields; + struct_fields.emplace_back( + nested_field::create(1, "foo", field_required::yes, string_type{})); + all_types.emplace_back(struct_type{ + .fields = std::move(struct_fields), + }); + all_types.emplace_back(map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})); + return all_types; +}; + +// Use the equality operator to check that the given type only exists once in +// the given list. 
+void check_single_type_exists( + const field_type& expected_type, + const chunked_vector& all_types) { + size_t num_eq = 0; + size_t num_ne = 0; + for (const auto& t : all_types) { + if (t == expected_type) { + ++num_eq; + } + if (t != expected_type) { + ++num_ne; + } + } + ASSERT_EQ(num_eq, 1); + ASSERT_EQ(num_ne, all_types.size() - 1); +} + +TEST(DatatypeTest, TestTypesEquality) { + auto types = all_types(); + for (const auto& t : types) { + ASSERT_NO_FATAL_FAILURE(check_single_type_exists(t, types)); + } +} +TEST(DatatypeTest, TestBoolean) { + ASSERT_EQ(field_type{boolean_type{}}, field_type{boolean_type{}}); + ASSERT_EQ("boolean", fmt::format("{}", boolean_type{})); + ASSERT_EQ("boolean", fmt::format("{}", field_type{boolean_type{}})); +} +TEST(DatatypeTest, TestInt) { + ASSERT_EQ(field_type{int_type{}}, field_type{int_type{}}); + ASSERT_EQ("int", fmt::format("{}", int_type{})); + ASSERT_EQ("int", fmt::format("{}", field_type{int_type{}})); +} +TEST(DatatypeTest, TestLong) { + ASSERT_EQ(field_type{long_type{}}, field_type{long_type{}}); + ASSERT_EQ("long", fmt::format("{}", long_type{})); + ASSERT_EQ("long", fmt::format("{}", field_type{long_type{}})); +} +TEST(DatatypeTest, TestFloat) { + ASSERT_EQ(field_type{float_type{}}, field_type{float_type{}}); + ASSERT_EQ("float", fmt::format("{}", float_type{})); + ASSERT_EQ("float", fmt::format("{}", field_type{float_type{}})); +} +TEST(DatatypeTest, TestDouble) { + ASSERT_EQ(field_type{double_type{}}, field_type{double_type{}}); + ASSERT_EQ("double", fmt::format("{}", double_type{})); + ASSERT_EQ("double", fmt::format("{}", field_type{double_type{}})); +} +TEST(DatatypeTest, TestDecimal) { + decimal_type t1{ + .precision = 123, + .scale = 456, + }; + auto t1_copy = t1; + auto t2 = t1; + t2.scale = 0; + auto t3 = t1; + t3.precision = 0; + ASSERT_EQ(field_type{t1}, field_type{t1}); + ASSERT_EQ(field_type{t1}, field_type{t1_copy}); + ASSERT_NE(field_type{t1}, field_type{t2}); + ASSERT_NE(field_type{t1}, 
field_type{t3}); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", t1)); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", t1_copy)); + ASSERT_EQ("decimal(123, 0)", fmt::format("{}", t2)); + ASSERT_EQ("decimal(0, 456)", fmt::format("{}", t3)); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", field_type{t1})); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", field_type{t1_copy})); + ASSERT_EQ("decimal(123, 0)", fmt::format("{}", field_type{t2})); + ASSERT_EQ("decimal(0, 456)", fmt::format("{}", field_type{t3})); +} +TEST(DatatypeTest, TestDate) { + ASSERT_EQ(field_type{date_type{}}, field_type{date_type{}}); + ASSERT_EQ("date", fmt::format("{}", date_type{})); + ASSERT_EQ("date", fmt::format("{}", field_type{date_type{}})); +} +TEST(DatatypeTest, TestTime) { + ASSERT_EQ(field_type{time_type{}}, field_type{time_type{}}); + ASSERT_EQ("time", fmt::format("{}", time_type{})); + ASSERT_EQ("time", fmt::format("{}", field_type{time_type{}})); +} +TEST(DatatypeTest, TestTimestamp) { + ASSERT_EQ(field_type{timestamp_type{}}, field_type{timestamp_type{}}); + ASSERT_EQ("timestamp", fmt::format("{}", timestamp_type{})); + ASSERT_EQ("timestamp", fmt::format("{}", field_type{timestamp_type{}})); +} +TEST(DatatypeTest, TestTimestamptz) { + ASSERT_EQ(field_type{timestamptz_type{}}, field_type{timestamptz_type{}}); + ASSERT_EQ("timestamptz", fmt::format("{}", timestamptz_type{})); + ASSERT_EQ("timestamptz", fmt::format("{}", field_type{timestamptz_type{}})); +} +TEST(DatatypeTest, TestString) { + ASSERT_EQ(field_type{string_type{}}, field_type{string_type{}}); + ASSERT_EQ("string", fmt::format("{}", string_type{})); + ASSERT_EQ("string", fmt::format("{}", field_type{string_type{}})); +} +TEST(DatatypeTest, TestUuid) { + ASSERT_EQ(field_type{uuid_type{}}, field_type{uuid_type{}}); + ASSERT_EQ("uuid", fmt::format("{}", uuid_type{})); + ASSERT_EQ("uuid", fmt::format("{}", field_type{uuid_type{}})); +} +TEST(DatatypeTest, TestFixed) { + fixed_type t1{ + .length = 12345, + }; + 
auto t1_copy = t1; + fixed_type t2{ + .length = 54321, + }; + ASSERT_EQ(field_type{t1}, field_type{t1}); + ASSERT_EQ(field_type{t1}, field_type{t1_copy}); + ASSERT_NE(field_type{t1}, field_type{t2}); + ASSERT_EQ("fixed[12345]", fmt::format("{}", t1)); + ASSERT_EQ("fixed[12345]", fmt::format("{}", t1_copy)); + ASSERT_EQ("fixed[54321]", fmt::format("{}", t2)); + ASSERT_EQ("fixed[12345]", fmt::format("{}", field_type{t1})); + ASSERT_EQ("fixed[12345]", fmt::format("{}", field_type{t1_copy})); + ASSERT_EQ("fixed[54321]", fmt::format("{}", field_type{t2})); +} +TEST(DatatypeTest, TestBinary) { + ASSERT_EQ(field_type{binary_type{}}, field_type{binary_type{}}); + ASSERT_EQ("binary", fmt::format("{}", binary_type{})); + ASSERT_EQ("binary", fmt::format("{}", field_type{binary_type{}})); +} +TEST(DatatypeTest, TestList) { + auto t1 = field_type{ + list_type::create(1, field_required::yes, boolean_type{})}; + auto t1_dup = field_type{ + list_type::create(1, field_required::yes, boolean_type{})}; + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + + auto t2 = field_type{ + list_type::create(2, field_required::yes, boolean_type{})}; + auto t3 = field_type{ + list_type::create(1, field_required::no, boolean_type{})}; + auto t4 = field_type{ + list_type::create(1, field_required::yes, string_type{})}; + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + + // Moving the type will empty it. 
+ auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).element_field == nullptr); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("list", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("list", fmt::format("{}", t2)); + ASSERT_EQ("list", fmt::format("{}", t3)); + ASSERT_EQ("list", fmt::format("{}", t4)); + ASSERT_EQ("list", fmt::format("{}", t1_dup)); + ASSERT_EQ("list", fmt::format("{}", t1_move)); +} +TEST(DatatypeTest, TestMap) { + auto t1 = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})}; + auto t1_dup = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})}; + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + + auto t2 = field_type{map_type::create( + 1, string_type{}, 1, field_required::yes, string_type{})}; + auto t3 = field_type{map_type::create( + 0, boolean_type{}, 1, field_required::yes, string_type{})}; + auto t4 = field_type{ + map_type::create(0, string_type{}, 2, field_required::no, string_type{})}; + auto t5 = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, boolean_type{})}; + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + ASSERT_NE(t1, t5); + + // Moving the type will empty it. 
+ auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).key_field == nullptr); + ASSERT_TRUE(std::get(t1).value_field == nullptr); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("map", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("map", fmt::format("{}", t2)); + ASSERT_EQ("map", fmt::format("{}", t3)); + ASSERT_EQ("map", fmt::format("{}", t4)); + ASSERT_EQ("map", fmt::format("{}", t1_dup)); + ASSERT_EQ("map", fmt::format("{}", t1_move)); + + // Regression test that would cause previous impl to crash if both types + // are the same but keys are null. + std::get(t1_move).key_field.reset(); + std::get(t1_dup).key_field.reset(); + ASSERT_EQ(t1_move, t1_dup); +} +TEST(DatatypeTest, TestStruct) { + // Constructs a struct_type with a single field. + auto struct_single = + [](int32_t i, const ss::sstring& name, field_required req, field_type t) { + chunked_vector struct_fields; + struct_fields.emplace_back( + nested_field::create(i, name, req, std::move(t))); + return field_type{struct_type{ + .fields = std::move(struct_fields), + }}; + }; + auto t1 = struct_single(0, "foo", field_required::yes, string_type{}); + auto t1_dup = struct_single(0, "foo", field_required::yes, string_type{}); + auto t2 = struct_single(1, "foo", field_required::yes, string_type{}); + auto t3 = struct_single(0, "food", field_required::yes, string_type{}); + auto t4 = struct_single(0, "foo", field_required::no, string_type{}); + auto t5 = struct_single(0, "foo", field_required::yes, boolean_type{}); + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + ASSERT_NE(t1, t5); + + // Moving the type will empty it. 
+ auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).fields.empty()); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("struct", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("struct", fmt::format("{}", t2)); + ASSERT_EQ("struct", fmt::format("{}", t3)); + ASSERT_EQ("struct", fmt::format("{}", t4)); + ASSERT_EQ("struct", fmt::format("{}", t5)); + ASSERT_EQ("struct", fmt::format("{}", t1_dup)); + ASSERT_EQ("struct", fmt::format("{}", t1_move)); +} From 5f16ed77cd6dcc1cf0201d041aa6eec90f5a8998 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 15 Jul 2024 18:53:50 -0700 Subject: [PATCH 2/3] iceberg: introduce json utils Adds some helpers for operating on the json::Value class, that will be useful in building JSON parsing for Iceberg metadata types. Long term it may make sense to move these to some general purpose module, but since this is only used in Iceberg for now, and to avoid distractions, the utilities are just added to the Iceberg module. --- src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/json_utils.cc | 72 ++++++++++++++++++++++++++++++++++++ src/v/iceberg/json_utils.h | 27 ++++++++++++++ 3 files changed, 100 insertions(+) create mode 100644 src/v/iceberg/json_utils.cc create mode 100644 src/v/iceberg/json_utils.h diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 68453ba77483..595624965eda 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -29,6 +29,7 @@ v_cc_library( SRCS ${avro_hdrs} datatypes.cc + json_utils.cc DEPS v::container ) diff --git a/src/v/iceberg/json_utils.cc b/src/v/iceberg/json_utils.cc new file mode 100644 index 000000000000..565482a4d60c --- /dev/null +++ b/src/v/iceberg/json_utils.cc @@ -0,0 +1,72 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "json/document.h" + +#include + +#include + +namespace iceberg { + +std::optional> +parse_optional(const json::Value& v, const char* member_name) { + if (!v.IsObject()) { + throw std::invalid_argument( + fmt::format("Expected JSON object to parse field '{}'", member_name)); + } + auto iter = v.FindMember(member_name); + if (iter == v.MemberEnd()) { + return std::nullopt; + } + return iter->value; +} + +const json::Value& +parse_required(const json::Value& v, const char* member_name) { + if (!v.IsObject()) { + throw std::invalid_argument( + fmt::format("Expected JSON object to parse field '{}'", member_name)); + } + auto iter = v.FindMember(member_name); + if (iter == v.MemberEnd()) { + throw std::invalid_argument( + fmt::format("No member named '{}'", member_name)); + } + return iter->value; +} + +ss::sstring parse_required_str(const json::Value& v, const char* member_name) { + const auto& str_json = parse_required(v, member_name); + if (!str_json.IsString()) { + throw std::invalid_argument( + fmt::format("Expected string for field '{}'", member_name)); + } + return str_json.GetString(); +} + +int32_t parse_required_i32(const json::Value& v, const char* member_name) { + const auto& int_json = parse_required(v, member_name); + if (!int_json.IsInt()) { + throw std::invalid_argument( + fmt::format("Expected integer for field '{}'", member_name)); + } + return int_json.GetInt(); +} + +bool parse_required_bool(const json::Value& v, const char* member_name) { + const auto& bool_json = parse_required(v, member_name); + if (!bool_json.IsBool()) { + throw std::invalid_argument( + fmt::format("Expected bool for field '{}'", member_name)); + } + return bool_json.GetBool(); 
+} + +} // namespace iceberg diff --git a/src/v/iceberg/json_utils.h b/src/v/iceberg/json_utils.h new file mode 100644 index 000000000000..d84d8c8f550e --- /dev/null +++ b/src/v/iceberg/json_utils.h @@ -0,0 +1,27 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "json/document.h" + +namespace iceberg { + +std::optional> +parse_optional(const json::Value& v, const char* member_name); + +const json::Value& +parse_required(const json::Value& v, const char* member_name); + +ss::sstring parse_required_str(const json::Value& v, const char* member_name); + +int32_t parse_required_i32(const json::Value& v, const char* member_name); + +bool parse_required_bool(const json::Value& v, const char* member_name); + +} // namespace iceberg From f4f231a8430a985a0ff3dacf6ce20809ff2bfcf6 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 15 Jul 2024 02:04:08 -0700 Subject: [PATCH 3/3] iceberg: json serde for data types Adds JSON serialization for newly added Iceberg data types. A test is added to demonstrate the roundtrip with a type comprised of multiple nested fields. 
--- src/v/iceberg/CMakeLists.txt | 3 + src/v/iceberg/datatypes_json.cc | 251 +++++++++++++++++++++ src/v/iceberg/datatypes_json.h | 42 ++++ src/v/iceberg/tests/CMakeLists.txt | 1 + src/v/iceberg/tests/datatypes_json_test.cc | 201 +++++++++++++++++ 5 files changed, 498 insertions(+) create mode 100644 src/v/iceberg/datatypes_json.cc create mode 100644 src/v/iceberg/datatypes_json.h create mode 100644 src/v/iceberg/tests/datatypes_json_test.cc diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 595624965eda..003cbfe00817 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -29,9 +29,12 @@ v_cc_library( SRCS ${avro_hdrs} datatypes.cc + datatypes_json.cc json_utils.cc DEPS v::container + v::json + v::strings ) add_subdirectory(tests) diff --git a/src/v/iceberg/datatypes_json.cc b/src/v/iceberg/datatypes_json.cc new file mode 100644 index 000000000000..f9dceea7c109 --- /dev/null +++ b/src/v/iceberg/datatypes_json.cc @@ -0,0 +1,251 @@ +// Copyright 2024 Redpanda Data, Inc. 
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "iceberg/datatypes_json.h"
+
+#include "iceberg/datatypes.h"
+#include "iceberg/json_utils.h"
+#include "json/document.h"
+#include "strings/string_switch.h"
+
+#include <stdexcept>
+#include <string>
+
+namespace iceberg {
+
+namespace {
+// Returns the text strictly between the first `start_ch` in `s` and the first
+// `end_ch` located at or after that position.
+// Throws std::invalid_argument if either delimiter cannot be found.
+std::string_view
+extract_between(char start_ch, char end_ch, const std::string_view& s) {
+    auto start_pos = s.find(start_ch);
+    auto end_pos = s.find(end_ch, start_pos);
+
+    if (start_pos != std::string::npos && end_pos != std::string::npos) {
+        return s.substr(start_pos + 1, end_pos - start_pos - 1);
+    }
+    throw std::invalid_argument(
+      fmt::format("Missing wrappers '{}' or '{}' in {}", start_ch, end_ch, s));
+}
+
+// Parses a type string of the form "decimal(<precision>,<scale>)".
+// Throws std::invalid_argument if the parentheses or the comma are missing;
+// std::stoul additionally throws when either number cannot be converted.
+decimal_type parse_decimal(std::string_view type_str) {
+    auto ps_str = extract_between('(', ')', type_str);
+    size_t pos = ps_str.find(',');
+    if (pos != std::string::npos) {
+        // Extract substrings before and after the comma
+        auto p_str = ps_str.substr(0, pos);
+        auto s_str = ps_str.substr(pos + 1);
+        return decimal_type{
+          .precision = static_cast<uint32_t>(std::stoul(ss::sstring(p_str))),
+          .scale = static_cast<uint32_t>(std::stoul(ss::sstring(s_str)))};
+    }
+    throw std::invalid_argument(fmt::format(
+      "Decimal requires format decimal(uint32, uint32): {}", type_str));
+}
+
+// Parses a type string of the form "fixed[<length>]".
+// Throws std::invalid_argument if the brackets are missing; std::stoull
+// additionally throws when the length cannot be converted.
+fixed_type parse_fixed(std::string_view type_str) {
+    auto l_str = extract_between('[', ']', type_str);
+    auto l = std::stoull(ss::sstring(l_str));
+    return fixed_type{l};
+}
+
+} // namespace
+
+// Builds a struct_type from the "fields" member of `v`, parsing each array
+// element with parse_field. Other members of `v` are not read here.
+// Throws std::invalid_argument if "fields" is not a JSON array.
+struct_type parse_struct(const json::Value& v) {
+    struct_type ret;
+    const auto& fields_json = parse_required(v, "fields");
+    // GetArray() is only valid on array values, so reject anything else with
+    // the same exception type used by the other parse errors in this file.
+    if (!fields_json.IsArray()) {
+        throw std::invalid_argument("Expected array for 'fields' field");
+    }
+    const auto& fields_array = fields_json.GetArray();
+    ret.fields.reserve(fields_array.Size());
+    for (const auto& field_json : fields_array) {
+        ret.fields.emplace_back(parse_field(field_json));
+    }
+    return ret;
+}
+
+// Builds a list_type from the "element-id", "element-required" and "element"
+// members of `v`; the element type is parsed recursively with parse_type.
+list_type parse_list(const json::Value& v) {
+    const auto& element_json = parse_required(v, "element");
+    return list_type::create(
+      parse_required_i32(v, "element-id"),
+      parse_required_bool(v, "element-required") ? field_required::yes
+                                                 : field_required::no,
+      parse_type(element_json));
+}
+
+// Builds a map_type from the "key-id", "key", "value-id", "value-required"
+// and "value" members of `v`; key and value types are parsed recursively.
+map_type parse_map(const json::Value& v) {
+    const auto& key_json = parse_required(v, "key");
+    const auto& val_json = parse_required(v, "value");
+    auto value_required = parse_required_bool(v, "value-required");
+    return map_type::create(
+      parse_required_i32(v, "key-id"),
+      parse_type(key_json),
+      parse_required_i32(v, "value-id"),
+      value_required ? field_required::yes : field_required::no,
+      parse_type(val_json));
+}
+
+// Builds a nested field from the "id", "name", "required" and "type" members
+// of `v`; the type is parsed recursively with parse_type.
+nested_field_ptr parse_field(const json::Value& v) {
+    auto id = parse_required_i32(v, "id");
+    auto name = parse_required_str(v, "name");
+    auto required = parse_required_bool(v, "required");
+    const auto& type_json = parse_required(v, "type");
+    auto type = parse_type(type_json);
+    return nested_field::create(
+      id,
+      std::move(name),
+      required ? field_required::yes : field_required::no,
+      std::move(type));
+}
+
+// Parses a type from `v`, which must be either a JSON string naming a
+// primitive type or a JSON object whose "type" member is "struct", "list" or
+// "map". Throws std::invalid_argument for any other JSON kind or for an
+// object with a different "type" value.
+field_type parse_type(const json::Value& v) {
+    if (v.IsString()) {
+        auto v_str = std::string_view{v.GetString(), v.GetStringLength()};
+        // decimal and fixed carry parameters in the string, so they are
+        // matched by prefix rather than by exact name.
+        if (v_str.starts_with("decimal")) {
+            return parse_decimal(v_str);
+        }
+        if (v_str.starts_with("fixed")) {
+            return parse_fixed(v_str);
+        }
+        return string_switch<field_type>(v_str)
+          .match("boolean", boolean_type{})
+          .match("int", int_type{})
+          .match("long", long_type{})
+          .match("float", float_type{})
+          .match("double", double_type{})
+          .match("date", date_type{})
+          .match("time", time_type{})
+          .match("timestamp", timestamp_type{})
+          .match("timestamptz", timestamptz_type{})
+          .match("string", string_type{})
+          .match("uuid", uuid_type{})
+          .match("binary", binary_type{});
+    }
+    if (!v.IsObject()) {
+        throw std::invalid_argument("Expected string or object for type field");
+    }
+    auto type = parse_required_str(v, "type");
+    if (type == "struct") {
+        return parse_struct(v);
+    } else if (type == "list") {
+        return parse_list(v);
+    } else if (type == "map") {
+        return parse_map(v);
+    }
+    throw std::invalid_argument(fmt::format(
+      "Expected type field of 'struct', 'list', or 'map': {}", type));
+}
+
+} // namespace iceberg
+
+namespace json {
+
+namespace {
+// Visitor used with std::visit over primitive_type and field_type: primitive
+// alternatives are written as their type-name string, while the remaining
+// alternatives are forwarded to the matching rjson_serialize overload.
+class rjson_visitor {
+public:
+    explicit rjson_visitor(json::Writer<json::StringBuffer>& w)
+      : w(w) {}
+    void operator()(const iceberg::boolean_type&) { w.String("boolean"); }
+    void operator()(const iceberg::int_type&) { w.String("int"); }
+    void operator()(const iceberg::long_type&) { w.String("long"); }
+    void operator()(const iceberg::float_type&) { w.String("float"); }
+    void operator()(const iceberg::double_type&) { w.String("double"); }
+    void operator()(const iceberg::decimal_type& t) {
+        // String() emits its argument verbatim and does not expand "{}", so
+        // the text is formatted first, mirroring the fixed_type case below.
+        w.String(fmt::format("decimal({}, {})", t.precision, t.scale));
+    }
+    void operator()(const iceberg::date_type&) { w.String("date"); }
+    void operator()(const iceberg::time_type&) { w.String("time"); }
+    void operator()(const iceberg::timestamp_type&) { w.String("timestamp"); }
+    void operator()(const iceberg::timestamptz_type&) {
+        w.String("timestamptz");
+    }
+    void operator()(const iceberg::string_type&) { w.String("string"); }
+    void operator()(const iceberg::uuid_type&) { w.String("uuid"); }
+    void operator()(const iceberg::fixed_type& t) {
+        w.String(fmt::format("fixed[{}]", t.length));
+    }
+    void operator()(const iceberg::binary_type&) { w.String("binary"); }
+
+    void operator()(const iceberg::primitive_type& t) { rjson_serialize(w, t); }
+    void operator()(const iceberg::struct_type& t) { rjson_serialize(w, t); }
+    void operator()(const iceberg::list_type& t) { rjson_serialize(w, t); }
+    void operator()(const iceberg::map_type& t) { rjson_serialize(w, t); }
+
+private:
+    json::Writer<json::StringBuffer>& w;
+};
+} // anonymous namespace
+
+// Writes a field as an object with "id", "name", "required" and "type".
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::nested_field& f) {
+    w.StartObject();
+    w.Key("id");
+    w.Int(f.id());
+    w.Key("name");
+    w.String(f.name);
+    w.Key("required");
+    w.Bool(bool(f.required));
+    w.Key("type");
+    rjson_serialize(w, f.type);
+    w.EndObject();
+}
+
+// Writes a primitive type as its type-name string.
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::primitive_type& t) {
+    std::visit(rjson_visitor{w}, t);
+}
+
+// Writes a struct as an object with "type": "struct" and a "fields" array.
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::struct_type& t) {
+    w.StartObject();
+    w.Key("type");
+    w.String("struct");
+    w.Key("fields");
+    w.StartArray();
+    for (const auto& f : t.fields) {
+        rjson_serialize(w, *f);
+    }
+    w.EndArray();
+    w.EndObject();
+}
+
+// Writes a list as an object with "type": "list", "element-id",
+// "element-required" and "element".
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::list_type& t) {
+    w.StartObject();
+    w.Key("type");
+    w.String("list");
+    w.Key("element-id");
+    w.Int(t.element_field->id);
+    w.Key("element-required");
+    w.Bool(bool(t.element_field->required));
+    w.Key("element");
+    rjson_serialize(w, t.element_field->type);
+    w.EndObject();
+}
+
+// Writes a map as an object with "type": "map", "key-id", "value-id",
+// "value-required", "key" and "value".
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::map_type& t) {
+    w.StartObject();
+    w.Key("type");
+    w.String("map");
+    w.Key("key-id");
+    w.Int(t.key_field->id);
+    w.Key("value-id");
+    w.Int(t.value_field->id);
+    w.Key("value-required");
+    w.Bool(bool(t.value_field->required));
+    w.Key("key");
+    rjson_serialize(w, t.key_field->type);
+    w.Key("value");
+    rjson_serialize(w, t.value_field->type);
+    w.EndObject();
+}
+
+// Writes any field type by dispatching on the alternative held by `t`.
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::field_type& t) {
+    std::visit(rjson_visitor{w}, t);
+}
+
+} // namespace json
diff --git a/src/v/iceberg/datatypes_json.h b/src/v/iceberg/datatypes_json.h
new file mode 100644
index 000000000000..620d451d50c6
--- /dev/null
+++ b/src/v/iceberg/datatypes_json.h
@@ -0,0 +1,42 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+#pragma once
+
+#include "iceberg/datatypes.h"
+#include "json/_include_first.h"
+#include "json/document.h"
+#include "json/stringbuffer.h"
+#include "json/writer.h"
+
+namespace iceberg {
+
+struct_type parse_struct(const json::Value&);
+list_type parse_list(const json::Value&);
+map_type parse_map(const json::Value&);
+field_type parse_type(const json::Value&);
+nested_field_ptr parse_field(const json::Value&);
+
+} // namespace iceberg
+
+namespace json {
+
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::nested_field& f);
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::primitive_type& t);
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::struct_type& t);
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::list_type& t);
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::map_type& t);
+void rjson_serialize(
+  json::Writer<json::StringBuffer>& w, const iceberg::field_type& t);
+
+} // namespace json
diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt
index a1334d15becf..ff8c92f2f5a5 100644
--- 
a/src/v/iceberg/tests/CMakeLists.txt
+++ b/src/v/iceberg/tests/CMakeLists.txt
@@ -8,6 +8,7 @@ rp_test(
   SOURCES
     datatypes_test.cc
     manifest_serialization_test.cc
+    datatypes_json_test.cc
   LIBRARIES
     Avro::avro
     v::bytes
diff --git a/src/v/iceberg/tests/datatypes_json_test.cc b/src/v/iceberg/tests/datatypes_json_test.cc
new file mode 100644
index 000000000000..64d060642842
--- /dev/null
+++ b/src/v/iceberg/tests/datatypes_json_test.cc
@@ -0,0 +1,206 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "iceberg/datatypes.h"
+#include "iceberg/datatypes_json.h"
+#include "json/document.h"
+#include "json/stringbuffer.h"
+
+#include <gtest/gtest.h>
+
+using namespace iceberg;
+
+namespace {
+// Serializes `t` with rjson_serialize and returns the buffered JSON text.
+ss::sstring type_to_json_str(const field_type& t) {
+    json::StringBuffer buf;
+    json::Writer<json::StringBuffer> w(buf);
+    rjson_serialize(w, t);
+    return buf.GetString();
+}
+} // namespace
+
+// Round trip test for the struct type of a schema taken from
+// https://github.com/apache/iceberg-go/blob/704a6e78c13ea63f1ff4bb387f7d4b365b5f0f82/schema_test.go#L644
+TEST(DataTypeJsonSerde, TestFieldType) {
+    struct_type expected_struct;
+    expected_struct.fields.emplace_back(
+      nested_field::create(1, "foo", field_required::no, string_type{}));
+    expected_struct.fields.emplace_back(
+      nested_field::create(2, "bar", field_required::yes, int_type{}));
+    expected_struct.fields.emplace_back(
+      nested_field::create(3, "baz", field_required::no, boolean_type{}));
+
+    expected_struct.fields.emplace_back(nested_field::create(
+      4,
+      "qux",
+      field_required::yes,
+      list_type::create(5, field_required::yes, string_type{})));
+
+    expected_struct.fields.emplace_back(nested_field::create(
+      6,
+      "quux",
+      field_required::yes,
+      map_type::create(
+        7,
+        string_type{},
+        8,
+        field_required::yes,
+        map_type::create(
+          9, string_type{}, 10, field_required::yes, int_type{}))));
+
+    struct_type location_struct;
+    location_struct.fields.emplace_back(
+      nested_field::create(13, "latitude", field_required::no, float_type{}));
+    location_struct.fields.emplace_back(
+      nested_field::create(14, "longitude", field_required::no, float_type{}));
+    expected_struct.fields.emplace_back(nested_field::create(
+      11,
+      "location",
+      field_required::yes,
+      list_type::create(12, field_required::yes, std::move(location_struct))));
+
+    struct_type person_struct;
+    person_struct.fields.emplace_back(
+      nested_field::create(16, "name", field_required::no, string_type{}));
+    person_struct.fields.emplace_back(
+      nested_field::create(17, "age", field_required::yes, int_type{}));
+    expected_struct.fields.emplace_back(nested_field::create(
+      15, "person", field_required::no, std::move(person_struct)));
+    field_type expected_type = std::move(expected_struct);
+
+    // The expected type above has no counterpart for the two schema-level keys.
+    const char* json_data = R"JSON({
+        "type": "struct",
+        "schema-id": 1,
+        "identifier-field-ids": [1],
+        "fields": [
+          {
+            "type": "string",
+            "id": 1,
+            "name": "foo",
+            "required": false
+          },
+          {
+            "type": "int",
+            "id": 2,
+            "name": "bar",
+            "required": true
+          },
+          {
+            "type": "boolean",
+            "id": 3,
+            "name": "baz",
+            "required": false
+          },
+          {
+            "id": 4,
+            "name": "qux",
+            "required": true,
+            "type": {
+              "type": "list",
+              "element-id": 5,
+              "element-required": true,
+              "element": "string"
+            }
+          },
+          {
+            "id": 6,
+            "name": "quux",
+            "required": true,
+            "type": {
+              "type": "map",
+              "key-id": 7,
+              "key": "string",
+              "value-id": 8,
+              "value": {
+                "type": "map",
+                "key-id": 9,
+                "key": "string",
+                "value-id": 10,
+                "value": "int",
+                "value-required": true
+              },
+              "value-required": true
+            }
+          },
+          {
+            "id": 11,
+            "name": "location",
+            "required": true,
+            "type": {
+              "type": "list",
+              "element-id": 12,
+              "element-required": true,
+              "element": {
+                "type": "struct",
+                "fields": [
+                  {
+                    "id": 13,
+                    "name": "latitude",
+                    "type": "float",
+                    "required": false
+                  },
+                  {
+                    "id": 14,
+                    "name": "longitude",
+                    "type": "float",
+                    "required": false
+                  }
+                ]
+              }
+            }
+          },
+          {
+            "id": 15,
+            "name": "person",
+            "required": false,
+            "type": {
+              "type": "struct",
+              "fields": [
+                {
+                  "id": 16,
+                  "name": "name",
+                  "type": "string",
+                  "required": false
+                },
+                {
+                  "id": 17,
+                  "name": "age",
+                  "type": "int",
+                  "required": true
+                }
+              ]
+            }
+          }
+        ]
+      })JSON";
+    const ss::sstring expected_type_str = type_to_json_str(expected_type);
+
+    json::Document parsed_orig_json;
+    parsed_orig_json.Parse(json_data);
+    ASSERT_FALSE(parsed_orig_json.HasParseError());
+    auto parsed_orig_type = parse_type(parsed_orig_json);
+    const ss::sstring parsed_orig_as_str = type_to_json_str(parsed_orig_type);
+    ASSERT_EQ(expected_type, parsed_orig_type)
+      << fmt::format("{}\nvs\n{}", expected_type_str, parsed_orig_as_str);
+
+    // Parse the JSON produced by our own serializer to close the round trip.
+    json::Document parsed_roundtrip_json;
+    parsed_roundtrip_json.Parse(parsed_orig_as_str);
+    ASSERT_FALSE(parsed_roundtrip_json.HasParseError());
+    auto parsed_roundtrip_type_moved = parse_type(parsed_roundtrip_json);
+    auto parsed_roundtrip_type = std::move(parsed_roundtrip_type_moved);
+    const ss::sstring parsed_roundtrip_as_str = type_to_json_str(
+      parsed_roundtrip_type);
+    // NOLINTNEXTLINE(bugprone-use-after-move)
+    ASSERT_NE(parsed_roundtrip_type_moved, parsed_roundtrip_type);
+    ASSERT_EQ(parsed_roundtrip_type, expected_type)
+      << fmt::format("{}\nvs\n{}", parsed_roundtrip_as_str, expected_type_str);
+}