From b0d81f522fb94a3e220bf8216d6204c584ba6bec Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Thu, 30 May 2024 15:19:51 +0800 Subject: [PATCH] fix(interactive): Fix `stored_procedure` parsing in old-version schema and refactor CI (#3853) - Fix the problem of procedure loading with schema of version v0.0. - Fix the related CI test. Two cpp procedures are added: ```cpp class ExampleQuery : public CypherReadAppBase; class ReadExample : public ReadAppBase; ``` The first uses `CypherJson` InputFormat, and the latter uses `CppEncoder` format. --- .github/workflows/hqps-db-ci.yml | 38 +++-- .../python/interactive_sdk/client/session.py | 2 +- flex/openapi/openapi_interactive.yaml | 3 +- flex/storages/rt_mutable_graph/schema.cc | 26 +++- .../interactive/modern_graph_schema_v0_0.yaml | 4 +- .../interactive/modern_graph_schema_v0_1.yaml | 17 ++- flex/tests/interactive/plus_one.cc | 29 ++++ flex/tests/interactive/test_call_proc.py | 131 ++++++++++++++++++ flex/tests/interactive/test_plugin_loading.sh | 119 +++++++++++----- 9 files changed, 309 insertions(+), 60 deletions(-) create mode 100644 flex/tests/interactive/plus_one.cc create mode 100644 flex/tests/interactive/test_call_proc.py diff --git a/.github/workflows/hqps-db-ci.yml b/.github/workflows/hqps-db-ci.yml index 40bd600394cf..76101786e21f 100644 --- a/.github/workflows/hqps-db-ci.yml +++ b/.github/workflows/hqps-db-ci.yml @@ -220,19 +220,28 @@ jobs: - name: Test cypher&cpp procedure generation and loading env: - INTERACTIVE_WORKSPACE: /tmp/interactive_workspace - PLUGIN_DIR: ${INTERACTIVE_WORKSPACE}/data/modern_graph/plugins + TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace + PLUGIN_DIR: /tmp/temp_workspace/data/modern_graph/plugins FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph LD_LIBRARY_PATH: /usr/local/lib run: | + rm -rf ${TMP_INTERACTIVE_WORKSPACE} + cd ${GITHUB_WORKSPACE}/flex/build/ + SCHEMA_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/graph.yaml + 
BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml + mkdir -p ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/ + cp ${SCHEMA_FILE} ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml + GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/ + mkdir -p ${PLUGIN_DIR} + pip3 install argparse cd ${GITHUB_WORKSPACE}/flex/bin - ./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/get_person_name.cypher -w=/tmp/codegen \ + ./load_plan_and_gen.sh -e=hqps -i=../tests/interactive/plus_one.cc -w=/tmp/codegen \ --ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \ - --procedure_name=get_person_name \ + --procedure_name=plus_one \ --graph_schema_path=../interactive/examples/modern_graph/graph.yaml - + ./load_plan_and_gen.sh -e=hqps -i=../interactive/sdk/java/src/test/resources/sample_app.cc -w=/tmp/codegen \ --ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \ --procedure_name=sample_app \ @@ -242,15 +251,20 @@ jobs: --ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml -o=${PLUGIN_DIR} \ --procedure_name=count_vertex_num \ --graph_schema_path=../interactive/examples/modern_graph/graph.yaml + + # Among the above procedures, the correct input format for each is: + # count_vertex_num: () -> (num: int64), CypherProcedure. 
+ # plus_one: (num: int64) -> (num: int64), CppEncoder + # sample_app: (num: int64) -> (num: int64), kCypherJson + sed -i 's/default_graph: movies/default_graph: modern_graph/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml + sed -i 's/interactive_workspace/temp_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml cd ${GITHUB_WORKSPACE}/flex/tests/interactive/ - bash test_plugin_loading.sh ./modern_graph_schema_v0_0.yaml \ - ../../interactive/examples/modern_graph/bulk_load.yaml \ - ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml - - bash test_plugin_loading.sh ./modern_graph_schema_v0_1.yaml \ - ../../interactive/examples/modern_graph/bulk_load.yaml \ - ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml + bash test_plugin_loading.sh ${TMP_INTERACTIVE_WORKSPACE} modern_graph \ + ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml \ + ./modern_graph_schema_v0_0.yaml ./modern_graph_schema_v0_1.yaml + sed -i 's/temp_workspace/interactive_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml + sed -i 's/default_graph: modern_graph/default_graph: movies/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml - name: Run End-to-End cypher adhoc ldbc query test env: diff --git a/flex/interactive/sdk/python/interactive_sdk/client/session.py b/flex/interactive/sdk/python/interactive_sdk/client/session.py index 892a15c27702..3c5c0fe154f1 100644 --- a/flex/interactive/sdk/python/interactive_sdk/client/session.py +++ b/flex/interactive/sdk/python/interactive_sdk/client/session.py @@ -536,7 +536,7 @@ def call_procedure_raw(self, graph_id: StrictStr, params: str) -> Result[str]: # Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in encoder/decoder format params = params + chr(0) - response = self._procedure_api.call_procedure_with_http_info( + response = 
self._query_api.proc_call_with_http_info( graph_id, params ) return Result.from_response(response) diff --git a/flex/openapi/openapi_interactive.yaml b/flex/openapi/openapi_interactive.yaml index 74737d38cded..c21fe72aca9c 100644 --- a/flex/openapi/openapi_interactive.yaml +++ b/flex/openapi/openapi_interactive.yaml @@ -941,7 +941,8 @@ components: primitive_type: type: string enum: [DT_SIGNED_INT32, DT_UNSIGNED_INT32, DT_SIGNED_INT64, DT_UNSIGNED_INT64, - DT_BOOL, DT_FLOAT, DT_DOUBLE] + DT_BOOL, DT_FLOAT, DT_DOUBLE, DT_STRING] + # The DT_STRING is added for backward compatibility, it should be replaced by StringType example: DT_SIGNED_INT32 LongText: x-body-name: long_text diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 54ed062f1d3a..301e37ad31ab 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -997,21 +997,34 @@ static Status parse_stored_procedures_v00( } } schema.SetPluginDir(directory); - std::vector> plugin_name_or_path; + std::vector> plugin_name_or_paths; { std::vector plugin_names; if (!get_sequence(stored_procedure_node, "enable_lists", plugin_names)) { LOG(ERROR) << "stored_procedures is not set properly"; } - for (auto& plugin_name : plugin_names) { - plugin_name_or_path.emplace_back(plugin_name, ""); + size_t plugin_cnt = 0; + for (auto& plugin_name_or_path : plugin_names) { + // The plugins names specified in enable_lists can be either the name of + // the plugin or the path to the plugin. 
+ auto real_path = directory + "/" + plugin_name_or_path; + if (std::filesystem::exists(real_path)) { + plugin_name_or_paths.emplace_back( + std::string("plugin_") + std::to_string(plugin_cnt++), real_path); + } else if (std::filesystem::exists(plugin_name_or_path)) { + plugin_name_or_paths.emplace_back( + std::string("plugin_") + std::to_string(plugin_cnt++), + plugin_name_or_path); + } else { + LOG(WARNING) << "plugin " << plugin_name_or_path << " not found"; + } } } // plugin_name_or_path contains the plugin name or path. // for path, we just use it as the plugin name, and emplace into the map, // for name, we try to find the plugin in the directory - if (!schema.EmplacePlugins(plugin_name_or_path)) { + if (!schema.EmplacePlugins(plugin_name_or_paths)) { LOG(ERROR) << "Fail to emplace all plugins"; return Status(StatusCode::InvalidSchema, "Fail to emplace all plugins"); } @@ -1028,11 +1041,14 @@ static Status parse_stored_procedures_v01( std::vector> plugin_name_and_path; for (auto& cur_node : stored_procedure_node) { if (cur_node["name"] && cur_node["library"]) { + VLOG(10) << "Parse stored procedure: " + << cur_node["name"].as() + << " with library: " << cur_node["library"].as(); plugin_name_and_path.push_back( std::make_pair(cur_node["name"].as(), cur_node["library"].as())); } else { - LOG(ERROR) << "Library or name set properly for stored procedure"; + LOG(WARNING) << "Library or name set properly for stored procedure"; return Status(StatusCode::InvalidSchema, "Library or name set properly for stored procedure"); } diff --git a/flex/tests/interactive/modern_graph_schema_v0_0.yaml b/flex/tests/interactive/modern_graph_schema_v0_0.yaml index 8fde589f8a53..252b29c999b8 100644 --- a/flex/tests/interactive/modern_graph_schema_v0_0.yaml +++ b/flex/tests/interactive/modern_graph_schema_v0_0.yaml @@ -1,10 +1,10 @@ name: modern_graph # then must have a modern dir under ${data} directory store_type: mutable_csr # v6d, groot, gart stored_procedures: - directory: 
/tmp/interactive_workspace/data/modern_graph/plugins/ + directory: /tmp/temp_workspace/data/modern_graph/plugins/ enable_lists: - - libget_person_name.so - libcount_vertex_num.so + - libplus_one.so - libsample_app.so schema: vertex_types: diff --git a/flex/tests/interactive/modern_graph_schema_v0_1.yaml b/flex/tests/interactive/modern_graph_schema_v0_1.yaml index 66c8972544f0..8017db55ed18 100644 --- a/flex/tests/interactive/modern_graph_schema_v0_1.yaml +++ b/flex/tests/interactive/modern_graph_schema_v0_1.yaml @@ -3,17 +3,20 @@ version: v0.1 store_type: mutable_csr # v6d, groot, gart description: A graph with 2 vertex types and 2 edge types stored_procedures: - - name: query1 - description: A stored procedure that does something - library: /tmp/interactive_workspace/data/modern_graph/plugins/libget_person_name.so - type: cpp - - name: query2 + - name: count_vertex_num description: A stored procedure that does something else - library: /tmp/interactive_workspace/data/modern_graph/plugins/libcount_vertex_num.so + library: /tmp/temp_workspace/data/modern_graph/plugins/libcount_vertex_num.so + type: cypher + returns: + - name: count + type: {primitive_type: DT_SIGNED_INT64} + - name: plus_one + description: A sample application + library: /tmp/temp_workspace/data/modern_graph/plugins/libplus_one.so type: cpp - name: sample_app description: A sample application - library: /tmp/interactive_workspace/data/modern_graph/plugins/libsample_app.so + library: /tmp/temp_workspace/data/modern_graph/plugins/libsample_app.so type: cpp schema: vertex_types: diff --git a/flex/tests/interactive/plus_one.cc b/flex/tests/interactive/plus_one.cc new file mode 100644 index 000000000000..0ad4a368dda8 --- /dev/null +++ b/flex/tests/interactive/plus_one.cc @@ -0,0 +1,29 @@ +#include "flex/engines/graph_db/app/app_base.h" +#include "flex/utils/app_utils.h" + +namespace gs { +class PlusOne : public ReadAppBase { + public: + PlusOne() {} + // Query function for query class + bool Query(const 
gs::GraphDBSession& sess, Decoder& input, + Encoder& output) override { + int32_t param1 = input.get_int(); + LOG(INFO) << "param1: " << param1; + output.put_int(param1 + 1); + return true; + } +}; +} // namespace gs + +extern "C" { +void* CreateApp(gs::PlusOne& db) { + gs::PlusOne* app = new gs::PlusOne(); + return static_cast(app); +} + +void DeleteApp(void* app) { + gs::PlusOne* casted = static_cast(app); + delete casted; +} +} \ No newline at end of file diff --git a/flex/tests/interactive/test_call_proc.py b/flex/tests/interactive/test_call_proc.py new file mode 100644 index 000000000000..96abe353c2c1 --- /dev/null +++ b/flex/tests/interactive/test_call_proc.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +import os +import sys + +sys.path.append("../../interactive/sdk/python") + +from interactive_sdk.client.driver import Driver +from interactive_sdk.openapi.models.base_edge_type_vertex_type_pair_relations_inner import ( + BaseEdgeTypeVertexTypePairRelationsInner, +) +from interactive_sdk.openapi.models.create_edge_type import CreateEdgeType +from interactive_sdk.openapi.models.create_graph_request import CreateGraphRequest +from interactive_sdk.openapi.models.create_graph_schema_request import ( + CreateGraphSchemaRequest, +) +from interactive_sdk.openapi.models.create_procedure_request import ( + CreateProcedureRequest, +) +from interactive_sdk.openapi.models.create_property_meta import CreatePropertyMeta +from interactive_sdk.openapi.models.create_vertex_type import CreateVertexType +from interactive_sdk.openapi.models.edge_mapping import EdgeMapping +from interactive_sdk.openapi.models.edge_mapping_type_triplet import ( + EdgeMappingTypeTriplet, +) +from interactive_sdk.openapi.models.gs_data_type import GSDataType +from interactive_sdk.openapi.models.typed_value import TypedValue +from interactive_sdk.openapi.models.job_status import JobStatus +from interactive_sdk.openapi.models.long_text import LongText +from interactive_sdk.openapi.models.primitive_type import PrimitiveType +from interactive_sdk.openapi.models.schema_mapping import SchemaMapping +from interactive_sdk.openapi.models.schema_mapping_loading_config import ( + SchemaMappingLoadingConfig, +) +from interactive_sdk.openapi.models.schema_mapping_loading_config_format import ( + SchemaMappingLoadingConfigFormat, +) +from interactive_sdk.openapi.models.start_service_request import StartServiceRequest +from interactive_sdk.openapi.models.string_type import StringType +from interactive_sdk.openapi.models.string_type_string import StringTypeString +from interactive_sdk.openapi.models.vertex_mapping import VertexMapping +from interactive_sdk.openapi.models.query_request import QueryRequest + +# 
For the procedures called below, the correct input format for each is: +# count_vertex_num: () -> (num: int64), CypherProcedure. +# plus_one: (num: int64) -> (num: int64), CppEncoder +# sample_app: (num: int64) -> (num: int64), kCypherJson + + +class ProcedureCaller(): + def __init__(self, endpoint): + self._driver = Driver(endpoint) + self._sess = self._driver.session() + + def call_cypher_queries(self): + with self._driver.getNeo4jSession() as session: + result = session.run("CALL count_vertex_num();") + print("call procedure result: ", result) + + def callProcedureWithJsonFormat(self, graph_id : str): + # sample_app + sample_app_ref = QueryRequest( + query_name="sample_app", + arguments=[ + TypedValue( + type=GSDataType( + PrimitiveType(primitive_type="DT_SIGNED_INT32") + ), + value = 2 + ) + ] + ) + resp = self._sess.call_procedure(graph_id, sample_app_ref) + if not resp.is_ok(): + print("call sample_app failed: ", resp.get_status_message()) + exit(1) + self.call_cypher_queries() + + def callProcedureWithEncoder(self, graph_id : str): + # count_vertex_num, should be with id 1 + # construct a byte array with bytes: 0x01 + params = chr(1) + resp = self._sess.call_procedure_raw(graph_id, params) + if not resp.is_ok(): + print("call count_vertex_num failed: ", resp.get_status_message()) + exit(1) + + # plus_one, should be with id 2 + # construct a byte array with bytes: 4 zero bytes (the integer 0), and a byte 2 + byte_string = bytes([0,0,0,0,2]) # 4 bytes of integer 0, and a byte 2 + params = byte_string.decode('utf-8') + resp = self._sess.call_procedure_raw(graph_id, params) + if not resp.is_ok(): + print("call plus_one failed: ", resp.get_status_message()) + exit(1) + +if __name__ == "__main__": + #parse command line args + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--endpoint", type=str, default="http://localhost:7777") + parser.add_argument("--graph_id", type=str, default="1") + parser.add_argument("--input-format", type=str, 
default="json") + #finish + args = parser.parse_args() + print(args) + caller = ProcedureCaller(args.endpoint) + if args.input_format == "json": + caller.callProcedureWithJsonFormat(args.graph_id) + elif args.input_format == "encoder": + caller.callProcedureWithEncoder(args.graph_id) + else: + raise ValueError("Invalid input format: " + args.input_format) + diff --git a/flex/tests/interactive/test_plugin_loading.sh b/flex/tests/interactive/test_plugin_loading.sh index c4d0aa6f7bfd..3c376ca6379d 100644 --- a/flex/tests/interactive/test_plugin_loading.sh +++ b/flex/tests/interactive/test_plugin_loading.sh @@ -15,54 +15,109 @@ set -e SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) FLEX_HOME=${SCRIPT_DIR}/../../ -BULK_LOADER=${FLEX_HOME}/build/bin/bulk_loader SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server -GIE_HOME=${FLEX_HOME}/../interactive_engine/ -if [ $# -ne 3 ]; then - echo "Receives: $# args, need 3 args" - echo "Usage: $0 " +kill_service(){ + echo "Kill Service first" + ps -ef | grep "interactive_server" | awk '{print $2}' | xargs kill -9 || true + ps -ef | grep "com.alibaba.graphscope.GraphServer" | awk '{print $2}' | xargs kill -9 || true + sleep 3 + # check if service is killed + echo "Kill Service success" +} + +# kill service when exit +trap kill_service EXIT + +start_engine_service(){ + #check SERVER_BIN exists + if [ ! 
-f ${SERVER_BIN} ]; then + err "SERVER_BIN not found" + exit 1 + fi + cmd="${SERVER_BIN} -w ${WORKSPACE} -c ${ENGINE_CONFIG_PATH} --enable-admin-service true" + cmd="${cmd} --start-compiler true" + + echo "Start engine service with command: ${cmd}" + ${cmd} & + sleep 10 + #check interactive_server is running, if not, exit + ps -ef | grep "interactive_server" | grep -v grep + + echo "Start engine service success" +} + +if [ $# -ne 5 ]; then + echo "Receives: $# args, need 5 args" + echo "Usage: $0 " exit 1 fi -SCHEMA_PATH=$1 -IMPORT_FILE=$2 +WORKSPACE=$1 +GRAPH_NAME=$2 ENGINE_CONFIG_PATH=$3 +SCHEMA_VERSION_00=$4 +SCHEMA_VERSION_01=$5 -echo "SCHEMA_PATH: ${SCHEMA_PATH}" -echo "IMPORT_FILE: ${IMPORT_FILE}" -echo "ENGINE_CONFIG_PATH: ${ENGINE_CONFIG_PATH}" +if [ ! -d ${WORKSPACE} ]; then + echo "WORKSPACE: ${WORKSPACE} not exists" + exit 1 +fi +if [ ! -d ${WORKSPACE}/data/${GRAPH_NAME} ]; then + echo "GRAPH: ${GRAPH_NAME} not exists" + exit 1 +fi + +if [ ! -f ${SCHEMA_VERSION_00} ]; then + echo "SCHEMA_VERSION_00: ${SCHEMA_VERSION_00} not exists" + exit 1 +fi -DATA_PATH=/tmp/test_plugin_loading -if [ -d ${DATA_PATH} ]; then - rm -rf ${DATA_PATH} +if [ ! 
-f ${SCHEMA_VERSION_01} ]; then + echo "SCHEMA_VERSION_01: ${SCHEMA_VERSION_01} not exists" + exit 1 fi -mkdir -p ${DATA_PATH} -# First load the data with the bulk loader -cmd="GLOG_v=10 ${BULK_LOADER} -g ${SCHEMA_PATH} -d ${DATA_PATH} -l ${IMPORT_FILE}" -echo "Loading data with bulk loader" -echo $cmd -eval $cmd || exit 1 +echo "WORKSPACE: ${WORKSPACE}" +echo "ENGINE_CONFIG_PATH: ${ENGINE_CONFIG_PATH}" # Try to start service with the generated plugins, for both v0.0 schema and v0.1 schema # and check if the service can be started successfully -cmd="GLOG_v=10 ${SERVER_BIN} -g ${SCHEMA_PATH} --data-path ${DATA_PATH} -c ${ENGINE_CONFIG_PATH}" -echo "Starting service with modern graph schema v0.0" -echo $cmd -eval "$cmd &" -sleep 10 +check_procedure_loading_and_calling_via_encoder() { + kill_service + if [ $# -ne 1 ]; then + echo "Receives: $# args, need 1 args" + echo "Usage: $0 " + exit 1 + fi + cp $1 ${WORKSPACE}/data/${GRAPH_NAME}/graph.yaml + start_engine_service + + python3 test_call_proc.py --endpoint http://localhost:7777 --input-format encoder + + kill_service +} -# check process is running, if not running, exit -if ! ps -p $! > /dev/null -then - echo "Test failed for modern graph schema v0.0" +check_procedure_loading_and_calling_via_cypher_json() { + kill_service + if [ $# -ne 1 ]; then + echo "Receives: $# args, need 1 args" + echo "Usage: $0 " exit 1 -fi + fi + cp $1 ${WORKSPACE}/data/${GRAPH_NAME}/graph.yaml + start_engine_service + + python3 test_call_proc.py --endpoint http://localhost:7777 --input-format json + + kill_service +} -# stop the service -kill -9 $! 
+echo "Testing for schema file: ${SCHEMA_VERSION_00}" +check_procedure_loading_and_calling_via_encoder ${SCHEMA_VERSION_00} +echo "Testing for schema file: ${SCHEMA_VERSION_01}" +check_procedure_loading_and_calling_via_cypher_json ${SCHEMA_VERSION_01} -echo "Test passed for modern graph schema v0.0" \ No newline at end of file +echo "Test passed for plugin loading and calling" \ No newline at end of file