Skip to content

Commit

Permalink
feat(interactive): Support builtin app in Interactive (#4242)
Browse files Browse the repository at this point in the history
This PR introduces support for built-in procedures in Interactive. Users
can access these procedures as soon as the graph is created, with more
built-in apps to be added in the future.

For instance, a user can count the vertices of a specific label using:

```cypher
call count_vertices("person");
```

Please note that built-in procedures cannot be deleted or updated, and
users cannot create procedures with the same name as a built-in
procedure.
  • Loading branch information
zhanglei1949 authored Sep 20, 2024
1 parent 36313cf commit 56963cd
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 15 deletions.
3 changes: 2 additions & 1 deletion flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(runtime)
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc")
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

Expand Down
54 changes: 54 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/app/builtin/count_vertices.h"

namespace gs {

bool CountVertices::DoQuery(GraphDBSession& sess, Decoder& input,
Encoder& output) {
// First get the read transaction.
auto txn = sess.GetReadTransaction();
// We expect one param of type string from decoder.
if (input.empty()) {
return false;
}
std::string label_name{input.get_string()};
const auto& schema = txn.schema();
if (!schema.has_vertex_label(label_name)) {
output.put_string_view("The requested label doesn't exits.");
return false; // The requested label doesn't exits.
}
auto label_id = schema.get_vertex_label_id(label_name);
// The vertices are labeled internally from 0 ~ vertex_label_num, accumulate
auto vertex_num = txn.GetVertexNum(label_id);
// the count.
results::CollectiveResults results;
auto result = results.add_results();
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_i32(vertex_num);

output.put_string_view(results.SerializeAsString());
txn.Commit();
return true;
}

AppWrapper CountVerticesFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new CountVertices(), NULL);
}
} // namespace gs
39 changes: 39 additions & 0 deletions flex/engines/graph_db/app/builtin/count_vertices.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#define ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/hqps_db/app/interactive_app_base.h"

namespace gs {
// A simple app to count the number of vertices of a given label.
class CountVertices : public CypherInternalPbWriteAppBase {
public:
CountVertices() {}
bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override;
};

class CountVerticesFactory : public AppFactoryBase {
public:
CountVerticesFactory() = default;
~CountVerticesFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};

} // namespace gs

#endif // ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_
3 changes: 3 additions & 0 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/app/builtin/count_vertices.h"
#include "flex/engines/graph_db/app/hqps_app.h"
#include "flex/engines/graph_db/app/server_app.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
Expand Down Expand Up @@ -405,6 +406,8 @@ void GraphDB::initApps(
}
// Builtin apps
app_factories_[0] = std::make_shared<ServerAppFactory>();
app_factories_[Schema::BUILTIN_COUNT_VERTICES_PLUGIN_ID] =
std::make_shared<CountVerticesFactory>();
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared<HQPSAdhocReadAppFactory>();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
Expand Down
7 changes: 7 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ GraphDBSession::parse_query_type_from_cypher_internal(
gs::Status(StatusCode::NOT_FOUND, "Query name is empty"));
}
const auto& app_name_to_path_index = schema().GetPlugins();
// First check whether the query name is builtin query
for (int i = 0; i < Schema::BUILTIN_PLUGIN_NUM; ++i) {
std::string builtin_query_name = Schema::BUILTIN_PLUGIN_NAMES[i];
if (query_name == builtin_query_name) {
return std::make_pair(Schema::BUILTIN_PLUGIN_IDS[i], str_view);
}
}
if (app_name_to_path_index.count(query_name) <= 0) {
LOG(ERROR) << "Query name is not registered: " << query_name;
return Result<std::pair<uint8_t, std::string_view>>(gs::Status(
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/hqps_db/app/interactive_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace gs {

void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
inline void put_argument(gs::Encoder& encoder,
const procedure::Argument& argument) {
auto& value = argument.value();
auto item_case = value.item_case();
switch (item_case) {
Expand All @@ -46,8 +47,8 @@ void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) {
}
}

bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
inline bool parse_input_argument(gs::Decoder& raw_input,
gs::Encoder& argument_encoder) {
if (raw_input.size() == 0) {
VLOG(10) << "No arguments found in input";
return true;
Expand Down
38 changes: 38 additions & 0 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ seastar::future<seastar::sstring> invoke_creating_procedure(
if (json.HasMember("name")) {
// Currently we need id== name
rapidjson::Value& name = json["name"];
if (gs::Schema::IsBuiltinPlugin(name.GetString())) {
return seastar::make_exception_future<seastar::sstring>(
std::string(
"The plugin name is a builtin plugin, cannot be created: ") +
name.GetString());
}
rapidjson::Value name_copy(name, json.GetAllocator());
json.AddMember("id", name_copy, json.GetAllocator());
}
Expand Down Expand Up @@ -516,6 +522,9 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
// There can also be procedures that builtin in the graph meta.
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
if (plugin_meta.bound_graph.empty()) {
plugin_meta.bound_graph = query_param.content;
}
}
graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(),
all_plugin_metas.begin(),
Expand Down Expand Up @@ -697,6 +706,15 @@ admin_actor::get_procedure_by_procedure_name(
auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

auto builtin_plugins = gs::get_builtin_plugin_metas();
for (auto& builtin_plugin : builtin_plugins) {
if (builtin_plugin.id == procedure_id.c_str()) {
add_runnable_info(builtin_plugin);
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(builtin_plugin.ToJson()));
}
}

if (get_procedure_res.ok()) {
VLOG(10) << "Successfully get procedure procedures";
auto& proc_meta = get_procedure_res.value();
Expand Down Expand Up @@ -803,6 +821,16 @@ seastar::future<admin_query_result> admin_actor::delete_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be deleted: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be deleted: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down Expand Up @@ -859,6 +887,16 @@ seastar::future<admin_query_result> admin_actor::update_procedure(
gs::Result<seastar::sstring>(graph_meta_res.status()));
}

if (gs::Schema::IsBuiltinPlugin(procedure_id)) {
LOG(ERROR) << "The plugin name is a builtin plugin, cannot be updated: "
<< procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::ILLEGAL_OPERATION,
"The plugin name is a builtin plugin, cannot be updated: " +
procedure_id)));
}

auto get_procedure_res =
metadata_store_->GetPluginMeta(graph_id, procedure_id);

Expand Down
17 changes: 15 additions & 2 deletions flex/interactive/sdk/python/gs_interactive/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ def run_cypher_test_suite(neo4j_sess : Neo4jSession, graph_id: str, queries: lis
for query in queries:
submit_query_via_neo4j_endpoint(neo4j_sess, graph_id, query)

def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str):
query = "CALL " + proc_name + "()"
def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str, *args):
query = "CALL " + proc_name + "(" + ",".join(args) + ")"
result = neo4j_sess.run(query)
for record in result:
print(record)
Expand Down Expand Up @@ -435,6 +435,19 @@ def create_procedure(sess: Session, graph_id: str, name: str, query: str, descri
proc_id = resp.get_value().procedure_id
return proc_id

def delete_procedure(sess: Session, graph_id: str, proc_id: str):
resp = sess.delete_procedure(graph_id, proc_id)
if not resp.is_ok():
print("Failed to delete procedure: ", resp.get_status_message())
raise Exception("Failed to delete procedure, status: ", resp.get_status_message())

def update_procedure(sess: Session, graph_id: str, proc_id: str, desc : str):
request = UpdateProcedureRequest(
description=desc)
resp = sess.update_procedure(graph_id, proc_id, request)
if not resp.is_ok():
print("Failed to update procedure: ", resp.get_status_message())
raise Exception("Failed to update procedure, status: ", resp.get_status_message())

def start_service_on_graph(interactive_session, graph_id : str):
resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


from gs_interactive.tests.conftest import create_vertex_only_modern_graph, start_service_on_graph,interactive_driver
from gs_interactive.tests.conftest import create_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure
from gs_interactive.tests.conftest import create_procedure,delete_procedure,update_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure
from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph, import_data_to_partial_modern_graph, import_data_to_full_modern_graph


Expand Down Expand Up @@ -121,3 +121,18 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr
with pytest.raises(Exception):
create_procedure(interactive_session, create_modern_graph, "test_proc2", "MATCH(n: IDONTKOWN) return count(n)")

def test_builtin_procedure(interactive_session,neo4j_session, create_modern_graph):
print("[Test builtin procedure]")
# Delete the builtin procedure should fail
with pytest.raises(Exception):
delete_procedure(interactive_session, create_modern_graph, "count_vertices")
# Create a procedure with the same name as builtin procedure should fail
with pytest.raises(Exception):
create_procedure(interactive_session, create_modern_graph, "count_vertices", "MATCH(n: software) return count(n);")
# Update the builtin procedure should fail
with pytest.raises(Exception):
update_procedure(interactive_session, create_modern_graph, "count_vertices", "A updated description")
# Call the builtin procedure
start_service_on_graph(interactive_session, create_modern_graph)
call_procedure(neo4j_session, create_modern_graph, "count_vertices", '"person"')

3 changes: 3 additions & 0 deletions flex/openapi/openapi_coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ components:
type: array
items:
$ref: '#/components/schemas/Parameter'
option:
type: object
additionalProperties: true

GetStoredProcResponse:
allOf:
Expand Down
3 changes: 3 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,9 @@ components:
enable:
type: boolean
example : true
option:
type: object
additionalProperties: true
GetProcedureResponse:
x-body-name: get_procedure_response
allOf:
Expand Down
33 changes: 28 additions & 5 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ std::string read_file_to_string(const std::string& file_path) {
return "";
}
}
const std::vector<PluginMeta>& get_builtin_plugin_metas() {
static std::vector<PluginMeta> builtin_plugins;
static bool initialized = false;
if (!initialized) {
PluginMeta count_vertices;
count_vertices.id = "count_vertices";
count_vertices.name = "count_vertices";
count_vertices.description = "A builtin plugin to count vertices";
count_vertices.enable = true;
count_vertices.runnable = true;
count_vertices.type = "cypher";
count_vertices.creation_time = GetCurrentTimeStamp();
count_vertices.update_time = GetCurrentTimeStamp();
count_vertices.params.push_back({"labelName", PropertyType::kString});
count_vertices.returns.push_back({"count", PropertyType::kInt32});
initialized = true;
builtin_plugins.push_back(count_vertices);
}
return builtin_plugins;
}

void append_builtin_plugins(std::vector<PluginMeta>& plugin_metas) {
auto builtin_plugin_metas = get_builtin_plugin_metas();
plugin_metas.insert(plugin_metas.end(), builtin_plugin_metas.begin(),
builtin_plugin_metas.end());
}

UpdateGraphMetaRequest::UpdateGraphMetaRequest(
int64_t data_update_time, const std::string& data_import_config)
Expand Down Expand Up @@ -420,6 +446,8 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
// Add builtin plugins
append_builtin_plugins(request.plugin_metas);
return request;
}

Expand Down Expand Up @@ -671,11 +699,6 @@ UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson(
if (j.HasMember("enable")) {
request.enable = j["enable"].GetBool();
}
// } catch (const std::exception& e) {
// LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what() <<
// " "
// << json;
// }
return request;
}

Expand Down
2 changes: 2 additions & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ JobStatus parseFromString(const std::string& status_string);

////////////////// MetaData ///////////////////////
struct PluginMeta;
const std::vector<PluginMeta>& get_builtin_plugin_metas();

struct GraphMeta {
GraphId id;
std::string name;
Expand Down
Loading

0 comments on commit 56963cd

Please sign in to comment.