Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Replacing nlohmann-json with rapidjson #4194

Merged
merged 27 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,3 @@
path = learning_engine/graphlearn-for-pytorch
url = https://github.com/alibaba/graphlearn-for-pytorch.git

[submodule "flex/third_party/nlohmann-json"]
path = flex/third_party/nlohmann-json
url = https://github.com/nlohmann/json.git
18 changes: 10 additions & 8 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,17 @@ if (BUILD_DOC)
endif(DOXYGEN_FOUND)
endif()

find_package(RapidJSON REQUIRED)
if (DEFINED RapidJSON_INCLUDE_DIRS) # rapidjson > 1.1.0
include_directories(${RapidJSON_INCLUDE_DIRS})
else () # rapidjson <= 1.1.0
include_directories(${RAPIDJSON_INCLUDE_DIRS})
endif ()

# Check whether ${CMAKE_SOURCE_DIR}/third_party/single_include/nlohmann/json.hpp exists
if (NOT EXISTS ${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include/nlohmann/json.hpp)
message(FATAL_ERROR "${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include/nlohmann/json.hpp not found, "
"please run `git submodule update --init` to download third_party")
endif()
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include)
add_definitions(-DRAPIDJSON_HAS_CXX11=1)
add_definitions(-DRAPIDJSON_HAS_STDSTRING=1)
add_definitions(-DRAPIDJSON_HAS_CXX11_RVALUE_REFS=1)
add_definitions(-DRAPIDJSON_HAS_CXX11_RANGE_FOR=1)

if (BUILD_ODPS_FRAGMENT_LOADER)
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/odps/include)
Expand Down Expand Up @@ -277,8 +281,6 @@ set(CPACK_DEB_COMPONENT_INSTALL YES)

#install CMakeLists.txt.template to resources/
install(FILES resources/hqps/CMakeLists.txt.template DESTINATION lib/flex/)
#install header-only nlohmann-json.hpp to include/
install(FILES ${CMAKE_SOURCE_DIR}/third_party/nlohmann-json/single_include/nlohmann/json.hpp DESTINATION include/nlohmann)

if(USE_PTHASH)
install(DIRECTORY ${PROJECT_SOURCE_DIR}/third_party/murmurhash
Expand Down
2 changes: 1 addition & 1 deletion flex/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN apt-get update && apt-get -y install sudo locales g++ cmake openjdk-11-jre-h


RUN apt-get update && apt-get install -y protobuf-compiler libprotobuf-dev maven git vim curl \
wget python3 make libc-ares-dev doxygen python3-pip net-tools curl default-jdk nlohmann-json3-dev \
wget python3 make libc-ares-dev doxygen python3-pip net-tools curl default-jdk rapidjson-dev \
libgoogle-glog-dev libopenmpi-dev libboost-all-dev libyaml-cpp-dev libcrypto++-dev openssl libcurl4-openssl-dev && \
apt-get clean -y && rm -rf /var/lib/apt/lists/*

Expand Down
148 changes: 79 additions & 69 deletions flex/engines/graph_db/database/graph_db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@
#include <unordered_map>
#include <vector>

#include <nlohmann/json.hpp>

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/graph_db_operations.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/service_utils.h"
#include "utils/result.h"
#include "utils/service_utils.h"

namespace gs {

Result<std::string> GraphDBOperations::CreateVertex(
GraphDBSession& session, nlohmann::json&& input_json) {
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
// Check if the input json contains vertex_request and edge_request
if (input_json.contains("vertex_request") == false ||
input_json["vertex_request"].is_array() == false ||
input_json["vertex_request"].size() == 0 ||
(input_json.contains("edge_request") == true &&
input_json["edge_request"].is_array() == false)) {
if (input_json.HasMember("vertex_request") == false ||
input_json["vertex_request"].IsArray() == false ||
input_json["vertex_request"].Size() == 0 ||
(input_json.HasMember("edge_request") &&
input_json["edge_request"].IsArray() == false)) {
return Result<std::string>(gs::Status(
StatusCode::INVALID_SCHEMA,
"Invalid input json, vertex_request and edge_request should be array "
Expand All @@ -46,11 +45,11 @@ Result<std::string> GraphDBOperations::CreateVertex(
// input vertex data and edge data
try {
// vertex data
for (auto& vertex_insert : input_json["vertex_request"]) {
for (auto& vertex_insert : input_json["vertex_request"].GetArray()) {
vertex_data.push_back(inputVertex(vertex_insert, schema, session));
}
// edge data
for (auto& edge_insert : input_json["edge_request"]) {
for (auto& edge_insert : input_json["edge_request"].GetArray()) {
edge_data.push_back(inputEdge(edge_insert, schema, session));
}
LOG(INFO) << "CreateVertex edge_data: " << edge_data.size();
Expand All @@ -62,26 +61,27 @@ Result<std::string> GraphDBOperations::CreateVertex(
auto insert_result =
insertVertex(std::move(vertex_data), std::move(edge_data), session);
if (insert_result.ok()) {
nlohmann::json result;
result["message"] = "Vertex data is successfully inserted";
return Result<std::string>(result.dump());
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Vertex data is successfully inserted",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(insert_result);
}
Result<std::string> GraphDBOperations::CreateEdge(GraphDBSession& session,
nlohmann::json&& input_json) {
Result<std::string> GraphDBOperations::CreateEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
// Check if the input json contains edge_request
if (input_json.is_array() == false || input_json.size() == 0) {
if (input_json.IsArray() == false || input_json.Size() == 0) {
return Result<std::string>(gs::Status(
StatusCode::INVALID_SCHEMA,
"Invalid input json, edge_request should be array and not empty"));
}
const Schema& schema = session.schema();
// input edge data
try {
for (auto& edge_insert : input_json) {
for (auto& edge_insert : input_json.GetArray()) {
edge_data.push_back(inputEdge(edge_insert, schema, session));
}
} catch (std::exception& e) {
Expand All @@ -91,14 +91,15 @@ Result<std::string> GraphDBOperations::CreateEdge(GraphDBSession& session,
}
auto insert_result = insertEdge(std::move(edge_data), session);
if (insert_result.ok()) {
nlohmann::json result;
result["message"] = "Edge data is successfully inserted";
return Result<std::string>(result.dump());
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Edge data is successfully inserted",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(insert_result);
}
Result<std::string> GraphDBOperations::UpdateVertex(
GraphDBSession& session, nlohmann::json&& input_json) {
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
const Schema& schema = session.schema();
Expand All @@ -112,14 +113,15 @@ Result<std::string> GraphDBOperations::UpdateVertex(
}
auto update_result = updateVertex(std::move(vertex_data), session);
if (update_result.ok()) {
nlohmann::json result;
result["message"] = "Successfully update Vertex";
return Result<std::string>(result.dump());
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Successfully update Vertex",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(update_result);
}
Result<std::string> GraphDBOperations::UpdateEdge(GraphDBSession& session,
nlohmann::json&& input_json) {
Result<std::string> GraphDBOperations::UpdateEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
const Schema& schema = session.schema();
Expand All @@ -133,24 +135,25 @@ Result<std::string> GraphDBOperations::UpdateEdge(GraphDBSession& session,
}
auto update_result = updateEdge(std::move(edge_data), session);
if (update_result.ok()) {
nlohmann::json result;
result["message"] = "Successfully update Edge";
return Result<std::string>(result.dump());
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Successfully update Edge",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(update_result);
}
Result<std::string> GraphDBOperations::GetVertex(
GraphDBSession& session,
std::unordered_map<std::string, std::string>&& params) {
nlohmann::json result;
rapidjson::Document result(rapidjson::kObjectType);
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
std::vector<std::string> property_names;
const Schema& schema = session.schema();
// input vertex data
VertexData vertex;
std::string label = params["label"];
result["label"] = label;
result.AddMember("label", label, result.GetAllocator());
vertex.pk_value = Any(std::string(params["primary_key_value"]));
auto check_result =
checkVertexSchema(schema, vertex, label, property_names, true);
Expand All @@ -160,15 +163,15 @@ Result<std::string> GraphDBOperations::GetVertex(
vertex_data.push_back(vertex);
auto get_result = getVertex(std::move(vertex_data), property_names, session);
if (get_result.ok()) {
result["values"] = get_result.value();
return Result<std::string>(result.dump());
result.AddMember("values", get_result.value(), result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(get_result.status());
}
Result<std::string> GraphDBOperations::GetEdge(
GraphDBSession& session,
std::unordered_map<std::string, std::string>&& params) {
nlohmann::json result;
rapidjson::Document result(rapidjson::kObjectType);
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
const Schema& schema = session.schema();
Expand All @@ -188,44 +191,47 @@ Result<std::string> GraphDBOperations::GetEdge(
return Result<std::string>(check_result);
}
edge_data.push_back(edge);
result["src_label"] = src_label;
result["dst_label"] = dst_label;
result["edge_label"] = edge_label;
result["src_primary_key_value"] = src_pk_value;
result["dst_primary_key_value"] = dst_pk_value;
result.AddMember("src_label", src_label, result.GetAllocator());
result.AddMember("dst_label", dst_label, result.GetAllocator());
result.AddMember("edge_label", edge_label, result.GetAllocator());
result.AddMember("src_primary_key_value", src_pk_value,
result.GetAllocator());
result.AddMember("dst_primary_key_value", dst_pk_value,
result.GetAllocator());
if (property_name.empty()) {
result["properties"] = nlohmann::json::array();
return Result<std::string>(result.dump());
result.AddMember("properties", rapidjson::Value(rapidjson::kArrayType),
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
auto get_result = getEdge(std::move(edge_data), property_name, session);
if (get_result.ok()) {
result["properties"] = get_result.value();
return Result<std::string>(result.dump());
result.AddMember("properties", get_result.value(), result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(get_result.status());
}
Result<std::string> GraphDBOperations::DeleteVertex(
GraphDBSession& session, nlohmann::json&& input_json) {
GraphDBSession& session, rapidjson::Document&& input_json) {
// not implemented
return Result<std::string>(StatusCode::UNIMPLEMENTED,
"delete_vertex is not implemented");
}
Result<std::string> GraphDBOperations::DeleteEdge(GraphDBSession& session,
nlohmann::json&& input_json) {
Result<std::string> GraphDBOperations::DeleteEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
// not implemented
return Result<std::string>(StatusCode::UNIMPLEMENTED,
"delete_edge is not implemented");
}

VertexData GraphDBOperations::inputVertex(const nlohmann::json& vertex_json,
VertexData GraphDBOperations::inputVertex(const rapidjson::Value& vertex_json,
const Schema& schema,
GraphDBSession& session) {
VertexData vertex;
std::string label = jsonToString(vertex_json["label"]);
vertex.pk_value = Any(jsonToString(vertex_json["primary_key_value"]));
std::unordered_set<std::string> property_names;
std::vector<std::string> property_names_arr;
for (auto& property : vertex_json["properties"]) {
for (auto& property : vertex_json["properties"].GetArray()) {
auto name_string = jsonToString(property["name"]);
auto value_string = jsonToString(property["value"]);
if (property_names.find(name_string) != property_names.end()) {
Expand All @@ -244,7 +250,7 @@ VertexData GraphDBOperations::inputVertex(const nlohmann::json& vertex_json,
}
return vertex;
}
EdgeData GraphDBOperations::inputEdge(const nlohmann::json& edge_json,
EdgeData GraphDBOperations::inputEdge(const rapidjson::Value& edge_json,
const Schema& schema,
GraphDBSession& session) {
EdgeData edge;
Expand All @@ -254,15 +260,15 @@ EdgeData GraphDBOperations::inputEdge(const nlohmann::json& edge_json,
edge.src_pk_value = Any(jsonToString(edge_json["src_primary_key_value"]));
edge.dst_pk_value = Any(jsonToString(edge_json["dst_primary_key_value"]));
// Check that all parameters in the parameter
if (edge_json["properties"].size() > 1) {
if (edge_json["properties"].Size() > 1) {
throw std::runtime_error(
"size should be 1(only support single property edge)");
}
std::string property_name = "";
if (edge_json["properties"].size() == 1) {
if (edge_json["properties"].Size() == 1) {
edge.property_value =
Any(jsonToString(edge_json["properties"][0]["value"]));
property_name = edge_json["properties"][0]["name"];
property_name = edge_json["properties"][0]["name"].GetString();
}
auto check_result = checkEdgeSchema(schema, edge, src_label, dst_label,
edge_label, property_name);
Expand Down Expand Up @@ -597,37 +603,39 @@ Status GraphDBOperations::updateEdge(std::vector<EdgeData>&& edge_data,
return Status::OK();
}

Result<nlohmann::json> GraphDBOperations::getVertex(
Result<rapidjson::Value> GraphDBOperations::getVertex(
std::vector<VertexData>&& vertex_data,
const std::vector<std::string>& property_names, GraphDBSession& session) {
try {
auto& vertex = vertex_data[0];
nlohmann::json result = nlohmann::json::array();
rapidjson::Document result(rapidjson::kArrayType);
auto txn = session.GetReadTransaction();
auto vertex_db = txn.FindVertex(vertex.label_id, vertex.pk_value);
if (vertex_db.IsValid() == false) {
txn.Abort();
throw std::runtime_error("Vertex not found");
}
for (int i = 0; i < vertex_db.FieldNum(); i++) {
nlohmann::json values;
values["name"] = property_names[i];
values["value"] = vertex_db.GetField(i).to_string();
result.push_back(values);
rapidjson::Value values(rapidjson::kObjectType);
values.AddMember("name", property_names[i], result.GetAllocator());
values.AddMember("value", vertex_db.GetField(i).to_string(),
result.GetAllocator());
result.PushBack(values, result.GetAllocator());
}
txn.Commit();
return Result<nlohmann::json>(result);
return Result<rapidjson::Value>(std::move(result));
} catch (std::exception& e) {
return Result<nlohmann::json>(Status(StatusCode::INVALID_SCHEMA, e.what()));
return Result<rapidjson::Value>(
Status(StatusCode::INVALID_SCHEMA, e.what()));
}
}

Result<nlohmann::json> GraphDBOperations::getEdge(
Result<rapidjson::Value> GraphDBOperations::getEdge(
std::vector<EdgeData>&& edge_data, const std::string& property_name,
GraphDBSession& session) {
try {
const auto& edge = edge_data[0];
nlohmann::json result = nlohmann::json::array();
rapidjson::Document result(rapidjson::kArrayType);
auto txn = session.GetReadTransaction();
vid_t src_vid, dst_vid;
if (txn.GetVertexIndex(edge.src_label_id, edge.src_pk_value, src_vid) ==
Expand All @@ -642,20 +650,22 @@ Result<nlohmann::json> GraphDBOperations::getEdge(
edgeIt.IsValid(); edgeIt.Next()) {
if (edgeIt.GetNeighbor() != dst_vid)
continue;
nlohmann::json push_json;
push_json["name"] = property_name;
push_json["value"] = edgeIt.GetData().to_string();
result.push_back(push_json);
rapidjson::Value push_json(rapidjson::kObjectType);
push_json.AddMember("name", property_name, result.GetAllocator());
push_json.AddMember("value", edgeIt.GetData().to_string(),
result.GetAllocator());
result.PushBack(push_json, result.GetAllocator());
break;
}
if (result.empty()) {
if (result.Empty()) {
txn.Abort();
throw std::runtime_error("Edge not found");
}
txn.Commit();
return Result<nlohmann::json>(result);
return Result<rapidjson::Value>(std::move(result));
} catch (std::exception& e) {
return Result<nlohmann::json>(Status(StatusCode::INVALID_SCHEMA, e.what()));
return Result<rapidjson::Value>(
Status(StatusCode::INVALID_SCHEMA, e.what()));
}
}

Expand Down
Loading
Loading