From 77d6336ca42b6cc3e1571ee7be4f55b233efe4d1 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 15 Jul 2024 02:04:08 -0700 Subject: [PATCH] iceberg: json serde for data types --- src/v/iceberg/CMakeLists.txt | 2 + src/v/iceberg/datatypes_json.cc | 290 +++++++++++++++++++++ src/v/iceberg/datatypes_json.h | 39 +++ src/v/iceberg/tests/CMakeLists.txt | 1 + src/v/iceberg/tests/datatypes_json_test.cc | 199 ++++++++++++++ 5 files changed, 531 insertions(+) create mode 100644 src/v/iceberg/datatypes_json.cc create mode 100644 src/v/iceberg/datatypes_json.h create mode 100644 src/v/iceberg/tests/datatypes_json_test.cc diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index cb95e3031ae35..fa87a1a56462e 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -32,8 +32,10 @@ v_cc_library( NAME iceberg SRCS datatypes.cc + datatypes_json.cc DEPS v::container + v::strings ) add_dependencies(v_iceberg iceberg_avro_codegen) diff --git a/src/v/iceberg/datatypes_json.cc b/src/v/iceberg/datatypes_json.cc new file mode 100644 index 0000000000000..5842875503781 --- /dev/null +++ b/src/v/iceberg/datatypes_json.cc @@ -0,0 +1,290 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes_json.h" + +#include "iceberg/datatypes.h" +#include "json/document.h" +#include "strings/string_switch.h" + +#include +#include + +namespace iceberg { + +namespace { +std::string_view +extract_between(char start_ch, char end_ch, const std::string_view& s) { + auto start_pos = s.find(start_ch); + auto end_pos = s.find(end_ch, start_pos); + + if (start_pos != std::string::npos && end_pos != std::string::npos) { + return s.substr(start_pos + 1, end_pos - start_pos - 1); + } + throw std::invalid_argument( + fmt::format("Missing wrappers '{}' or '{}' in {}", start_ch, end_ch, s)); +} + +decimal_type parse_decimal(const std::string_view& type_str) { + auto ps_str = extract_between('(', ')', type_str); + size_t pos = ps_str.find(','); + if (pos != std::string::npos) { + // Extract substrings before and after the comma + auto p_str = ps_str.substr(0, pos); + auto s_str = ps_str.substr(pos + 1); + return decimal_type{ + .precision = static_cast(std::stoul(ss::sstring(p_str))), + .scale = static_cast(std::stoul(ss::sstring(s_str)))}; + } + throw std::invalid_argument(fmt::format( + "Decimal requires format decimal(uint32, uint32): {}", type_str)); +} + +fixed_type parse_fixed(const std::string_view& type_str) { + auto l_str = extract_between('[', ']', type_str); + auto l = std::stoull(ss::sstring(l_str)); + return fixed_type{l}; +} + +const json::Value& +parse_required(const json::Value& v, const char* member_name) { + if (!v.IsObject()) { + throw std::invalid_argument( + fmt::format("Expected JSON object to parse field '{}'", member_name)); + } + auto iter = v.FindMember(member_name); + if (iter == v.MemberEnd()) { + throw std::invalid_argument( + fmt::format("No member named '{}'", member_name)); + } + return iter->value; +} + +ss::sstring parse_required_str(const json::Value& v, const char* member_name) { + const auto& str_json = parse_required(v, member_name); + if (!str_json.IsString()) { + throw std::invalid_argument( + fmt::format("Expected string for field '{}'", member_name)); + } + return str_json.GetString(); +} + +int32_t parse_required_i32(const json::Value& v, const char* member_name) { + const auto& int_json = parse_required(v, member_name); + if (!int_json.IsInt()) { + throw std::invalid_argument( + fmt::format("Expected integer for field '{}'", member_name)); + } + return int_json.GetInt(); +} + +bool parse_required_bool(const json::Value& v, const char* member_name) { + const auto& bool_json = parse_required(v, member_name); + if (!bool_json.IsBool()) { + throw std::invalid_argument( + fmt::format("Expected bool for field '{}'", member_name)); + } + return bool_json.GetBool(); +} + +struct_type parse_struct(const json::Value&); +list_type parse_list(const json::Value&); +map_type parse_map(const json::Value&); + +struct_type parse_struct(const json::Value& v) { + struct_type ret; + const auto& fields_json = parse_required(v, "fields"); + const auto& fields_array = fields_json.GetArray(); + ret.fields.reserve(fields_array.Size()); + for (const auto& field_json : fields_array) { + ret.fields.emplace_back(parse_field(field_json)); + } + return ret; +} + +list_type parse_list(const json::Value& v) { + const auto& element_json = parse_required(v, "element"); + return list_type::create( + parse_required_i32(v, "element-id"), + parse_required_bool(v, "element-required") ? field_required::yes + : field_required::no, + parse_type(element_json)); +} + +map_type parse_map(const json::Value& v) { + const auto& key_json = parse_required(v, "key"); + const auto& val_json = parse_required(v, "value"); + auto value_required = parse_required_bool(v, "value-required"); + auto key_type = parse_type(key_json); + return map_type::create( + parse_required_i32(v, "key-id"), + parse_type(key_json), + parse_required_i32(v, "value-id"), + value_required ? field_required::yes : field_required::no, + parse_type(val_json)); +} + +} // namespace + +nested_field_ptr parse_field(const json::Value& v) { + auto id = parse_required_i32(v, "id"); + auto name = parse_required_str(v, "name"); + auto required = parse_required_bool(v, "required"); + const auto& type_json = parse_required(v, "type"); + auto type = parse_type(type_json); + return nested_field::create( + id, + std::move(name), + required ? field_required::yes : field_required::no, + std::move(type)); +} + +field_type parse_type(const json::Value& v) { + if (v.IsString()) { + auto v_str = std::string_view{v.GetString(), v.GetStringLength()}; + if (v_str.starts_with("decimal")) { + return parse_decimal(v_str); + } + if (v_str.starts_with("fixed")) { + return parse_fixed(v_str); + } + return string_switch(v_str) + .match("boolean", boolean_type{}) + .match("int", int_type{}) + .match("long", long_type{}) + .match("float", float_type{}) + .match("double", double_type{}) + .match("date", date_type{}) + .match("time", time_type{}) + .match("timestamp", timestamp_type{}) + .match("timestamptz", timestamptz_type{}) + .match("string", string_type{}) + .match("uuid", uuid_type{}) + .match("binary", binary_type{}); + } + if (!v.IsObject()) { + throw std::invalid_argument("Expected string or object for type field"); + } + auto type = parse_required_str(v, "type"); + if (type == "struct") { + return parse_struct(v); + } else if (type == "list") { + return parse_list(v); + } else if (type == "map") { + return parse_map(v); + } + throw std::invalid_argument(fmt::format( + "Expected type field of 'struct', 'list', or 'map': {}", type)); +} + +} // namespace iceberg + +namespace json { + +namespace { +class rjson_visitor { +public: + explicit rjson_visitor(json::Writer& w) + : w(w) {} + void operator()(const iceberg::boolean_type&) { w.String("boolean"); } + void operator()(const iceberg::int_type&) { w.String("int"); } + void operator()(const iceberg::long_type&) { w.String("long"); } + void operator()(const iceberg::float_type&) { w.String("float"); } + void operator()(const iceberg::double_type&) { w.String("double"); } + void operator()(const iceberg::decimal_type& t) { + w.String("decimal({}, {})", t.precision, t.scale); + } + void operator()(const iceberg::date_type&) { w.String("date"); } + void operator()(const iceberg::time_type&) { w.String("time"); } + void operator()(const iceberg::timestamp_type&) { w.String("timestamp"); } + void operator()(const iceberg::timestamptz_type&) { + w.String("timestamptz"); + } + void operator()(const iceberg::string_type&) { w.String("string"); } + void operator()(const iceberg::uuid_type&) { w.String("uuid"); } + void operator()(const iceberg::fixed_type& t) { + w.String(fmt::format("fixed[{}]", t.length)); + } + void operator()(const iceberg::binary_type&) { w.String("binary"); } + + void operator()(const iceberg::primitive_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::struct_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::list_type& t) { rjson_serialize(w, t); } + void operator()(const iceberg::map_type& t) { rjson_serialize(w, t); } + +private: + json::Writer& w; +}; +} // anonymous namespace + +void rjson_serialize( + json::Writer& w, const iceberg::nested_field& f) { + w.StartObject(); + w.Key("id"); + w.Int(f.id()); + w.Key("name"); + w.String(f.name); + w.Key("required"); + w.Bool(bool(f.required)); + w.Key("type"); + rjson_serialize(w, f.type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::primitive_type& t) { + std::visit(rjson_visitor{w}, t); +} + +void rjson_serialize( + json::Writer& w, const iceberg::struct_type& t) { + w.StartObject(); + w.Key("fields"); + w.StartArray(); + for (const auto& f : t.fields) { + rjson_serialize(w, *f); + } + w.EndArray(); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::list_type& t) { + w.StartObject(); + w.Key("element-id"); + w.Int(t.element_field->id); + w.Key("element-required"); + w.Bool(bool(t.element_field->required)); + w.Key("element"); + rjson_serialize(w, t.element_field->type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::map_type& t) { + w.StartObject(); + w.Key("key-id"); + w.Int(t.key_field->id); + w.Key("value-id"); + w.Int(t.value_field->id); + w.Key("value-required"); + w.Bool(bool(t.value_field->required)); + w.Key("key-type"); + rjson_serialize(w, t.key_field->type); + w.Key("value-type"); + rjson_serialize(w, t.value_field->type); + w.EndObject(); +} + +void rjson_serialize( + json::Writer& w, const iceberg::field_type& t) { + std::visit(rjson_visitor{w}, t); +} + +} // namespace json diff --git a/src/v/iceberg/datatypes_json.h b/src/v/iceberg/datatypes_json.h new file mode 100644 index 0000000000000..3491ff05b6484 --- /dev/null +++ b/src/v/iceberg/datatypes_json.h @@ -0,0 +1,39 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "iceberg/datatypes.h" +#include "json/_include_first.h" +#include "json/document.h" +#include "json/stringbuffer.h" +#include "json/writer.h" + +namespace iceberg { + +field_type parse_type(const json::Value&); +nested_field_ptr parse_field(const json::Value&); + +} // namespace iceberg + +namespace json { + +void rjson_serialize( + json::Writer& w, const iceberg::nested_field& f); +void rjson_serialize( + json::Writer& w, const iceberg::primitive_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::struct_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::list_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::map_type& t); +void rjson_serialize( + json::Writer& w, const iceberg::field_type& t); + +} // namespace json diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index 309e47ca80efe..eda83fe1630b3 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -7,6 +7,7 @@ rp_test( BINARY_NAME iceberg SOURCES manifest_serialization_test.cc + datatypes_json_test.cc LIBRARIES Avro::avro v::bytes diff --git a/src/v/iceberg/tests/datatypes_json_test.cc b/src/v/iceberg/tests/datatypes_json_test.cc new file mode 100644 index 0000000000000..e879d170e24d0 --- /dev/null +++ b/src/v/iceberg/tests/datatypes_json_test.cc @@ -0,0 +1,199 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/datatypes.h" +#include "iceberg/datatypes_json.h" +#include "json/document.h" +#include "json/stringbuffer.h" + +#include + +using namespace iceberg; + +namespace { +ss::sstring type_to_json_str(const field_type& t) { + json::StringBuffer buf; + json::Writer w(buf); + rjson_serialize(w, t); + return buf.GetString(); +} +} // namespace + +// Round trip test for the struct type of a schema taken from +// https://github.com/apache/iceberg-go/blob/704a6e78c13ea63f1ff4bb387f7d4b365b5f0f82/schema_test.go#L644 +TEST(DataTypeJsonSerde, TestFieldType) { + struct_type expected_struct; + expected_struct.fields.emplace_back( + nested_field::create(1, "foo", field_required::no, string_type{})); + expected_struct.fields.emplace_back( + nested_field::create(2, "bar", field_required::yes, int_type{})); + expected_struct.fields.emplace_back( + nested_field::create(3, "baz", field_required::no, boolean_type{})); + + expected_struct.fields.emplace_back(nested_field::create( + 4, + "qux", + field_required::yes, + list_type::create(5, field_required::yes, string_type{}))); + + expected_struct.fields.emplace_back(nested_field::create( + 6, + "quux", + field_required::yes, + map_type::create( + 7, + string_type{}, + 8, + field_required::yes, + map_type::create( + 9, string_type{}, 10, field_required::yes, int_type{})))); + + struct_type location_struct; + location_struct.fields.emplace_back( + nested_field::create(13, "latitude", field_required::no, float_type{})); + location_struct.fields.emplace_back( + nested_field::create(14, "longitude", field_required::no, float_type{})); + expected_struct.fields.emplace_back(nested_field::create( + 11, + "location", + field_required::yes, + list_type::create(12, field_required::yes, std::move(location_struct)))); + + struct_type person_struct; + person_struct.fields.emplace_back( + nested_field::create(16, "name", field_required::no, string_type{})); + person_struct.fields.emplace_back( + nested_field::create(17, "age", field_required::yes, int_type{})); + expected_struct.fields.emplace_back(nested_field::create( + 15, "person", field_required::no, std::move(person_struct))); + field_type expected_type = std::move(expected_struct); + + const char* json_data = R"JSON({ + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1], + "fields": [ + { + "type": "string", + "id": 1, + "name": "foo", + "required": false + }, + { + "type": "int", + "id": 2, + "name": "bar", + "required": true + }, + { + "type": "boolean", + "id": 3, + "name": "baz", + "required": false + }, + { + "id": 4, + "name": "qux", + "required": true, + "type": { + "type": "list", + "element-id": 5, + "element-required": true, + "element": "string" + } + }, + { + "id": 6, + "name": "quux", + "required": true, + "type": { + "type": "map", + "key-id": 7, + "key": "string", + "value-id": 8, + "value": { + "type": "map", + "key-id": 9, + "key": "string", + "value-id": 10, + "value": "int", + "value-required": true + }, + "value-required": true + } + }, + { + "id": 11, + "name": "location", + "required": true, + "type": { + "type": "list", + "element-id": 12, + "element-required": true, + "element": { + "type": "struct", + "fields": [ + { + "id": 13, + "name": "latitude", + "type": "float", + "required": false + }, + { + "id": 14, + "name": "longitude", + "type": "float", + "required": false + } + ] + } + } + }, + { + "id": 15, + "name": "person", + "required": false, + "type": { + "type": "struct", + "fields": [ + { + "id": 16, + "name": "name", + "type": "string", + "required": false + }, + { + "id": 17, + "name": "age", + "type": "int", + "required": true + } + ] + } + } + ] + })JSON"; + const ss::sstring expected_type_str = type_to_json_str(expected_type); + + json::Document parsed_orig_json; + parsed_orig_json.Parse(json_data); + auto parsed_orig_type = parse_type(parsed_orig_json); + const ss::sstring parsed_orig_as_str = type_to_json_str(parsed_orig_type); + ASSERT_EQ(expected_type, parsed_orig_type) + << fmt::format("{}\nvs\n{}", expected_type_str, parsed_orig_as_str); + + json::Document parsed_roundtrip_json; + parsed_roundtrip_json.Parse(parsed_orig_as_str); + auto parsed_roundtrip_type_moved = parse_type(parsed_roundtrip_json); + auto parsed_roundtrip_type = std::move(parsed_roundtrip_type_moved); + const ss::sstring parsed_roundtrip_as_str = type_to_json_str( + parsed_roundtrip_type); + ASSERT_EQ(parsed_roundtrip_type, expected_type) + << fmt::format("{}\nvs\n{}", parsed_roundtrip_as_str, expected_type_str); +}