Skip to content

Commit

Permalink
iceberg: json serde for data types
Browse files Browse the repository at this point in the history
  • Loading branch information
andrwng committed Jul 15, 2024
1 parent 330c910 commit 77d6336
Show file tree
Hide file tree
Showing 5 changed files with 531 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ v_cc_library(
NAME iceberg
SRCS
datatypes.cc
datatypes_json.cc
DEPS
v::container
v::strings
)
add_dependencies(v_iceberg iceberg_avro_codegen)

Expand Down
290 changes: 290 additions & 0 deletions src/v/iceberg/datatypes_json.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "iceberg/datatypes_json.h"

#include "iceberg/datatypes.h"
#include "json/document.h"
#include "strings/string_switch.h"

#include <stdexcept>
#include <string>

namespace iceberg {

namespace {
std::string_view
extract_between(char start_ch, char end_ch, const std::string_view& s) {
auto start_pos = s.find(start_ch);
auto end_pos = s.find(end_ch, start_pos);

if (start_pos != std::string::npos && end_pos != std::string::npos) {
return s.substr(start_pos + 1, end_pos - start_pos - 1);
}
throw std::invalid_argument(
fmt::format("Missing wrappers '{}' or '{}' in {}", start_ch, end_ch, s));
}

decimal_type parse_decimal(const std::string_view& type_str) {
auto ps_str = extract_between('(', ')', type_str);
size_t pos = ps_str.find(',');
if (pos != std::string::npos) {
// Extract substrings before and after the comma
auto p_str = ps_str.substr(0, pos);
auto s_str = ps_str.substr(pos + 1);
return decimal_type{
.precision = static_cast<uint32_t>(std::stoul(ss::sstring(p_str))),
.scale = static_cast<uint32_t>(std::stoul(ss::sstring(s_str)))};
}
throw std::invalid_argument(fmt::format(
"Decimal requires format decimal(uint32, uint32): {}", type_str));
}

fixed_type parse_fixed(const std::string_view& type_str) {
auto l_str = extract_between('[', ']', type_str);
auto l = std::stoull(ss::sstring(l_str));
return fixed_type{l};
}

const json::Value&
parse_required(const json::Value& v, const char* member_name) {
if (!v.IsObject()) {
throw std::invalid_argument(
fmt::format("Expected JSON object to parse field '{}'", member_name));
}
auto iter = v.FindMember(member_name);
if (iter == v.MemberEnd()) {
throw std::invalid_argument(
fmt::format("No member named '{}'", member_name));
}
return iter->value;
}

ss::sstring parse_required_str(const json::Value& v, const char* member_name) {
const auto& str_json = parse_required(v, member_name);
if (!str_json.IsString()) {
throw std::invalid_argument(
fmt::format("Expected string for field '{}'", member_name));
}
return str_json.GetString();
}

int32_t parse_required_i32(const json::Value& v, const char* member_name) {
const auto& int_json = parse_required(v, member_name);
if (!int_json.IsInt()) {
throw std::invalid_argument(
fmt::format("Expected integer for field '{}'", member_name));
}
return int_json.GetInt();
}

bool parse_required_bool(const json::Value& v, const char* member_name) {
const auto& bool_json = parse_required(v, member_name);
if (!bool_json.IsBool()) {
throw std::invalid_argument(
fmt::format("Expected bool for field '{}'", member_name));
}
return bool_json.GetBool();
}

struct_type parse_struct(const json::Value&);
list_type parse_list(const json::Value&);
map_type parse_map(const json::Value&);

struct_type parse_struct(const json::Value& v) {
struct_type ret;
const auto& fields_json = parse_required(v, "fields");
const auto& fields_array = fields_json.GetArray();
ret.fields.reserve(fields_array.Size());
for (const auto& field_json : fields_array) {
ret.fields.emplace_back(parse_field(field_json));
}
return ret;
}

list_type parse_list(const json::Value& v) {
const auto& element_json = parse_required(v, "element");
return list_type::create(
parse_required_i32(v, "element-id"),
parse_required_bool(v, "element-required") ? field_required::yes
: field_required::no,
parse_type(element_json));
}

map_type parse_map(const json::Value& v) {
const auto& key_json = parse_required(v, "key");
const auto& val_json = parse_required(v, "value");
auto value_required = parse_required_bool(v, "value-required");
auto key_type = parse_type(key_json);
return map_type::create(
parse_required_i32(v, "key-id"),
parse_type(key_json),
parse_required_i32(v, "value-id"),
value_required ? field_required::yes : field_required::no,
parse_type(val_json));
}

} // namespace

nested_field_ptr parse_field(const json::Value& v) {
auto id = parse_required_i32(v, "id");
auto name = parse_required_str(v, "name");
auto required = parse_required_bool(v, "required");
const auto& type_json = parse_required(v, "type");
auto type = parse_type(type_json);
return nested_field::create(
id,
std::move(name),
required ? field_required::yes : field_required::no,
std::move(type));
}

field_type parse_type(const json::Value& v) {
if (v.IsString()) {
auto v_str = std::string_view{v.GetString(), v.GetStringLength()};
if (v_str.starts_with("decimal")) {
return parse_decimal(v_str);
}
if (v_str.starts_with("fixed")) {
return parse_fixed(v_str);
}
return string_switch<field_type>(v_str)
.match("boolean", boolean_type{})
.match("int", int_type{})
.match("long", long_type{})
.match("float", float_type{})
.match("double", double_type{})
.match("date", date_type{})
.match("time", time_type{})
.match("timestamp", timestamp_type{})
.match("timestamptz", timestamptz_type{})
.match("string", string_type{})
.match("uuid", uuid_type{})
.match("binary", binary_type{});
}
if (!v.IsObject()) {
throw std::invalid_argument("Expected string or object for type field");
}
auto type = parse_required_str(v, "type");
if (type == "struct") {
return parse_struct(v);
} else if (type == "list") {
return parse_list(v);
} else if (type == "map") {
return parse_map(v);
}
throw std::invalid_argument(fmt::format(
"Expected type field of 'struct', 'list', or 'map': {}", type));
}

} // namespace iceberg

namespace json {

namespace {
class rjson_visitor {
public:
explicit rjson_visitor(json::Writer<json::StringBuffer>& w)
: w(w) {}
void operator()(const iceberg::boolean_type&) { w.String("boolean"); }
void operator()(const iceberg::int_type&) { w.String("int"); }
void operator()(const iceberg::long_type&) { w.String("long"); }
void operator()(const iceberg::float_type&) { w.String("float"); }
void operator()(const iceberg::double_type&) { w.String("double"); }
void operator()(const iceberg::decimal_type& t) {
w.String("decimal({}, {})", t.precision, t.scale);
}
void operator()(const iceberg::date_type&) { w.String("date"); }
void operator()(const iceberg::time_type&) { w.String("time"); }
void operator()(const iceberg::timestamp_type&) { w.String("timestamp"); }
void operator()(const iceberg::timestamptz_type&) {
w.String("timestamptz");
}
void operator()(const iceberg::string_type&) { w.String("string"); }
void operator()(const iceberg::uuid_type&) { w.String("uuid"); }
void operator()(const iceberg::fixed_type& t) {
w.String(fmt::format("fixed[{}]", t.length));
}
void operator()(const iceberg::binary_type&) { w.String("binary"); }

void operator()(const iceberg::primitive_type& t) { rjson_serialize(w, t); }
void operator()(const iceberg::struct_type& t) { rjson_serialize(w, t); }
void operator()(const iceberg::list_type& t) { rjson_serialize(w, t); }
void operator()(const iceberg::map_type& t) { rjson_serialize(w, t); }

private:
json::Writer<json::StringBuffer>& w;
};
} // anonymous namespace

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::nested_field& f) {
w.StartObject();
w.Key("id");
w.Int(f.id());
w.Key("name");
w.String(f.name);
w.Key("required");
w.Bool(bool(f.required));
w.Key("type");
rjson_serialize(w, f.type);
w.EndObject();
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::primitive_type& t) {
std::visit(rjson_visitor{w}, t);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::struct_type& t) {
w.StartObject();
w.Key("fields");
w.StartArray();
for (const auto& f : t.fields) {
rjson_serialize(w, *f);
}
w.EndArray();
w.EndObject();
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::list_type& t) {
w.StartObject();
w.Key("element-id");
w.Int(t.element_field->id);
w.Key("element-required");
w.Bool(bool(t.element_field->required));
w.Key("element");
rjson_serialize(w, t.element_field->type);
w.EndObject();
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::map_type& t) {
w.StartObject();
w.Key("key-id");
w.Int(t.key_field->id);
w.Key("value-id");
w.Int(t.value_field->id);
w.Key("value-required");
w.Bool(bool(t.value_field->required));
w.Key("key-type");
rjson_serialize(w, t.key_field->type);
w.Key("value-type");
rjson_serialize(w, t.value_field->type);
w.EndObject();
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::field_type& t) {
std::visit(rjson_visitor{w}, t);
}

} // namespace json
39 changes: 39 additions & 0 deletions src/v/iceberg/datatypes_json.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "iceberg/datatypes.h"
#include "json/_include_first.h"
#include "json/document.h"
#include "json/stringbuffer.h"
#include "json/writer.h"

namespace iceberg {

field_type parse_type(const json::Value&);
nested_field_ptr parse_field(const json::Value&);

} // namespace iceberg

namespace json {

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::nested_field& f);
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::primitive_type& t);
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::struct_type& t);
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::list_type& t);
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::map_type& t);
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const iceberg::field_type& t);

} // namespace json
1 change: 1 addition & 0 deletions src/v/iceberg/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rp_test(
BINARY_NAME iceberg
SOURCES
manifest_serialization_test.cc
datatypes_json_test.cc
LIBRARIES
Avro::avro
v::bytes
Expand Down
Loading

0 comments on commit 77d6336

Please sign in to comment.