diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 6eb193ab3fc6b..003cbfe008171 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -26,7 +26,15 @@ endforeach() v_cc_library( NAME iceberg - SRCS ${avro_hdrs} + SRCS + ${avro_hdrs} + datatypes.cc + datatypes_json.cc + json_utils.cc + DEPS + v::container + v::json + v::strings ) add_subdirectory(tests) diff --git a/src/v/iceberg/datatypes.cc b/src/v/iceberg/datatypes.cc new file mode 100644 index 0000000000000..ec3f20449b8bc --- /dev/null +++ b/src/v/iceberg/datatypes.cc @@ -0,0 +1,263 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" + +#include + +namespace iceberg { + +struct primitive_type_comparison_visitor { + template + bool operator()(const T&, const U&) const { + return false; + } + bool operator()(const decimal_type& lhs, const decimal_type& rhs) const { + return lhs.precision == rhs.precision && lhs.scale == rhs.scale; + } + bool operator()(const fixed_type& lhs, const fixed_type& rhs) const { + return lhs.length == rhs.length; + } + template + bool operator()(const T&, const T&) const { + return true; + } +}; + +bool operator==(const primitive_type& lhs, const primitive_type& rhs) { + return std::visit(primitive_type_comparison_visitor{}, lhs, rhs); +} + +struct field_type_comparison_visitor { + template + bool operator()(const T&, const U&) const { + return false; + } + bool + operator()(const primitive_type& lhs, const primitive_type& rhs) const { + return lhs == rhs; + } + bool operator()(const struct_type& lhs, const struct_type& rhs) const { + return lhs == rhs; + } + bool operator()(const list_type& lhs, const list_type& rhs) const { + return lhs == rhs; + } + bool operator()(const map_type& lhs, const map_type& rhs) const { + return lhs == rhs; + } +}; + +bool operator==(const field_type& lhs, const field_type& rhs) { + return std::visit(field_type_comparison_visitor{}, lhs, rhs); +} +bool operator==(const nested_field& lhs, const nested_field& rhs) { + return lhs.id == rhs.id && lhs.required == rhs.required + && lhs.name == rhs.name && lhs.type == rhs.type; +} + +std::ostream& operator<<(std::ostream& o, const boolean_type&) { + o << "boolean"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const int_type&) { + o << "int"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const long_type&) { + o << "long"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const float_type&) { + o << "float"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const double_type&) { + o << "double"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const decimal_type& t) { + o << fmt::format("decimal({}, {})", t.precision, t.scale); + return o; +} + +std::ostream& operator<<(std::ostream& o, const date_type&) { + o << "date"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const time_type&) { + o << "time"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const timestamp_type&) { + o << "timestamp"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const timestamptz_type&) { + o << "timestamptz"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const string_type&) { + o << "string"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const uuid_type&) { + o << "uuid"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const fixed_type& t) { + // NOTE: square brackets to match how fixed type is serialized as JSON, + // though this matching isn't necessarily important for operator<<. + o << fmt::format("fixed[{}]", t.length); + return o; +} + +std::ostream& operator<<(std::ostream& o, const binary_type&) { + o << "binary"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const struct_type&) { + o << "struct"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const list_type&) { + o << "list"; + return o; +} + +std::ostream& operator<<(std::ostream& o, const map_type&) { + o << "map"; + return o; +} + +bool operator==(const struct_type& lhs, const struct_type& rhs) { + if (lhs.fields.size() != rhs.fields.size()) { + return false; + } + for (size_t i = 0; i < lhs.fields.size(); i++) { + bool has_lhs = lhs.fields[i] != nullptr; + bool has_rhs = rhs.fields[i] != nullptr; + if (has_lhs != has_rhs) { + return false; + } + if (has_lhs == false) { + continue; + } + if (*lhs.fields[i] != *rhs.fields[i]) { + return false; + } + } + return true; +} +bool operator==(const list_type& lhs, const list_type& rhs) { + bool has_lhs = lhs.element_field != nullptr; + bool has_rhs = rhs.element_field != nullptr; + if (has_lhs != has_rhs) { + return false; + } + if (!has_lhs) { + // Both nullptr. + return true; + } + return *lhs.element_field == *rhs.element_field; +} +bool operator==(const map_type& lhs, const map_type& rhs) { + bool has_key_lhs = lhs.key_field != nullptr; + bool has_key_rhs = rhs.key_field != nullptr; + if (has_key_lhs != has_key_rhs) { + return false; + } + bool has_val_lhs = lhs.value_field != nullptr; + bool has_val_rhs = rhs.value_field != nullptr; + if (has_val_lhs != has_val_rhs) { + return false; + } + if (has_key_lhs && *lhs.key_field != *rhs.key_field) { + return false; + } + if (has_val_lhs && *lhs.value_field != *rhs.value_field) { + return false; + } + return true; +} + +namespace { +struct ostream_visitor { + explicit ostream_visitor(std::ostream& o) + : os(o) {} + std::ostream& os; + + template + void operator()(const T& v) const { + os << v; + } +}; +} // namespace + +std::ostream& operator<<(std::ostream& o, const primitive_type& t) { + std::visit(ostream_visitor{o}, t); + return o; +} + +std::ostream& operator<<(std::ostream& o, const field_type& t) { + std::visit(ostream_visitor{o}, t); + return o; +} + +list_type list_type::create( + int32_t element_id, field_required element_required, field_type element) { + // NOTE: the element field doesn't have a name. Functionally, the list type + // is represented as: + // - element-id + // - element-type + // - element-required + // Despite the missing name though, many Iceberg implementations represent + // the list with a nested_field. + return list_type{ + .element_field = nested_field::create( + element_id, "element", element_required, std::move(element))}; +} + +map_type map_type::create( + int32_t key_id, + field_type key_type, + int32_t val_id, + field_required val_req, + field_type val_type) { + // NOTE: the keys and values don't have names, and the key is always + // required. Functionally, a map type is represented as: + // - key-id + // - key-type + // - value-id + // - value-required + // - value-type + // Despite the missing names though, many Iceberg implementations represent + // the map with two nested_fields. + return map_type{ + .key_field = nested_field::create( + key_id, "key", field_required::yes, std::move(key_type)), + .value_field = nested_field::create( + val_id, "value", val_req, std::move(val_type)), + }; +} + +} // namespace iceberg diff --git a/src/v/iceberg/datatypes.h b/src/v/iceberg/datatypes.h new file mode 100644 index 0000000000000..328e94e187361 --- /dev/null +++ b/src/v/iceberg/datatypes.h @@ -0,0 +1,132 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "base/seastarx.h" +#include "container/fragmented_vector.h" +#include "utils/named_type.h" + +#include +#include + +#include + +namespace iceberg { + +struct boolean_type {}; +struct int_type {}; +struct long_type {}; +struct float_type {}; +struct double_type {}; +struct decimal_type { + uint32_t precision; + uint32_t scale; +}; +struct date_type {}; +struct time_type {}; +struct timestamp_type {}; +struct timestamptz_type {}; +struct string_type {}; +struct uuid_type {}; +struct fixed_type { + uint64_t length; +}; +struct binary_type {}; +using primitive_type = std::variant< + boolean_type, + int_type, + long_type, + float_type, + double_type, + decimal_type, + date_type, + time_type, + timestamp_type, + timestamptz_type, + string_type, + uuid_type, + fixed_type, + binary_type>; +bool operator==(const primitive_type& lhs, const primitive_type& rhs); + +struct struct_type; +struct list_type; +struct map_type; +using field_type + = std::variant; +bool operator==(const field_type& lhs, const field_type& rhs); + +std::ostream& operator<<(std::ostream&, const boolean_type&); +std::ostream& operator<<(std::ostream&, const int_type&); +std::ostream& operator<<(std::ostream&, const long_type&); +std::ostream& operator<<(std::ostream&, const float_type&); +std::ostream& operator<<(std::ostream&, const double_type&); +std::ostream& operator<<(std::ostream&, const decimal_type&); +std::ostream& operator<<(std::ostream&, const date_type&); +std::ostream& operator<<(std::ostream&, const time_type&); +std::ostream& operator<<(std::ostream&, const timestamp_type&); +std::ostream& operator<<(std::ostream&, const timestamptz_type&); +std::ostream& operator<<(std::ostream&, const string_type&); +std::ostream& operator<<(std::ostream&, const uuid_type&); +std::ostream& operator<<(std::ostream&, const fixed_type&); +std::ostream& operator<<(std::ostream&, const binary_type&); +std::ostream& operator<<(std::ostream&, const struct_type&); +std::ostream& operator<<(std::ostream&, const list_type&); +std::ostream& operator<<(std::ostream&, const map_type&); +std::ostream& operator<<(std::ostream&, const primitive_type&); +std::ostream& operator<<(std::ostream&, const field_type&); + +struct nested_field; +using nested_field_ptr = std::unique_ptr; +using field_required = ss::bool_class; +struct struct_type { + chunked_vector fields; + friend bool operator==(const struct_type& lhs, const struct_type& rhs); +}; + +struct list_type { + nested_field_ptr element_field; + friend bool operator==(const list_type& lhs, const list_type& rhs); + + static list_type create( + int32_t element_id, field_required element_required, field_type element); +}; + +struct map_type { + nested_field_ptr key_field; + nested_field_ptr value_field; + friend bool operator==(const map_type& lhs, const map_type& rhs); + + static map_type create( + int32_t key_id, + field_type key_type, + int32_t val_id, + field_required val_req, + field_type val_type); +}; + +struct nested_field { + using id_t = named_type; + id_t id; + ss::sstring name; + field_required required; + field_type type; + std::optional doc; + // TODO: support initial-default and write-default optional literals. + + static nested_field_ptr + create(int32_t id, ss::sstring name, field_required req, field_type t) { + return std::make_unique( + id_t{id}, std::move(name), req, std::move(t), std::nullopt); + } + + friend bool operator==(const nested_field& lhs, const nested_field& rhs); +}; + +} // namespace iceberg diff --git a/src/v/iceberg/datatypes_json.cc b/src/v/iceberg/datatypes_json.cc new file mode 100644 index 0000000000000..f9dceea7c1098 --- /dev/null +++ b/src/v/iceberg/datatypes_json.cc @@ -0,0 +1,251 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes_json.h" + +#include "iceberg/datatypes.h" +#include "iceberg/json_utils.h" +#include "json/document.h" +#include "strings/string_switch.h" + +#include +#include + +namespace iceberg { + +namespace { +std::string_view +extract_between(char start_ch, char end_ch, const std::string_view& s) { + auto start_pos = s.find(start_ch); + auto end_pos = s.find(end_ch, start_pos); + + if (start_pos != std::string::npos && end_pos != std::string::npos) { + return s.substr(start_pos + 1, end_pos - start_pos - 1); + } + throw std::invalid_argument( + fmt::format("Missing wrappers '{}' or '{}' in {}", start_ch, end_ch, s)); +} + +decimal_type parse_decimal(std::string_view type_str) { + auto ps_str = extract_between('(', ')', type_str); + size_t pos = ps_str.find(','); + if (pos != std::string::npos) { + // Extract substrings before and after the comma + auto p_str = ps_str.substr(0, pos); + auto s_str = ps_str.substr(pos + 1); + return decimal_type{ + .precision = static_cast(std::stoul(ss::sstring(p_str))), + .scale = static_cast(std::stoul(ss::sstring(s_str)))}; + } + throw std::invalid_argument(fmt::format( + "Decimal requires format decimal(uint32, uint32): {}", type_str)); +} + +fixed_type parse_fixed(std::string_view type_str) { + auto l_str = extract_between('[', ']', type_str); + auto l = std::stoull(ss::sstring(l_str)); + return fixed_type{l}; +} + +} // namespace + +struct_type parse_struct(const json::Value& v) { + struct_type ret; + const auto& fields_json = parse_required(v, "fields"); + const auto& fields_array = fields_json.GetArray(); + ret.fields.reserve(fields_array.Size()); + for (const auto& field_json : fields_array) { + ret.fields.emplace_back(parse_field(field_json)); + } + return ret; +} + +list_type parse_list(const json::Value& v) { + const auto& element_json = parse_required(v, "element"); + return list_type::create( + parse_required_i32(v, "element-id"), + parse_required_bool(v, "element-required") ? field_required::yes + : field_required::no, + parse_type(element_json)); +} + +map_type parse_map(const json::Value& v) { + const auto& key_json = parse_required(v, "key"); + const auto& val_json = parse_required(v, "value"); + auto value_required = parse_required_bool(v, "value-required"); + return map_type::create( + parse_required_i32(v, "key-id"), + parse_type(key_json), + parse_required_i32(v, "value-id"), + value_required ? field_required::yes : field_required::no, + parse_type(val_json)); +} + +nested_field_ptr parse_field(const json::Value& v) { + auto id = parse_required_i32(v, "id"); + auto name = parse_required_str(v, "name"); + auto required = parse_required_bool(v, "required"); + const auto& type_json = parse_required(v, "type"); + auto type = parse_type(type_json); + return nested_field::create( + id, + std::move(name), + required ? field_required::yes : field_required::no, + std::move(type)); +} + +field_type parse_type(const json::Value& v) { + if (v.IsString()) { + auto v_str = std::string_view{v.GetString(), v.GetStringLength()}; + if (v_str.starts_with("decimal")) { + return parse_decimal(v_str); + } + if (v_str.starts_with("fixed")) { + return parse_fixed(v_str); + } + return string_switch(v_str) + .match("boolean", boolean_type{}) + .match("int", int_type{}) + .match("long", long_type{}) + .match("float", float_type{}) + .match("double", double_type{}) + .match("date", date_type{}) + .match("time", time_type{}) + .match("timestamp", timestamp_type{}) + .match("timestamptz", timestamptz_type{}) + .match("string", string_type{}) + .match("uuid", uuid_type{}) + .match("binary", binary_type{}); + } + if (!v.IsObject()) { + throw std::invalid_argument("Expected string or object for type field"); + } + auto type = parse_required_str(v, "type"); + if (type == "struct") { + return parse_struct(v); + } else if (type == "list") { + return parse_list(v); + } else if (type == "map") { + return parse_map(v); + } + throw std::invalid_argument(fmt::format( + "Expected type field of 'struct', 'list', or 'map': {}", type)); +} + +} // namespace iceberg + +namespace json { + +namespace { +class rjson_visitor { +public: + explicit rjson_visitor(json::Writer& w) + : w(w) {} + void operator()(const iceberg::boolean_type&) { w.String("boolean"); } + void operator()(const iceberg::int_type&) { w.String("int"); } + void operator()(const iceberg::long_type&) { w.String("long"); } + void operator()(const iceberg::float_type&) { w.String("float"); } + void operator()(const iceberg::double_type&) { w.String("double"); } + void operator()(const iceberg::decimal_type& t) { + w.String("decimal({}, {})", t.precision, t.scale); + } + void operator()(const iceberg::date_type&) { w.String("date"); } + void operator()(const iceberg::time_type&) { w.String("time"); } + void operator()(const iceberg::timestamp_type&) { w.String("timestamp"); } + void operator()(const iceberg::timestamptz_type&) { + w.String("timestamptz"); + } + void operator()(const iceberg::string_type&) { w.String("string"); } + void operator()(const iceberg::uuid_type&) { w.String("uuid"); } + void operator()(const iceberg::fixed_type& t) { + w.String(fmt::format("fixed[{}]", t.length)); + } + void operator()(const iceberg::binary_type&) { w.String("binary"); } + + void operator()(const iceberg::primitive_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::struct_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::list_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::map_type& t) { rjson_serialize(w, t); } + +private: + json::Writer& w; +}; +} // anonymous namespace + +void rjson_serialize( + json::Writer& w, const iceberg::nested_field& f) { + w.StartObject(); + w.Key("id"); + w.Int(f.id()); + w.Key("name"); + w.String(f.name); + w.Key("required"); + w.Bool(bool(f.required)); + w.Key("type"); + rjson_serialize(w, f.type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::primitive_type& t) { + std::visit(rjson_visitor{w}, t); +} + +void rjson_serialize( + json::Writer& w, const iceberg::struct_type& t) { + w.StartObject(); + w.Key("type"); + w.String("struct"); + w.Key("fields"); + w.StartArray(); + for (const auto& f : t.fields) { + rjson_serialize(w, *f); + } + w.EndArray(); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::list_type& t) { + w.StartObject(); + w.Key("type"); + w.String("list"); + w.Key("element-id"); + w.Int(t.element_field->id); + w.Key("element-required"); + w.Bool(bool(t.element_field->required)); + w.Key("element"); + rjson_serialize(w, t.element_field->type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::map_type& t) { + w.StartObject(); + w.Key("type"); + w.String("map"); + w.Key("key-id"); + w.Int(t.key_field->id); + w.Key("value-id"); + w.Int(t.value_field->id); + w.Key("value-required"); + w.Bool(bool(t.value_field->required)); + w.Key("key"); + rjson_serialize(w, t.key_field->type); + w.Key("value"); + rjson_serialize(w, t.value_field->type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::field_type& t) { + std::visit(rjson_visitor{w}, t); +} + +} // namespace json diff --git a/src/v/iceberg/datatypes_json.h b/src/v/iceberg/datatypes_json.h new file mode 100644 index 0000000000000..620d451d50c6b --- /dev/null +++ b/src/v/iceberg/datatypes_json.h @@ -0,0 +1,42 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "iceberg/datatypes.h" +#include "json/_include_first.h" +#include "json/document.h" +#include "json/stringbuffer.h" +#include "json/writer.h" + +namespace iceberg { + +struct_type parse_struct(const json::Value&); +list_type parse_list(const json::Value&); +map_type parse_map(const json::Value&); +field_type parse_type(const json::Value&); +nested_field_ptr parse_field(const json::Value&); + +} // namespace iceberg + +namespace json { + +void rjson_serialize( + json::Writer& w, const iceberg::nested_field& f); +void rjson_serialize( + json::Writer& w, const iceberg::primitive_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::struct_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::list_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::map_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::field_type& t); + +} // namespace json diff --git a/src/v/iceberg/json_utils.cc b/src/v/iceberg/json_utils.cc new file mode 100644 index 0000000000000..565482a4d60c1 --- /dev/null +++ b/src/v/iceberg/json_utils.cc @@ -0,0 +1,72 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "json/document.h" + +#include + +#include + +namespace iceberg { + +std::optional> +parse_optional(const json::Value& v, const char* member_name) { + if (!v.IsObject()) { + throw std::invalid_argument( + fmt::format("Expected JSON object to parse field '{}'", member_name)); + } + auto iter = v.FindMember(member_name); + if (iter == v.MemberEnd()) { + return std::nullopt; + } + return iter->value; +} + +const json::Value& +parse_required(const json::Value& v, const char* member_name) { + if (!v.IsObject()) { + throw std::invalid_argument( + fmt::format("Expected JSON object to parse field '{}'", member_name)); + } + auto iter = v.FindMember(member_name); + if (iter == v.MemberEnd()) { + throw std::invalid_argument( + fmt::format("No member named '{}'", member_name)); + } + return iter->value; +} + +ss::sstring parse_required_str(const json::Value& v, const char* member_name) { + const auto& str_json = parse_required(v, member_name); + if (!str_json.IsString()) { + throw std::invalid_argument( + fmt::format("Expected string for field '{}'", member_name)); + } + return str_json.GetString(); +} + +int32_t parse_required_i32(const json::Value& v, const char* member_name) { + const auto& int_json = parse_required(v, member_name); + if (!int_json.IsInt()) { + throw std::invalid_argument( + fmt::format("Expected integer for field '{}'", member_name)); + } + return int_json.GetInt(); +} + +bool parse_required_bool(const json::Value& v, const char* member_name) { + const auto& bool_json = parse_required(v, member_name); + if (!bool_json.IsBool()) { + throw std::invalid_argument( + fmt::format("Expected bool for field '{}'", member_name)); + } + return bool_json.GetBool(); +} + +} // namespace iceberg diff --git a/src/v/iceberg/json_utils.h b/src/v/iceberg/json_utils.h new file mode 100644 index 0000000000000..d84d8c8f550e3 --- /dev/null +++ b/src/v/iceberg/json_utils.h @@ -0,0 +1,27 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "json/document.h" + +namespace iceberg { + +std::optional> +parse_optional(const json::Value& v, const char* member_name); + +const json::Value& +parse_required(const json::Value& v, const char* member_name); + +ss::sstring parse_required_str(const json::Value& v, const char* member_name); + +int32_t parse_required_i32(const json::Value& v, const char* member_name); + +bool parse_required_bool(const json::Value& v, const char* member_name); + +} // namespace iceberg diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index 309e47ca80efe..ff8c92f2f5a56 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -6,7 +6,9 @@ rp_test( USE_CWD BINARY_NAME iceberg SOURCES + datatypes_test.cc manifest_serialization_test.cc + datatypes_json_test.cc LIBRARIES Avro::avro v::bytes diff --git a/src/v/iceberg/tests/datatypes_json_test.cc b/src/v/iceberg/tests/datatypes_json_test.cc new file mode 100644 index 0000000000000..64d0606428422 --- /dev/null +++ b/src/v/iceberg/tests/datatypes_json_test.cc @@ -0,0 +1,201 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" +#include "iceberg/datatypes_json.h" +#include "json/document.h" +#include "json/stringbuffer.h" + +#include + +using namespace iceberg; + +namespace { +ss::sstring type_to_json_str(const field_type& t) { + json::StringBuffer buf; + json::Writer w(buf); + rjson_serialize(w, t); + return buf.GetString(); +} +} // namespace + +// Round trip test for the struct type of a schema taken from +// https://github.com/apache/iceberg-go/blob/704a6e78c13ea63f1ff4bb387f7d4b365b5f0f82/schema_test.go#L644 +TEST(DataTypeJsonSerde, TestFieldType) { + struct_type expected_struct; + expected_struct.fields.emplace_back( + nested_field::create(1, "foo", field_required::no, string_type{})); + expected_struct.fields.emplace_back( + nested_field::create(2, "bar", field_required::yes, int_type{})); + expected_struct.fields.emplace_back( + nested_field::create(3, "baz", field_required::no, boolean_type{})); + + expected_struct.fields.emplace_back(nested_field::create( + 4, + "qux", + field_required::yes, + list_type::create(5, field_required::yes, string_type{}))); + + expected_struct.fields.emplace_back(nested_field::create( + 6, + "quux", + field_required::yes, + map_type::create( + 7, + string_type{}, + 8, + field_required::yes, + map_type::create( + 9, string_type{}, 10, field_required::yes, int_type{})))); + + struct_type location_struct; + location_struct.fields.emplace_back( + nested_field::create(13, "latitude", field_required::no, float_type{})); + location_struct.fields.emplace_back( + nested_field::create(14, "longitude", field_required::no, float_type{})); + expected_struct.fields.emplace_back(nested_field::create( + 11, + "location", + field_required::yes, + list_type::create(12, field_required::yes, std::move(location_struct)))); + + struct_type person_struct; + person_struct.fields.emplace_back( + nested_field::create(16, "name", field_required::no, string_type{})); + person_struct.fields.emplace_back( + nested_field::create(17, "age", field_required::yes, int_type{})); + expected_struct.fields.emplace_back(nested_field::create( + 15, "person", field_required::no, std::move(person_struct))); + field_type expected_type = std::move(expected_struct); + + const char* json_data = R"JSON({ + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1], + "fields": [ + { + "type": "string", + "id": 1, + "name": "foo", + "required": false + }, + { + "type": "int", + "id": 2, + "name": "bar", + "required": true + }, + { + "type": "boolean", + "id": 3, + "name": "baz", + "required": false + }, + { + "id": 4, + "name": "qux", + "required": true, + "type": { + "type": "list", + "element-id": 5, + "element-required": true, + "element": "string" + } + }, + { + "id": 6, + "name": "quux", + "required": true, + "type": { + "type": "map", + "key-id": 7, + "key": "string", + "value-id": 8, + "value": { + "type": "map", + "key-id": 9, + "key": "string", + "value-id": 10, + "value": "int", + "value-required": true + }, + "value-required": true + } + }, + { + "id": 11, + "name": "location", + "required": true, + "type": { + "type": "list", + "element-id": 12, + "element-required": true, + "element": { + "type": "struct", + "fields": [ + { + "id": 13, + "name": "latitude", + "type": "float", + "required": false + }, + { + "id": 14, + "name": "longitude", + "type": "float", + "required": false + } + ] + } + } + }, + { + "id": 15, + "name": "person", + "required": false, + "type": { + "type": "struct", + "fields": [ + { + "id": 16, + "name": "name", + "type": "string", + "required": false + }, + { + "id": 17, + "name": "age", + "type": "int", + "required": true + } + ] + } + } + ] + })JSON"; + const ss::sstring expected_type_str = type_to_json_str(expected_type); + + json::Document parsed_orig_json; + parsed_orig_json.Parse(json_data); + auto parsed_orig_type = parse_type(parsed_orig_json); + const ss::sstring parsed_orig_as_str = type_to_json_str(parsed_orig_type); + ASSERT_EQ(expected_type, parsed_orig_type) + << fmt::format("{}\nvs\n{}", expected_type_str, parsed_orig_as_str); + + json::Document parsed_roundtrip_json; + parsed_roundtrip_json.Parse(parsed_orig_as_str); + auto parsed_roundtrip_type_moved = parse_type(parsed_roundtrip_json); + auto parsed_roundtrip_type = std::move(parsed_roundtrip_type_moved); + const ss::sstring parsed_roundtrip_as_str = type_to_json_str( + parsed_roundtrip_type); + // NOLINTNEXTLINE(bugprone-use-after-move) + ASSERT_NE(parsed_roundtrip_type_moved, parsed_roundtrip_type); + ASSERT_EQ(parsed_roundtrip_type, expected_type) + << fmt::format("{}\nvs\n{}", parsed_roundtrip_as_str, expected_type_str); +} diff --git a/src/v/iceberg/tests/datatypes_test.cc b/src/v/iceberg/tests/datatypes_test.cc new file mode 100644 index 0000000000000..f483226424934 --- /dev/null +++ b/src/v/iceberg/tests/datatypes_test.cc @@ -0,0 +1,295 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" + +#include + +using namespace iceberg; + +chunked_vector all_types() { + chunked_vector all_types; + + // Primitive types. + all_types.emplace_back(boolean_type{}); + all_types.emplace_back(int_type{}); + all_types.emplace_back(long_type{}); + all_types.emplace_back(float_type{}); + all_types.emplace_back(double_type{}); + all_types.emplace_back(decimal_type{.precision = 123, .scale = 456}); + all_types.emplace_back(date_type{}); + all_types.emplace_back(time_type{}); + all_types.emplace_back(timestamp_type{}); + all_types.emplace_back(timestamptz_type{}); + all_types.emplace_back(string_type{}); + all_types.emplace_back(uuid_type{}); + all_types.emplace_back(fixed_type{ + .length = 789, + }); + all_types.emplace_back(binary_type{}); + + // Complex types. + all_types.emplace_back( + list_type::create(1, field_required::yes, string_type{})); + chunked_vector struct_fields; + struct_fields.emplace_back( + nested_field::create(1, "foo", field_required::yes, string_type{})); + all_types.emplace_back(struct_type{ + .fields = std::move(struct_fields), + }); + all_types.emplace_back(map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})); + return all_types; +}; + +// Use the equality operator to check that the given type only exists once in +// the given list. +void check_single_type_exists( + const field_type& expected_type, + const chunked_vector& all_types) { + size_t num_eq = 0; + size_t num_ne = 0; + for (const auto& t : all_types) { + if (t == expected_type) { + ++num_eq; + } + if (t != expected_type) { + ++num_ne; + } + } + ASSERT_EQ(num_eq, 1); + ASSERT_EQ(num_ne, all_types.size() - 1); +} + +TEST(DatatypeTest, TestTypesEquality) { + auto types = all_types(); + for (const auto& t : types) { + ASSERT_NO_FATAL_FAILURE(check_single_type_exists(t, types)); + } +} +TEST(DatatypeTest, TestBoolean) { + ASSERT_EQ(field_type{boolean_type{}}, field_type{boolean_type{}}); + ASSERT_EQ("boolean", fmt::format("{}", boolean_type{})); + ASSERT_EQ("boolean", fmt::format("{}", field_type{boolean_type{}})); +} +TEST(DatatypeTest, TestInt) { + ASSERT_EQ(field_type{int_type{}}, field_type{int_type{}}); + ASSERT_EQ("int", fmt::format("{}", int_type{})); + ASSERT_EQ("int", fmt::format("{}", field_type{int_type{}})); +} +TEST(DatatypeTest, TestLong) { + ASSERT_EQ(field_type{long_type{}}, field_type{long_type{}}); + ASSERT_EQ("long", fmt::format("{}", long_type{})); + ASSERT_EQ("long", fmt::format("{}", field_type{long_type{}})); +} +TEST(DatatypeTest, TestFloat) { + ASSERT_EQ(field_type{float_type{}}, field_type{float_type{}}); + ASSERT_EQ("float", fmt::format("{}", float_type{})); + ASSERT_EQ("float", fmt::format("{}", field_type{float_type{}})); +} +TEST(DatatypeTest, TestDouble) { + ASSERT_EQ(field_type{double_type{}}, field_type{double_type{}}); + ASSERT_EQ("double", fmt::format("{}", double_type{})); + ASSERT_EQ("double", fmt::format("{}", field_type{double_type{}})); +} +TEST(DatatypeTest, TestDecimal) { + decimal_type t1{ + .precision = 123, + .scale = 456, + }; + auto t1_copy = t1; + auto t2 = t1; + t2.scale = 0; + auto t3 = t1; + t3.precision = 0; + ASSERT_EQ(field_type{t1}, field_type{t1}); + ASSERT_EQ(field_type{t1}, field_type{t1_copy}); + ASSERT_NE(field_type{t1}, field_type{t2}); + ASSERT_NE(field_type{t1}, field_type{t3}); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", t1)); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", t1_copy)); + ASSERT_EQ("decimal(123, 0)", fmt::format("{}", t2)); + ASSERT_EQ("decimal(0, 456)", fmt::format("{}", t3)); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", field_type{t1})); + ASSERT_EQ("decimal(123, 456)", fmt::format("{}", field_type{t1_copy})); + ASSERT_EQ("decimal(123, 0)", fmt::format("{}", field_type{t2})); + ASSERT_EQ("decimal(0, 456)", fmt::format("{}", field_type{t3})); +} +TEST(DatatypeTest, TestDate) { + ASSERT_EQ(field_type{date_type{}}, field_type{date_type{}}); + ASSERT_EQ("date", fmt::format("{}", date_type{})); + ASSERT_EQ("date", fmt::format("{}", field_type{date_type{}})); +} +TEST(DatatypeTest, TestTime) { + ASSERT_EQ(field_type{time_type{}}, field_type{time_type{}}); + ASSERT_EQ("time", fmt::format("{}", time_type{})); + ASSERT_EQ("time", fmt::format("{}", field_type{time_type{}})); +} +TEST(DatatypeTest, TestTimestamp) { + ASSERT_EQ(field_type{timestamp_type{}}, field_type{timestamp_type{}}); + ASSERT_EQ("timestamp", fmt::format("{}", timestamp_type{})); + ASSERT_EQ("timestamp", fmt::format("{}", field_type{timestamp_type{}})); +} +TEST(DatatypeTest, TestTimestamptz) { + ASSERT_EQ(field_type{timestamptz_type{}}, field_type{timestamptz_type{}}); + ASSERT_EQ("timestamptz", fmt::format("{}", timestamptz_type{})); + ASSERT_EQ("timestamptz", fmt::format("{}", field_type{timestamptz_type{}})); +} +TEST(DatatypeTest, TestString) { + ASSERT_EQ(field_type{string_type{}}, field_type{string_type{}}); + ASSERT_EQ("string", fmt::format("{}", string_type{})); + ASSERT_EQ("string", fmt::format("{}", field_type{string_type{}})); +} +TEST(DatatypeTest, TestUuid) { + ASSERT_EQ(field_type{uuid_type{}}, field_type{uuid_type{}}); + ASSERT_EQ("uuid", fmt::format("{}", uuid_type{})); + ASSERT_EQ("uuid", fmt::format("{}", field_type{uuid_type{}})); +} +TEST(DatatypeTest, TestFixed) { + fixed_type t1{ + .length = 12345, + }; + auto t1_copy = t1; + fixed_type t2{ + .length = 54321, + }; + ASSERT_EQ(field_type{t1}, field_type{t1}); + ASSERT_EQ(field_type{t1}, field_type{t1_copy}); + ASSERT_NE(field_type{t1}, field_type{t2}); + ASSERT_EQ("fixed[12345]", fmt::format("{}", t1)); + ASSERT_EQ("fixed[12345]", fmt::format("{}", t1_copy)); + ASSERT_EQ("fixed[54321]", fmt::format("{}", t2)); + ASSERT_EQ("fixed[12345]", fmt::format("{}", field_type{t1})); + ASSERT_EQ("fixed[12345]", fmt::format("{}", field_type{t1_copy})); + ASSERT_EQ("fixed[54321]", fmt::format("{}", field_type{t2})); +} +TEST(DatatypeTest, TestBinary) { + ASSERT_EQ(field_type{binary_type{}}, field_type{binary_type{}}); + ASSERT_EQ("binary", fmt::format("{}", binary_type{})); + ASSERT_EQ("binary", fmt::format("{}", field_type{binary_type{}})); +} +TEST(DatatypeTest, TestList) { + auto t1 = field_type{ + list_type::create(1, field_required::yes, boolean_type{})}; + auto t1_dup = field_type{ + list_type::create(1, field_required::yes, boolean_type{})}; + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + + auto t2 = field_type{ + list_type::create(2, field_required::yes, boolean_type{})}; + auto t3 = field_type{ + list_type::create(1, field_required::no, boolean_type{})}; + auto t4 = field_type{ + list_type::create(1, field_required::yes, string_type{})}; + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + + // Moving the type will empty it. + auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).element_field == nullptr); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("list", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("list", fmt::format("{}", t2)); + ASSERT_EQ("list", fmt::format("{}", t3)); + ASSERT_EQ("list", fmt::format("{}", t4)); + ASSERT_EQ("list", fmt::format("{}", t1_dup)); + ASSERT_EQ("list", fmt::format("{}", t1_move)); +} +TEST(DatatypeTest, TestMap) { + auto t1 = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})}; + auto t1_dup = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, string_type{})}; + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + + auto t2 = field_type{map_type::create( + 1, string_type{}, 1, field_required::yes, string_type{})}; + auto t3 = field_type{map_type::create( + 0, boolean_type{}, 1, field_required::yes, string_type{})}; + auto t4 = field_type{ + map_type::create(0, string_type{}, 2, field_required::no, string_type{})}; + auto t5 = field_type{map_type::create( + 0, string_type{}, 1, field_required::yes, boolean_type{})}; + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + ASSERT_NE(t1, t5); + + // Moving the type will empty it. + auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).key_field == nullptr); + ASSERT_TRUE(std::get(t1).value_field == nullptr); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("map", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("map", fmt::format("{}", t2)); + ASSERT_EQ("map", fmt::format("{}", t3)); + ASSERT_EQ("map", fmt::format("{}", t4)); + ASSERT_EQ("map", fmt::format("{}", t1_dup)); + ASSERT_EQ("map", fmt::format("{}", t1_move)); + + // Regression test that would cause previous impl to crash if both types + // are the same but keys are null. + std::get(t1_move).key_field.reset(); + std::get(t1_dup).key_field.reset(); + ASSERT_EQ(t1_move, t1_dup); +} +TEST(DatatypeTest, TestStruct) { + // Constructs a struct_type with a single field. + auto struct_single = + [](int32_t i, const ss::sstring& name, field_required req, field_type t) { + chunked_vector struct_fields; + struct_fields.emplace_back( + nested_field::create(i, name, req, std::move(t))); + return field_type{struct_type{ + .fields = std::move(struct_fields), + }}; + }; + auto t1 = struct_single(0, "foo", field_required::yes, string_type{}); + auto t1_dup = struct_single(0, "foo", field_required::yes, string_type{}); + auto t2 = struct_single(1, "foo", field_required::yes, string_type{}); + auto t3 = struct_single(0, "food", field_required::yes, string_type{}); + auto t4 = struct_single(0, "foo", field_required::no, string_type{}); + auto t5 = struct_single(0, "foo", field_required::yes, boolean_type{}); + ASSERT_EQ(t1, t1); + ASSERT_EQ(t1, t1_dup); + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + ASSERT_NE(t1, t5); + + // Moving the type will empty it. + auto t1_move = std::move(t1); + // NOLINTBEGIN(bugprone-use-after-move) + ASSERT_TRUE(std::holds_alternative(t1)); + ASSERT_TRUE(std::get(t1).fields.empty()); + ASSERT_NE(t1_move, t1); + ASSERT_EQ("struct", fmt::format("{}", t1)); + // NOLINTEND(bugprone-use-after-move) + + ASSERT_EQ(t1_move, t1_dup); + ASSERT_EQ("struct", fmt::format("{}", t2)); + ASSERT_EQ("struct", fmt::format("{}", t3)); + ASSERT_EQ("struct", fmt::format("{}", t4)); + ASSERT_EQ("struct", fmt::format("{}", t5)); + ASSERT_EQ("struct", fmt::format("{}", t1_dup)); + ASSERT_EQ("struct", fmt::format("{}", t1_move)); +}