Skip to content

Commit

Permalink
fix(interactive): Fix stored_procedure parsing in old-version schem…
Browse files Browse the repository at this point in the history
…a and refactor CI (#3853)

- Fix the problem of procedure loading with schema of version v0.0.
- Fix the related CI test. Two cpp procedures are added

```cpp
class ExampleQuery : public CypherReadAppBase<int32_t>;
class ReadExample : public ReadAppBase;
```

The first use `CypherJson` InputFormat, and the latter use `CppEncoder`
format.
  • Loading branch information
zhanglei1949 committed May 30, 2024
1 parent 2cafc1a commit b0d81f5
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 60 deletions.
38 changes: 26 additions & 12 deletions .github/workflows/hqps-db-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,28 @@ jobs:
- name: Test cypher&cpp procedure generation and loading
env:
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
PLUGIN_DIR: ${INTERACTIVE_WORKSPACE}/data/modern_graph/plugins
TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace
PLUGIN_DIR: /tmp/temp_workspace/data/modern_graph/plugins
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
LD_LIBRARY_PATH: /usr/local/lib
run: |
rm -rf ${TMP_INTERACTIVE_WORKSPACE}
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/graph.yaml
BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml
mkdir -p ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/
cp ${SCHEMA_FILE} ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/
mkdir -p ${PLUGIN_DIR}
pip3 install argparse

cd ${GITHUB_WORKSPACE}/flex/bin
./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/get_person_name.cypher -w=/tmp/codegen \
./load_plan_and_gen.sh -e=hqps -i=../tests/interactive/plus_one.cc -w=/tmp/codegen \
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=get_person_name \
--procedure_name=plus_one \
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml
./load_plan_and_gen.sh -e=hqps -i=../interactive/sdk/java/src/test/resources/sample_app.cc -w=/tmp/codegen \
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=sample_app \
Expand All @@ -242,15 +251,20 @@ jobs:
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=count_vertex_num \
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml

# Among the above procedures, the correct input format for each is:
# count_vertex_num: () -> (num: int64), CypherProcedure.
# plus_one: (num: int64) -> (num: int64), CppEncoder
# sample_app: (num: int64) -> (num: int64), kCypherJson

sed -i 's/default_graph: movies/default_graph: modern_graph/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
sed -i 's/interactive_workspace/temp_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
cd ${GITHUB_WORKSPACE}/flex/tests/interactive/
bash test_plugin_loading.sh ./modern_graph_schema_v0_0.yaml \
../../interactive/examples/modern_graph/bulk_load.yaml \
${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
bash test_plugin_loading.sh ./modern_graph_schema_v0_1.yaml \
../../interactive/examples/modern_graph/bulk_load.yaml \
${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
bash test_plugin_loading.sh ${TMP_INTERACTIVE_WORKSPACE} modern_graph \
${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml \
./modern_graph_schema_v0_0.yaml ./modern_graph_schema_v0_1.yaml
sed -i 's/temp_workspace/interactive_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
sed -i 's/default_graph: modern_graph/default_graph: movies/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml

- name: Run End-to-End cypher adhoc ldbc query test
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def call_procedure_raw(self, graph_id: StrictStr, params: str) -> Result[str]:
# Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h
# Here we add byte of value 1 to denote the input format is in encoder/decoder format
params = params + chr(0)
response = self._procedure_api.call_procedure_with_http_info(
response = self._query_api.proc_call_with_http_info(
graph_id, params
)
return Result.from_response(response)
Expand Down
3 changes: 2 additions & 1 deletion flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,8 @@ components:
primitive_type:
type: string
enum: [DT_SIGNED_INT32, DT_UNSIGNED_INT32, DT_SIGNED_INT64, DT_UNSIGNED_INT64,
DT_BOOL, DT_FLOAT, DT_DOUBLE]
DT_BOOL, DT_FLOAT, DT_DOUBLE, DT_STRING]
# The DT_STRING is added for backward compatibility, it should be replaced by StringType
example: DT_SIGNED_INT32
LongText:
x-body-name: long_text
Expand Down
26 changes: 21 additions & 5 deletions flex/storages/rt_mutable_graph/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,21 +997,34 @@ static Status parse_stored_procedures_v00(
}
}
schema.SetPluginDir(directory);
std::vector<std::pair<std::string, std::string>> plugin_name_or_path;
std::vector<std::pair<std::string, std::string>> plugin_name_or_paths;
{
std::vector<std::string> plugin_names;
if (!get_sequence(stored_procedure_node, "enable_lists", plugin_names)) {
LOG(ERROR) << "stored_procedures is not set properly";
}
for (auto& plugin_name : plugin_names) {
plugin_name_or_path.emplace_back(plugin_name, "");
size_t plugin_cnt = 0;
for (auto& plugin_name_or_path : plugin_names) {
// The plugins names specified in enable_lists can be either the name of
// the plugin or the path to the plugin.
auto real_path = directory + "/" + plugin_name_or_path;
if (std::filesystem::exists(real_path)) {
plugin_name_or_paths.emplace_back(
std::string("plugin_") + std::to_string(plugin_cnt++), real_path);
} else if (std::filesystem::exists(plugin_name_or_path)) {
plugin_name_or_paths.emplace_back(
std::string("plugin_") + std::to_string(plugin_cnt++),
plugin_name_or_path);
} else {
LOG(WARNING) << "plugin " << plugin_name_or_path << " not found";
}
}
}

// plugin_name_or_path contains the plugin name or path.
// for path, we just use it as the plugin name, and emplace into the map,
// for name, we try to find the plugin in the directory
if (!schema.EmplacePlugins(plugin_name_or_path)) {
if (!schema.EmplacePlugins(plugin_name_or_paths)) {
LOG(ERROR) << "Fail to emplace all plugins";
return Status(StatusCode::InvalidSchema, "Fail to emplace all plugins");
}
Expand All @@ -1028,11 +1041,14 @@ static Status parse_stored_procedures_v01(
std::vector<std::pair<std::string, std::string>> plugin_name_and_path;
for (auto& cur_node : stored_procedure_node) {
if (cur_node["name"] && cur_node["library"]) {
VLOG(10) << "Parse stored procedure: "
<< cur_node["name"].as<std::string>()
<< " with library: " << cur_node["library"].as<std::string>();
plugin_name_and_path.push_back(
std::make_pair(cur_node["name"].as<std::string>(),
cur_node["library"].as<std::string>()));
} else {
LOG(ERROR) << "Library or name set properly for stored procedure";
LOG(WARNING) << "Library or name set properly for stored procedure";
return Status(StatusCode::InvalidSchema,
"Library or name set properly for stored procedure");
}
Expand Down
4 changes: 2 additions & 2 deletions flex/tests/interactive/modern_graph_schema_v0_0.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: modern_graph # then must have a modern dir under ${data} directory
store_type: mutable_csr # v6d, groot, gart
stored_procedures:
directory: /tmp/interactive_workspace/data/modern_graph/plugins/
directory: /tmp/temp_workspace/data/modern_graph/plugins/
enable_lists:
- libget_person_name.so
- libcount_vertex_num.so
- libplus_one.so
- libsample_app.so
schema:
vertex_types:
Expand Down
17 changes: 10 additions & 7 deletions flex/tests/interactive/modern_graph_schema_v0_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ version: v0.1
store_type: mutable_csr # v6d, groot, gart
description: A graph with 2 vertex types and 2 edge types
stored_procedures:
- name: query1
description: A stored procedure that does something
library: /tmp/interactive_workspace/data/modern_graph/plugins/libget_person_name.so
type: cpp
- name: query2
- name: count_vertex_num
description: A stored procedure that does something else
library: /tmp/interactive_workspace/data/modern_graph/plugins/libcount_vertex_num.so
library: /tmp/temp_workspace/data/modern_graph/plugins/libcount_vertex_num.so
type: cypher
returns:
- name: count
type: {primitive_type: DT_SIGNED_INT64}
- name: plus_one
description: A sample application
library: /tmp/temp_workspace/data/modern_graph/plugins/libplus_one.so
type: cpp
- name: sample_app
description: A sample application
library: /tmp/interactive_workspace/data/modern_graph/plugins/libsample_app.so
library: /tmp/temp_workspace/data/modern_graph/plugins/libsample_app.so
type: cpp
schema:
vertex_types:
Expand Down
29 changes: 29 additions & 0 deletions flex/tests/interactive/plus_one.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "flex/engines/graph_db/app/app_base.h"
#include "flex/utils/app_utils.h"

namespace gs {
class PlusOne : public ReadAppBase {
public:
PlusOne() {}
// Query function for query class
bool Query(const gs::GraphDBSession& sess, Decoder& input,
Encoder& output) override {
int32_t param1 = input.get_int();
LOG(INFO) << "param1: " << param1;
output.put_int(param1 + 1);
return true;
}
};
} // namespace gs

extern "C" {
void* CreateApp(gs::PlusOne& db) {
gs::PlusOne* app = new gs::PlusOne();
return static_cast<void*>(app);
}

void DeleteApp(void* app) {
gs::PlusOne* casted = static_cast<gs::PlusOne*>(app);
delete casted;
}
}
131 changes: 131 additions & 0 deletions flex/tests/interactive/test_call_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import os
import sys

sys.path.append("../../interactive/sdk/python")

from interactive_sdk.client.driver import Driver
from interactive_sdk.openapi.models.base_edge_type_vertex_type_pair_relations_inner import (
BaseEdgeTypeVertexTypePairRelationsInner,
)
from interactive_sdk.openapi.models.create_edge_type import CreateEdgeType
from interactive_sdk.openapi.models.create_graph_request import CreateGraphRequest
from interactive_sdk.openapi.models.create_graph_schema_request import (
CreateGraphSchemaRequest,
)
from interactive_sdk.openapi.models.create_procedure_request import (
CreateProcedureRequest,
)
from interactive_sdk.openapi.models.create_property_meta import CreatePropertyMeta
from interactive_sdk.openapi.models.create_vertex_type import CreateVertexType
from interactive_sdk.openapi.models.edge_mapping import EdgeMapping
from interactive_sdk.openapi.models.edge_mapping_type_triplet import (
EdgeMappingTypeTriplet,
)
from interactive_sdk.openapi.models.gs_data_type import GSDataType
from interactive_sdk.openapi.models.typed_value import TypedValue
from interactive_sdk.openapi.models.job_status import JobStatus
from interactive_sdk.openapi.models.long_text import LongText
from interactive_sdk.openapi.models.primitive_type import PrimitiveType
from interactive_sdk.openapi.models.schema_mapping import SchemaMapping
from interactive_sdk.openapi.models.schema_mapping_loading_config import (
SchemaMappingLoadingConfig,
)
from interactive_sdk.openapi.models.schema_mapping_loading_config_format import (
SchemaMappingLoadingConfigFormat,
)
from interactive_sdk.openapi.models.start_service_request import StartServiceRequest
from interactive_sdk.openapi.models.string_type import StringType
from interactive_sdk.openapi.models.string_type_string import StringTypeString
from interactive_sdk.openapi.models.vertex_mapping import VertexMapping
from interactive_sdk.openapi.models.query_request import QueryRequest

# Among the above procedures, the correct input format for each is:
# count_vertex_num: () -> (num: int64), CypherProcedure.
# plus_one: (num: int64) -> (num: int64), CppEncoder
# sample_app: (num: int64) -> (num: int64), kCypherJson


class ProcedureCaller():
def __init__(self, endpoint):
self._driver = Driver(endpoint)
self._sess = self._driver.session()

def call_cypher_queries(self):
with self._driver.getNeo4jSession() as session:
result = session.run("CALL count_vertex_num();")
print("call procedure result: ", result)

def callProcedureWithJsonFormat(self, graph_id : str):
# get_person_name
sample_app_ref = QueryRequest(
query_name="sample_app",
arguments=[
TypedValue(
type=GSDataType(
PrimitiveType(primitive_type="DT_SIGNED_INT32")
),
value = 2
)
]
)
resp = self._sess.call_procedure(graph_id, sample_app_ref)
if not resp.is_ok():
print("call sample_app failed: ", resp.get_status_message())
exit(1)
self.call_cypher_queries()

def callProcedureWithEncoder(self, graph_id : str):
# count_vertex_num, should be with id 1
# construct a byte array with bytes: 0x01
params = chr(1)
resp = self._sess.call_procedure_raw(graph_id, params)
if not resp.is_ok():
print("call count_vertex_num failed: ", resp.get_status_message())
exit(1)

# plus_one, should be with id 3
# construct a byte array with bytes: the 4 bytes of integer 1, and a byte 3
byte_string = bytes([0,0,0,0,2]) # 4 bytes of integer 1, and a byte 3
params = byte_string.decode('utf-8')
resp = self._sess.call_procedure_raw(graph_id, params)
if not resp.is_ok():
print("call plus_one failed: ", resp.get_status_message())
exit(1)

if __name__ == "__main__":
#parse command line args
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", type=str, default="http://localhost:7777")
parser.add_argument("--graph_id", type=str, default="1")
parser.add_argument("--input-format", type=str, default="json")
#finish
args = parser.parse_args()
print(args)
caller = ProcedureCaller(args.endpoint)
if args.input_format == "json":
caller.callProcedureWithJsonFormat(args.graph_id)
elif args.input_format == "encoder":
caller.callProcedureWithEncoder(args.graph_id)
else:
raise ValueError("Invalid input format: " + args.input_format)

Loading

0 comments on commit b0d81f5

Please sign in to comment.