Skip to content

Commit

Permalink
fix(interactive): avoid introduce pb to stored procedure (#3861)
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed May 31, 2024
1 parent 6be2147 commit a269a85
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 60 deletions.
60 changes: 60 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"

#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
#endif // BUILD_HQPS

namespace gs {

ReadTransaction GraphDBSession::GetReadTransaction() const {
Expand Down Expand Up @@ -224,6 +229,61 @@ AppBase* GraphDBSession::GetApp(int type) {

#undef likely // likely

#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
GraphDBSession::parse_query_type_from_cypher_json(
const std::string_view& str_view) {
VLOG(10) << "string view: " << str_view;
nlohmann::json j;
try {
j = nlohmann::json::parse(str_view);
} catch (const nlohmann::json::parse_error& e) {
LOG(ERROR) << "Fail to parse json from input content: " << e.what();
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError,
"Fail to parse json from input content:" + std::string(e.what())));
}
auto query_name = j["query_name"].get<std::string>();
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::NotFound, "Query name is not registered: " + query_name));
}
if (j.contains("arguments")) {
for (auto& arg : j["arguments"]) {
VLOG(10) << "arg: " << arg;
}
}
VLOG(10) << "Query name: " << query_name;
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}

Result<std::pair<uint8_t, std::string_view>>
GraphDBSession::parse_query_type_from_cypher_internal(
const std::string_view& str_view) {
procedure::Query cur_query;
if (!cur_query.ParseFromArray(str_view.data(), str_view.size())) {
LOG(ERROR) << "Fail to parse query from input content";
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError, "Fail to parse query from input content"));
}
auto query_name = cur_query.query_name().name();
if (query_name.empty()) {
LOG(ERROR) << "Query name is empty";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::NotFound, "Query name is not registered: " + query_name));
}
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}
#endif

const AppMetric& GraphDBSession::GetAppMetric(int idx) const {
return app_metrics_[idx];
}
Expand Down
63 changes: 10 additions & 53 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
#include "flex/utils/property/column.h"
#include "flex/utils/result.h"

#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
#endif // BUILD_HQPS

namespace gs {

class GraphDB;
Expand Down Expand Up @@ -114,6 +109,12 @@ class GraphDBSession {
AppBase* GetApp(int idx);

private:
#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_json(const std::string_view& input);
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_internal(const std::string_view& input);
#endif // BUILD_HQPS
/**
* @brief Parse the input format of the query.
* There are four formats:
Expand Down Expand Up @@ -165,58 +166,14 @@ class GraphDBSession {
// For cypherJson there is no query-id provided. The query name is
// provided in the json string.
std::string_view str_view(input.data(), len - 1);
VLOG(10) << "string view: " << str_view;
nlohmann::json j;
try {
j = nlohmann::json::parse(str_view);
} catch (const nlohmann::json::parse_error& e) {
LOG(ERROR) << "Fail to parse json from input content: " << e.what();
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
StatusCode::InternalError,
"Fail to parse json from input content:" + std::string(e.what())));
}
auto query_name = j["query_name"].get<std::string>();
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound,
"Query name is not registered: " + query_name));
}
if (j.contains("arguments")) {
for (auto& arg : j["arguments"]) {
VLOG(10) << "arg: " << arg;
}
}
VLOG(10) << "Query name: " << query_name;
return std::make_pair(app_name_to_path_index.at(query_name).second,
std::string_view(str_data, len - 1));
return parse_query_type_from_cypher_json(str_view);
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalProcedure)) {
// For cypher internal procedure, the query_name is
// provided in the protobuf message.
procedure::Query cur_query;
if (!cur_query.ParseFromArray(input.data(), input.size() - 1)) {
LOG(ERROR) << "Fail to parse query from input content";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::InternalError,
"Fail to parse query from input content"));
}
auto query_name = cur_query.query_name().name();
if (query_name.empty()) {
LOG(ERROR) << "Query name is empty";
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::NotFound,
"Query name is not registered: " + query_name));
}
return std::make_pair(app_name_to_path_index.at(query_name).second,
std::string_view(str_data, len - 1));
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_internal(str_view);

}
#endif // BUILD_HQPS
else {
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/hqps_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ add_library(hqps_plan_proto SHARED ${PROTO_SRCS_GIE})
target_include_directories(hqps_plan_proto PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(hqps_plan_proto PUBLIC protobuf::libprotobuf)
target_link_libraries(hqps_plan_proto PUBLIC ${Protobuf_LIBRARIES})
install_flex_target(hqps_plan_proto)

install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
Expand Down
9 changes: 4 additions & 5 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(

return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([this, input_format
.then([input_format
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
this, outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
Expand Down Expand Up @@ -425,10 +425,9 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path,
std::move(output.content));
});
})
.then([this
.then([
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
this, outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
Expand Down
2 changes: 1 addition & 1 deletion flex/tests/hqps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ if (BUILD_HQPS)
add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc)
target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES})
endforeach()
endif()
endif()

0 comments on commit a269a85

Please sign in to comment.