Skip to content

Commit

Permalink
add count_vertices as builtin app
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Sep 19, 2024
1 parent b6d413c commit 2559872
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 11 deletions.
3 changes: 2 additions & 1 deletion flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(runtime)
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc")
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

Expand Down
40 changes: 40 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "flex/engines/graph_db/app/builtin/count_vertices.h"

namespace gs {

bool CountVertices::DoQuery(GraphDBSession& sess, Decoder& input,
Encoder& output) {
// First get the read transaction.
auto txn = sess.GetReadTransaction();
// We expect one param of type string from decoder.
if (input.empty()) {
return false;
}
std::string label_name{input.get_string()};
const auto& schema = txn.schema();
if (!schema.has_vertex_label(label_name)) {
output.put_string_view("The requested label doesn't exits.");
return false; // The requested label doesn't exits.
}
auto label_id = schema.get_vertex_label_id(label_name);
// The vertices are labeled internally from 0 ~ vertex_label_num, accumulate
auto vertex_num = txn.GetVertexNum(label_id);
// the count.
results::CollectiveResults results;
auto result = results.add_results();
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_i32(vertex_num);

output.put_string_view(results.SerializeAsString());
txn.Commit();
return true;
}

AppWrapper CountVerticesFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new CountVertices(), NULL);
}
} // namespace gs
39 changes: 39 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#define ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/hqps_db/app/interactive_app_base.h"

namespace gs {
// A simple app to count the number of vertices of a given label.
class CountVertices : public CypherInternalPbWriteAppBase {
public:
CountVertices() {}
bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override;
};

class CountVerticesFactory : public AppFactoryBase {
public:
CountVerticesFactory() = default;
~CountVerticesFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};

} // namespace gs

#endif // ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
3 changes: 3 additions & 0 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/app/builtin/count_vertices.h"
#include "flex/engines/graph_db/app/hqps_app.h"
#include "flex/engines/graph_db/app/server_app.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
Expand Down Expand Up @@ -405,6 +406,8 @@ void GraphDB::initApps(
}
// Builtin apps
app_factories_[0] = std::make_shared<ServerAppFactory>();
app_factories_[Schema::BUILTIN_COUNT_VERTICES_PLUGIN_ID] =
std::make_shared<CountVerticesFactory>();
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared<HQPSAdhocReadAppFactory>();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
Expand Down
7 changes: 7 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ GraphDBSession::parse_query_type_from_cypher_internal(
gs::Status(StatusCode::NOT_FOUND, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
// First check whether the query name is builtin query
for (int i = 0; i < Schema::BUILTIN_PLUGIN_NUM; ++i) {
std::string builtin_query_name = Schema::BUILTIN_PLUGIN_NAMES[i];
if (query_name == builtin_query_name) {
return std::make_pair(Schema::BUILTIN_PLUGIN_IDS[i], str_view);
}
}
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/hqps_db/app/interactive_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace gs {

void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
inline void put_argument(gs::Encoder& encoder,
const procedure::Argument& argument) {
auto& value = argument.value();
auto item_case = value.item_case();
switch (item_case) {
Expand All @@ -46,8 +47,8 @@ void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
}
}

bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
inline bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
if (raw_input.size() == 0) {
VLOG(10) << "No arguments found in input";
return true;
Expand Down
38 changes: 38 additions & 0 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ seastar::future<seastar::sstring> invoke_creating_procedure(
if (json.HasMember("name")) {
// Currently we need id== name
rapidjson::Value& name = json["name"];
if (gs::Schema::IsBuiltinPlugin(name.GetString())) {
return seastar::make_exception_future<seastar::sstring>(
std::string(
"The plugin name is a builtin plugin, cannot be created: ") +
name.GetString());
}
rapidjson::Value name_copy(name, json.GetAllocator());
json.AddMember("id", name_copy, json.GetAllocator());
}
Expand Down Expand Up @@ -516,6 +522,9 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
// There can also be procedures that builtin in the graph meta.
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
if (plugin_meta.bound_graph.empty()) {
plugin_meta.bound_graph = query_param.content;
}
}
graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(),
all_plugin_metas.begin(),
Expand Down Expand Up @@ -697,6 +706,15 @@ admin_actor::get_procedure_by_procedure_name(
auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

auto builtin_plugins = gs::get_builtin_plugin_metas();
for (auto& builtin_plugin : builtin_plugins) {
if (builtin_plugin.id == procedure_id.c_str()) {
add_runnable_info(builtin_plugin);
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(builtin_plugin.ToJson()));
}
}

if (get_procedure_res.ok()) {
VLOG(10) << "Successfully get procedure procedures";
auto& proc_meta = get_procedure_res.value();
Expand Down Expand Up @@ -803,6 +821,16 @@ seastar::future<admin_query_result> admin_actor::delete_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be deleted: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be deleted: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down Expand Up @@ -859,6 +887,16 @@ seastar::future<admin_query_result> admin_actor::update_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be updated: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be updated: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down
33 changes: 28 additions & 5 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ std::string read_file_to_string(const std::string& file_path) {
return "";
}
}
const std::vector<PluginMeta>& get_builtin_plugin_metas() {
static std::vector<PluginMeta> builtin_plugins;
static bool initialized = false;
if (!initialized) {
PluginMeta count_vertices;
count_vertices.id = "count_vertices";
count_vertices.name = "count_vertices";
count_vertices.description = "A builtin plugin to count vertices";
count_vertices.enable = true;
count_vertices.runnable = true;
count_vertices.type = "cypher";
count_vertices.creation_time = GetCurrentTimeStamp();
count_vertices.update_time = GetCurrentTimeStamp();
count_vertices.params.push_back({"labelName", PropertyType::kString});
count_vertices.returns.push_back({"count", PropertyType::kInt32});
initialized = true;
builtin_plugins.push_back(count_vertices);
}
return builtin_plugins;
}

void append_builtin_plugins(std::vector<PluginMeta>& plugin_metas) {
auto builtin_plugin_metas = get_builtin_plugin_metas();
plugin_metas.insert(plugin_metas.end(), builtin_plugin_metas.begin(),
builtin_plugin_metas.end());
}

UpdateGraphMetaRequest::UpdateGraphMetaRequest(
int64_t data_update_time, const std::string& data_import_config)
Expand Down Expand Up @@ -420,6 +446,8 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
// Add builtin plugins
append_builtin_plugins(request.plugin_metas);
return request;
}

Expand Down Expand Up @@ -671,11 +699,6 @@ UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson(
if (j.HasMember("enable")) {
request.enable = j["enable"].GetBool();
}
// } catch (const std::exception& e) {
// LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what() <<
// " "
// << json;
// }
return request;
}

Expand Down
2 changes: 2 additions & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ JobStatus parseFromString(const std::string& status_string);

////////////////// MetaData ///////////////////////
struct PluginMeta;
const std::vector<PluginMeta>& get_builtin_plugin_metas();

struct GraphMeta {
GraphId id;
std::string name;
Expand Down
20 changes: 20 additions & 0 deletions flex/storages/rt_mutable_graph/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

namespace gs {

bool Schema::IsBuiltinPlugin(const std::string& plugin_name) {
for (uint8_t i = 0; i < BUILTIN_PLUGIN_NUM; i++) {
if (plugin_name == BUILTIN_PLUGIN_NAMES[i]) {
return true;
}
}
return false;
}

Schema::Schema() : has_multi_props_edge_(false){};
Schema::~Schema() = default;

Expand Down Expand Up @@ -1302,6 +1311,17 @@ bool Schema::EmplacePlugins(
<< ", name or library not found.";
}
}
// Check whether all plugins contains reserved builtin plugin names.
for (const auto& name_path : plugin_name_and_paths) {
for (int i = 0; i < BUILTIN_PLUGIN_NUM; ++i) {
if (name_path.first == Schema::BUILTIN_PLUGIN_NAMES[i]) {
LOG(ERROR) << "Invalid plugin name: " << name_path.first
<< ", it is a builtin plugin name, please use another name";
return false;
}
}
}

LOG(INFO) << "Load " << plugin_name_to_path_and_id_.size() << " plugins";
return true;
}
Expand Down
16 changes: 14 additions & 2 deletions flex/storages/rt_mutable_graph/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ class Schema {
// How many built-in plugins are there.
// Currently only one builtin plugin, SERVER_APP is supported.
static constexpr uint8_t RESERVED_PLUGIN_NUM = 1;
static constexpr uint8_t MAX_PLUGIN_ID = 252;
static constexpr uint8_t MAX_PLUGIN_ID = 251;
static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253;
static constexpr uint8_t HQPS_ADHOC_READ_PLUGIN_ID = 254;
static constexpr uint8_t HQPS_ADHOC_WRITE_PLUGIN_ID = 255;
static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253;
static constexpr const char* HQPS_ADHOC_READ_PLUGIN_ID_STR = "\xFE";
static constexpr const char* HQPS_ADHOC_WRITE_PLUGIN_ID_STR = "\xFF";
static constexpr const char* ADHOC_READ_PLUGIN_ID_STR = "\xFD";
Expand All @@ -43,10 +43,22 @@ class Schema {
static constexpr const char* MAX_LENGTH_KEY = "max_length";
static constexpr const uint16_t STRING_DEFAULT_MAX_LENGTH = 256;

// The builtin plugins are reserved for the system.
static constexpr uint8_t BUILTIN_PLUGIN_NUM = 1;
static constexpr uint8_t BUILTIN_COUNT_VERTICES_PLUGIN_ID = 252;
static constexpr const char* BUILTIN_COUNT_VERTICES_PLUGIN_NAME =
"count_vertices";
static constexpr const char* BUILTIN_PLUGIN_NAMES[BUILTIN_PLUGIN_NUM] = {
BUILTIN_COUNT_VERTICES_PLUGIN_NAME};
static constexpr uint8_t BUILTIN_PLUGIN_IDS[BUILTIN_PLUGIN_NUM] = {
BUILTIN_COUNT_VERTICES_PLUGIN_ID};

// An array containing all compatible versions of schema.
static const std::vector<std::string> COMPATIBLE_VERSIONS;
static constexpr const char* DEFAULT_SCHEMA_VERSION = "v0.0";

static bool IsBuiltinPlugin(const std::string& plugin_name);

using label_type = label_t;
Schema();
~Schema();
Expand Down

0 comments on commit 2559872

Please sign in to comment.