Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Support builtin app in Interactive #4242

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(runtime)
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc")
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

Expand Down
54 changes: 54 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/app/builtin/count_vertices.h"

namespace gs {

bool CountVertices::DoQuery(GraphDBSession& sess, Decoder& input,
Encoder& output) {
// First get the read transaction.
auto txn = sess.GetReadTransaction();
// We expect one param of type string from decoder.
if (input.empty()) {
return false;
}
std::string label_name{input.get_string()};
const auto& schema = txn.schema();
if (!schema.has_vertex_label(label_name)) {
output.put_string_view("The requested label doesn't exits.");
return false; // The requested label doesn't exits.
}
auto label_id = schema.get_vertex_label_id(label_name);
// The vertices are labeled internally from 0 ~ vertex_label_num, accumulate
auto vertex_num = txn.GetVertexNum(label_id);
// the count.
results::CollectiveResults results;
auto result = results.add_results();
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_i32(vertex_num);

output.put_string_view(results.SerializeAsString());
txn.Commit();
return true;
}

AppWrapper CountVerticesFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new CountVertices(), NULL);
}
} // namespace gs
39 changes: 39 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#define ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/hqps_db/app/interactive_app_base.h"

namespace gs {
// A simple app to count the number of vertices of a given label.
class CountVertices : public CypherInternalPbWriteAppBase {
public:
CountVertices() {}
bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override;
};

class CountVerticesFactory : public AppFactoryBase {
public:
CountVerticesFactory() = default;
~CountVerticesFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};

} // namespace gs

#endif // ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
3 changes: 3 additions & 0 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/app/builtin/count_vertices.h"
#include "flex/engines/graph_db/app/hqps_app.h"
#include "flex/engines/graph_db/app/server_app.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
Expand Down Expand Up @@ -405,6 +406,8 @@ void GraphDB::initApps(
}
// Builtin apps
app_factories_[0] = std::make_shared<ServerAppFactory>();
app_factories_[Schema::BUILTIN_COUNT_VERTICES_PLUGIN_ID] =
std::make_shared<CountVerticesFactory>();
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared<HQPSAdhocReadAppFactory>();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
Expand Down
7 changes: 7 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ GraphDBSession::parse_query_type_from_cypher_internal(
gs::Status(StatusCode::NOT_FOUND, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
// First check whether the query name is builtin query
for (int i = 0; i < Schema::BUILTIN_PLUGIN_NUM; ++i) {
std::string builtin_query_name = Schema::BUILTIN_PLUGIN_NAMES[i];
if (query_name == builtin_query_name) {
return std::make_pair(Schema::BUILTIN_PLUGIN_IDS[i], str_view);
}
}
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/hqps_db/app/interactive_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace gs {

void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
inline void put_argument(gs::Encoder& encoder,
const procedure::Argument& argument) {
auto& value = argument.value();
auto item_case = value.item_case();
switch (item_case) {
Expand All @@ -46,8 +47,8 @@ void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
}
}

bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
inline bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
if (raw_input.size() == 0) {
VLOG(10) << "No arguments found in input";
return true;
Expand Down
38 changes: 38 additions & 0 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ seastar::future<seastar::sstring> invoke_creating_procedure(
if (json.HasMember("name")) {
// Currently we need id== name
rapidjson::Value& name = json["name"];
if (gs::Schema::IsBuiltinPlugin(name.GetString())) {
return seastar::make_exception_future<seastar::sstring>(
std::string(
"The plugin name is a builtin plugin, cannot be created: ") +
name.GetString());
}
rapidjson::Value name_copy(name, json.GetAllocator());
json.AddMember("id", name_copy, json.GetAllocator());
}
Expand Down Expand Up @@ -516,6 +522,9 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
// There can also be procedures that builtin in the graph meta.
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
if (plugin_meta.bound_graph.empty()) {
plugin_meta.bound_graph = query_param.content;
}
}
graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(),
all_plugin_metas.begin(),
Expand Down Expand Up @@ -697,6 +706,15 @@ admin_actor::get_procedure_by_procedure_name(
auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

auto builtin_plugins = gs::get_builtin_plugin_metas();
for (auto& builtin_plugin : builtin_plugins) {
if (builtin_plugin.id == procedure_id.c_str()) {
add_runnable_info(builtin_plugin);
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(builtin_plugin.ToJson()));
}
}

if (get_procedure_res.ok()) {
VLOG(10) << "Successfully get procedure procedures";
auto& proc_meta = get_procedure_res.value();
Expand Down Expand Up @@ -803,6 +821,16 @@ seastar::future<admin_query_result> admin_actor::delete_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be deleted: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be deleted: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down Expand Up @@ -859,6 +887,16 @@ seastar::future<admin_query_result> admin_actor::update_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be updated: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be updated: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down
17 changes: 15 additions & 2 deletions flex/interactive/sdk/python/gs_interactive/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ def run_cypher_test_suite(neo4j_sess : Neo4jSession, graph_id: str, queries: lis
for query in queries:
submit_query_via_neo4j_endpoint(neo4j_sess, graph_id, query)

def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str):
query = "CALL " + proc_name + "()"
def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str, *args):
query = "CALL " + proc_name + "(" + ",".join(args) + ")"
result = neo4j_sess.run(query)
for record in result:
print(record)
Expand Down Expand Up @@ -435,6 +435,19 @@ def create_procedure(sess: Session, graph_id: str, name: str, query: str, descri
proc_id = resp.get_value().procedure_id
return proc_id

def delete_procedure(sess: Session, graph_id: str, proc_id: str):
resp = sess.delete_procedure(graph_id, proc_id)
if not resp.is_ok():
print("Failed to delete procedure: ", resp.get_status_message())
raise Exception("Failed to delete procedure, status: ", resp.get_status_message())

def update_procedure(sess: Session, graph_id: str, proc_id: str, desc : str):
request = UpdateProcedureRequest(
description=desc)
resp = sess.update_procedure(graph_id, proc_id, request)
if not resp.is_ok():
print("Failed to update procedure: ", resp.get_status_message())
raise Exception("Failed to update procedure, status: ", resp.get_status_message())

def start_service_on_graph(interactive_session, graph_id : str):
resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


from gs_interactive.tests.conftest import create_vertex_only_modern_graph, start_service_on_graph,interactive_driver
from gs_interactive.tests.conftest import create_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure
from gs_interactive.tests.conftest import create_procedure,delete_procedure,update_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure
from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph, import_data_to_partial_modern_graph, import_data_to_full_modern_graph


Expand Down Expand Up @@ -121,3 +121,18 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr
with pytest.raises(Exception):
create_procedure(interactive_session, create_modern_graph, "test_proc2", "MATCH(n: IDONTKOWN) return count(n)")

def test_builtin_procedure(interactive_session,neo4j_session, create_modern_graph):
print("[Test builtin procedure]")
# Delete the builtin procedure should fail
with pytest.raises(Exception):
delete_procedure(interactive_session, create_modern_graph, "count_vertices")
# Create a procedure with the same name as builtin procedure should fail
with pytest.raises(Exception):
create_procedure(interactive_session, create_modern_graph, "count_vertices", "MATCH(n: software) return count(n);")
# Update the builtin procedure should fail
with pytest.raises(Exception):
update_procedure(interactive_session, create_modern_graph, "count_vertices", "A updated description")
# Call the builtin procedure
start_service_on_graph(interactive_session, create_modern_graph)
call_procedure(neo4j_session, create_modern_graph, "count_vertices", '"person"')

3 changes: 3 additions & 0 deletions flex/openapi/openapi_coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ components:
type: array
items:
$ref: '#/components/schemas/Parameter'
option:
type: object
additionalProperties: true

GetStoredProcResponse:
allOf:
Expand Down
3 changes: 3 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,9 @@ components:
enable:
type: boolean
example : true
option:
type: object
additionalProperties: true
GetProcedureResponse:
x-body-name: get_procedure_response
allOf:
Expand Down
33 changes: 28 additions & 5 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ std::string read_file_to_string(const std::string& file_path) {
return "";
}
}
const std::vector<PluginMeta>& get_builtin_plugin_metas() {
static std::vector<PluginMeta> builtin_plugins;
static bool initialized = false;
if (!initialized) {
PluginMeta count_vertices;
count_vertices.id = "count_vertices";
count_vertices.name = "count_vertices";
count_vertices.description = "A builtin plugin to count vertices";
count_vertices.enable = true;
count_vertices.runnable = true;
count_vertices.type = "cypher";
count_vertices.creation_time = GetCurrentTimeStamp();
count_vertices.update_time = GetCurrentTimeStamp();
count_vertices.params.push_back({"labelName", PropertyType::kString});
count_vertices.returns.push_back({"count", PropertyType::kInt32});
initialized = true;
builtin_plugins.push_back(count_vertices);
}
return builtin_plugins;
}

void append_builtin_plugins(std::vector<PluginMeta>& plugin_metas) {
auto builtin_plugin_metas = get_builtin_plugin_metas();
plugin_metas.insert(plugin_metas.end(), builtin_plugin_metas.begin(),
builtin_plugin_metas.end());
}

UpdateGraphMetaRequest::UpdateGraphMetaRequest(
int64_t data_update_time, const std::string& data_import_config)
Expand Down Expand Up @@ -420,6 +446,8 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
// Add builtin plugins
append_builtin_plugins(request.plugin_metas);
return request;
}

Expand Down Expand Up @@ -671,11 +699,6 @@ UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson(
if (j.HasMember("enable")) {
request.enable = j["enable"].GetBool();
}
// } catch (const std::exception& e) {
// LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what() <<
// " "
// << json;
// }
return request;
}

Expand Down
2 changes: 2 additions & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ JobStatus parseFromString(const std::string& status_string);

////////////////// MetaData ///////////////////////
struct PluginMeta;
const std::vector<PluginMeta>& get_builtin_plugin_metas();

struct GraphMeta {
GraphId id;
std::string name;
Expand Down
Loading
Loading