Skip to content

Commit

Permalink
Support creating c++ stored procedure via gsctl
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Sep 11, 2024
1 parent 0662b37 commit 569e796
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
7 changes: 4 additions & 3 deletions k8s/dockerfiles/flex-interactive.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ RUN . ${HOME}/.cargo/env && cd ${HOME}/GraphScope/flex && \
cp ~/GraphScope/interactive_engine/executor/ir/target/release/libir_core.so /opt/flex/lib/

# build coordinator
RUN mkdir -p /opt/flex/wheel
RUN if [ "${ENABLE_COORDINATOR}" = "true" ]; then \
export PATH=${HOME}/.local/bin:${PATH} && \
cd ${HOME}/GraphScope/flex/interactive/sdk && \
./generate_sdk.sh -g python && cd python && \
python3 -m pip install --upgrade pip && python3 -m pip install -r requirements.txt && \
python3 setup.py build_proto && python3 setup.py bdist_wheel && \
mkdir -p /opt/flex/wheel && cp dist/*.whl /opt/flex/wheel/ && \
cp dist/*.whl /opt/flex/wheel/ && \
cd ${HOME}/GraphScope/python && \
export WITHOUT_LEARNING_ENGINE=ON && python3 setup.py bdist_wheel && \
mkdir -p /opt/flex/wheel && cp dist/*.whl /opt/flex/wheel/ && \
cp dist/*.whl /opt/flex/wheel/ && \
cd ${HOME}/GraphScope/coordinator && \
python3 setup.py bdist_wheel && \
mkdir -p /opt/flex/wheel && cp dist/*.whl /opt/flex/wheel/; \
cp dist/*.whl /opt/flex/wheel/; \
fi


Expand Down
5 changes: 5 additions & 0 deletions python/graphscope/gsctl/impl/stored_procedure.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@


def create_stored_procedure(graph_identifier: str, stored_procedure: dict) -> str:
# path begin with "@" represents the local file
if stored_procedure["query"].startswith("@"):
location = stored_procedure["query"][1:]
with open(location, "r") as f:
stored_procedure["query"] = f.read()
context = get_current_context()
with graphscope.flex.rest.ApiClient(
graphscope.flex.rest.Configuration(context.coordinator_endpoint)
Expand Down
71 changes: 69 additions & 2 deletions python/graphscope/gsctl/tests/test_interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,51 @@
COORDINATOR_ENDPOINT = "http://127.0.0.1:8080"


sample_cc = """
#include "flex/engines/hqps_db/app/interactive_app_base.h"
#include "flex/engines/hqps_db/core/sync_engine.h"
#include "flex/utils/app_utils.h"
namespace gs {
class ExampleQuery : public CypherReadAppBase<int32_t> {
public:
using Engine = SyncEngine<gs::MutableCSRInterface>;
using label_id_t = typename gs::MutableCSRInterface::label_id_t;
using vertex_id_t = typename gs::MutableCSRInterface::vertex_id_t;
ExampleQuery() {}
// Query function for query class
results::CollectiveResults Query(const gs::GraphDBSession& sess,
int32_t param1) override {
LOG(INFO) << "param1: " << param1;
gs::MutableCSRInterface graph(sess);
auto ctx0 = Engine::template ScanVertex<gs::AppendOpt::Persist>(
graph, 0, Filter<TruePredicate>());
auto ctx1 = Engine::Project<PROJ_TO_NEW>(
graph, std::move(ctx0),
std::tuple{gs::make_mapper_with_variable<INPUT_COL_ID(0)>(
gs::PropertySelector<int64_t>("id"))});
auto ctx2 = Engine::Limit(std::move(ctx1), 0, 5);
auto res = Engine::Sink(graph, ctx2, std::array<int32_t, 1>{0});
LOG(INFO) << "res: " << res.DebugString();
return res;
}
};
} // namespace gs
extern "C" {
void* CreateApp(gs::GraphDBSession& db) {
gs::ExampleQuery* app = new gs::ExampleQuery();
return static_cast<void*>(app);
}
void DeleteApp(void* app) {
gs::ExampleQuery* casted = static_cast<gs::ExampleQuery*>(app);
delete casted;
}
}
"""

modern_graph = {
"name": "modern_graph",
"description": "This is a test graph",
Expand Down Expand Up @@ -305,7 +350,7 @@ def test_bulk_loading(self, tmpdir):
assert not ds["edge_mappings"]
delete_graph_by_id(graph_id)

def test_procedure(self):
def test_cypher_procedure(self):
stored_procedure_dict = {
"name": "procedure_name",
"description": "This is a test procedure",
Expand All @@ -319,7 +364,7 @@ def test_procedure(self):
new_procedure_exist = False
procedures = list_stored_procedures(graph_id)
for p in procedures:
if p.id == stored_procedure_id and p.name == "procedure_name":
if p.id == stored_procedure_id and p.name == stored_procedure_dict["name"]:
new_procedure_exist = True
assert new_procedure_exist
# test update a procedure
Expand All @@ -339,6 +384,28 @@ def test_procedure(self):
assert not new_procedure_exist
delete_graph_by_id(graph_id)

def test_cpp_procedure(self, tmpdir):
# generate sample_app.cc
cpp_procedure_file = tmpdir.join("sample_app.cc")
cpp_procedure_file.write(sample_cc)
# test create a new cpp stored procedure
stored_procedure_dict = {
"name": "cpp_stored_procedure_name",
"description": "This is a cpp test stored procedure",
"query": f"@{str(cpp_procedure_file)}",
"type": "cpp",
}
graph_id = create_graph(modern_graph)
stored_procedure_id = create_stored_procedure(graph_id, stored_procedure_dict)
assert stored_procedure_id is not None
new_procedure_exist = False
procedures = list_stored_procedures(graph_id)
for p in procedures:
if p.id == stored_procedure_id and p.name == stored_procedure_dict["name"]:
new_procedure_exist = True
assert new_procedure_exist
delete_graph_by_id(graph_id)

def test_service(self):
original_graph_id = None
status = list_service_status()
Expand Down
5 changes: 4 additions & 1 deletion python/graphscope/gsctl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ def create_stored_procedure_node(self, graph, stored_procedures):
parent=graph.id,
)
for p in stored_procedures:
query = p.query.replace("\n", "\\\\n")
if len(query) > 100:
query = query[:100] + "..."
self.tree.create_node(
tag="StoredProc(identifier: {0}, type: {1}, runnable: {2}, query: {3}, description: {4})".format(
p.id, p.type, p.runnable, p.query, p.description
p.id, p.type, p.runnable, query, p.description
),
identifier=f"{stored_procedure_identifier}_{p.id}",
parent=stored_procedure_identifier,
Expand Down

0 comments on commit 569e796

Please sign in to comment.