From 7a55a9bc7e9193b4849e1f098b712776bad70cd2 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 5 Jun 2024 12:45:16 +0800 Subject: [PATCH] refactor(interactive): Refine Interactive SDK `call_proc` API design (#3881) - Introduce customized header `X-Interactive-Request-Format` with value `[proto, encoder, json]` to distinguish between different format. - There will be no byte appending on SDK side. The format byte will be append in `hqps_ic_handler`. - Support calling procedure without specifying the graph id, by submitting queries to the running graph with url`/v1/graph/current/query` . - Remove `hqps_exit_handler` since not used. - Create handlers for each shard. --- .../graph_db/database/graph_db_session.h | 1 + .../http_server/handler/hqps_http_handler.cc | 193 ++++++++++-------- .../http_server/handler/hqps_http_handler.h | 23 +-- .../http_server/service/hqps_service.cc | 3 +- .../client/ProcedureInterface.java | 4 + .../client/impl/DefaultSession.java | 66 ++++-- .../interactive/client/DriverTest.java | 20 +- .../python/interactive_sdk/client/session.py | 55 ++++- .../sdk/python/test/test_driver.py | 22 +- flex/openapi/openapi_interactive.yaml | 37 ++++ flex/tests/hqps/admin_http_test.cc | 18 +- .../common/client/HttpExecutionClient.java | 5 +- 12 files changed, 320 insertions(+), 127 deletions(-) diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index 1338a7aa0cab..5d21f13ec28c 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -47,6 +47,7 @@ class GraphDBSession { static constexpr int32_t MAX_RETRY = 3; static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8) #ifdef BUILD_HQPS + static constexpr const char* kCppEncoder = "\x00"; static constexpr const char* kCypherJson = "\x01"; static constexpr const char* kCypherInternalAdhoc = "\x02"; static constexpr const char* kCypherInternalProcedure = "\x03"; diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 4264275287f1..d6ed03df3ee2 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -27,6 +27,49 @@ #include "flex/engines/http_server/types.h" #include "flex/otel/otel.h" +#include + +namespace seastar { +namespace httpd { +// The seastar::httpd::param_matcher will fail to match if param is not +// specified. +class optional_param_matcher : public matcher { + public: + /** + * Constructor + * @param name the name of the parameter, will be used as the key + * in the parameters object + * @param entire_path when set to true, the matched parameters will + * include all the remaining url until the end of it. + * when set to false the match will terminate at the next slash + */ + explicit optional_param_matcher(const sstring& name) : _name(name) {} + + size_t match(const sstring& url, size_t ind, parameters& param) override { + size_t last = find_end_param(url, ind); + if (last == url.size()) { + // Means we didn't find the parameter, but we still return true, + // and set the value to empty string. + param.set(_name, ""); + return ind; + } + param.set(_name, url.substr(ind, last - ind)); + return last; + } + + private: + size_t find_end_param(const sstring& url, size_t ind) { + size_t pos = url.find('/', ind + 1); + if (pos == sstring::npos) { + return url.length(); + } + return pos; + } + sstring _name; +}; +} // namespace httpd +} // namespace seastar + namespace server { hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, @@ -118,45 +161,45 @@ bool hqps_ic_handler::is_running_graph(const seastar::sstring& graph_id) const { return running_graph_res.value() == graph_id_str; } -// Handles both /v1/graph/{graph_id}/query and /v1/query/ +// Handles both /v1/graph/{graph_id}/query and /v1/graph/current/query/ seastar::future> hqps_ic_handler::handle( const seastar::sstring& path, std::unique_ptr req, std::unique_ptr rep) { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; - if (req->content.size() <= 0) { - // At least one input format byte is needed + // TODO(zhanglei): choose read or write based on the request, after the + // read/write info is supported in physical plan + auto request_format = req->get_header(INTERACTIVE_REQUEST_FORMAT); + if (request_format.empty()) { + // If no format specfied, we use default format: proto + request_format = PROTOCOL_FORMAT; + } + if (request_format == JSON_FORMAT) { + req->content.append(gs::GraphDBSession::kCypherJson, 1); + } else if (request_format == PROTOCOL_FORMAT) { + req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1); + } else if (request_format == ENCODER_FORMAT) { + req->content.append(gs::GraphDBSession::kCppEncoder, 1); + } else { + LOG(ERROR) << "Unsupported request format: " << request_format; rep->set_status(seastar::httpd::reply::status_type::internal_server_error); - rep->write_body("bin", seastar::sstring("Empty request!")); + rep->write_body("bin", seastar::sstring("Unsupported request format!")); rep->done(); return seastar::make_ready_future>( std::move(rep)); } - uint8_t input_format = - req->content.back(); // see graph_db_session.h#parse_query_type - // TODO(zhanglei): choose read or write based on the request, after the - // read/write info is supported in physical plan - if (req->param.exists("graph_id")) { + if (path != "/v1/graph/current/query" && req->param.exists("graph_id")) { + // TODO(zhanglei): get from graph_db. if (!is_running_graph(req->param["graph_id"])) { rep->set_status( seastar::httpd::reply::status_type::internal_server_error); - rep->write_body("bin", seastar::sstring("Failed to get running graph!")); + rep->write_body("bin", + seastar::sstring("The querying query is not running:" + + req->param["graph_id"])); rep->done(); return seastar::make_ready_future>( std::move(rep)); } - } else { - req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1); - // This handler with accept two kinds of queries, /v1/graph/{graph_id}/query - // and /v1/query/ The former one will have a graph_id in the request, and - // the latter one will not. For the first one, the input format is the last - // byte of the request content, and is added at client side; For the second - // one, the request is send from compiler, and currently compiler will not - // add extra bytes to the request content. So we need to add the input - // format here. Finally, we should REMOVE this adhoc appended byte, i.e. the - // input format byte should be added at compiler side - // TODO(zhanglei): remove this adhoc appended byte, add the byte at compiler - // side. Or maybe we should refine the protocol. } #ifdef HAVE_OPENTELEMETRY_CPP auto tracer = otel::get_tracer("hqps_procedure_query_handler"); @@ -173,31 +216,30 @@ seastar::future> hqps_ic_handler::handle( return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) - .then([input_format + .then([request_format #ifdef HAVE_OPENTELEMETRY_CPP , this, outer_span = outer_span #endif // HAVE_OPENTELEMETRY_CPP ](auto&& output) { - if (output.content.size() < 4) { - LOG(ERROR) << "Invalid output size: " << output.content.size(); -#ifdef HAVE_OPENTELEMETRY_CPP - outer_span->SetStatus(opentelemetry::trace::StatusCode::kError, - "Invalid output size"); - outer_span->End(); - std::map labels = {{"status", "fail"}}; - total_counter_->Add(1, labels); -#endif // HAVE_OPENTELEMETRY_CPP - return seastar::make_ready_future(std::move(output)); - } - if (input_format == static_cast( - gs::GraphDBSession::InputFormat::kCppEncoder)) { + if (request_format == ENCODER_FORMAT) { return seastar::make_ready_future( std::move(output.content)); } else { // For cypher input format, the results are written with // output.put_string(), which will add extra 4 bytes. So we need to // remove the first 4 bytes here. + if (output.content.size() < 4) { + LOG(ERROR) << "Invalid output size: " << output.content.size(); +#ifdef HAVE_OPENTELEMETRY_CPP + outer_span->SetStatus(opentelemetry::trace::StatusCode::kError, + "Invalid output size"); + outer_span->End(); + std::map labels = {{"status", "fail"}}; + total_counter_->Add(1, labels); +#endif // HAVE_OPENTELEMETRY_CPP + return seastar::make_ready_future(std::move(output)); + } return seastar::make_ready_future( std::move(output.content.substr(4))); } @@ -491,26 +533,10 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, }); } -seastar::future> -hqps_exit_handler::handle(const seastar::sstring& path, - std::unique_ptr req, - std::unique_ptr rep) { - HQPSService::get().set_exit_state(); - rep->write_body("bin", seastar::sstring{"HQPS service is exiting ..."}); - return seastar::make_ready_future>( - std::move(rep)); -} - -hqps_http_handler::hqps_http_handler(uint16_t http_port) +hqps_http_handler::hqps_http_handler(uint16_t http_port, int32_t shard_num) : http_port_(http_port) { - ic_handler_ = new hqps_ic_handler(ic_query_group_id, max_group_id, - group_inc_step, shard_query_concurrency); - proc_handler_ = new hqps_ic_handler(proc_query_group_id, max_group_id, - group_inc_step, shard_query_concurrency); - adhoc_query_handler_ = new hqps_adhoc_query_handler( - ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step, - shard_adhoc_concurrency); - exit_handler_ = new hqps_exit_handler(); + ic_handlers_.resize(shard_num); + adhoc_query_handlers_.resize(shard_num); } hqps_http_handler::~hqps_http_handler() { @@ -526,9 +552,7 @@ uint16_t hqps_http_handler::get_port() const { return http_port_; } bool hqps_http_handler::is_running() const { return running_.load(); } bool hqps_http_handler::is_actors_running() const { - return !ic_handler_->is_current_scope_cancelled() && - !adhoc_query_handler_->is_current_scope_cancelled() && - !proc_handler_->is_current_scope_cancelled(); + return actors_running_.load(); } void hqps_http_handler::start() { @@ -562,43 +586,48 @@ void hqps_http_handler::stop() { seastar::future<> hqps_http_handler::stop_query_actors() { // First cancel the scope. - return ic_handler_->cancel_current_scope() + return ic_handlers_[hiactor::local_shard_id()] + ->cancel_current_scope() .then([this] { - LOG(INFO) << "Cancel ic scope"; - return adhoc_query_handler_->cancel_current_scope(); + LOG(INFO) << "Cancelled ic scope"; + return adhoc_query_handlers_[hiactor::local_shard_id()] + ->cancel_current_scope(); }) .then([this] { - LOG(INFO) << "Cancel adhoc scope"; - return proc_handler_->cancel_current_scope(); - }) - .then([] { - LOG(INFO) << "Cancel proc scope"; + LOG(INFO) << "Cancelled proc scope"; + actors_running_.store(false); return seastar::make_ready_future<>(); }); } void hqps_http_handler::start_query_actors() { - ic_handler_->create_actors(); - adhoc_query_handler_->create_actors(); - proc_handler_->create_actors(); - LOG(INFO) << "Restart all actors"; + ic_handlers_[hiactor::local_shard_id()]->create_actors(); + adhoc_query_handlers_[hiactor::local_shard_id()]->create_actors(); + actors_running_.store(true); } seastar::future<> hqps_http_handler::set_routes() { return server_.set_routes([this](seastar::httpd::routes& r) { + auto ic_handler = + new hqps_ic_handler(ic_query_group_id, max_group_id, group_inc_step, + shard_query_concurrency); + auto adhoc_query_handler = new hqps_adhoc_query_handler( + ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step, + shard_adhoc_concurrency); + + auto rule_proc = new seastar::httpd::match_rule(ic_handler); + rule_proc->add_str("/v1/graph") + .add_matcher(new seastar::httpd::optional_param_matcher("graph_id")) + .add_str("/query"); + + r.add(rule_proc, seastar::httpd::operation_type::POST); + r.add(seastar::httpd::operation_type::POST, - seastar::httpd::url("/v1/query"), ic_handler_); - { - auto rule_proc = new seastar::httpd::match_rule(proc_handler_); - rule_proc->add_str("/v1/graph").add_param("graph_id").add_str("/query"); - // Get All procedures - r.add(rule_proc, seastar::httpd::operation_type::POST); - } - r.add(seastar::httpd::operation_type::POST, - seastar::httpd::url("/interactive/adhoc_query"), - adhoc_query_handler_); - r.add(seastar::httpd::operation_type::POST, - seastar::httpd::url("/interactive/exit"), exit_handler_); + seastar::httpd::url("/interactive/adhoc_query"), adhoc_query_handler); + + ic_handlers_[hiactor::local_shard_id()] = ic_handler; + adhoc_query_handlers_[hiactor::local_shard_id()] = adhoc_query_handler; + return seastar::make_ready_future<>(); }); } diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index 1ca5195e2e20..a89e97dfe6e5 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -31,6 +31,12 @@ namespace server { class hqps_ic_handler : public seastar::httpd::handler_base { public: + // extra headers + static constexpr const char* INTERACTIVE_REQUEST_FORMAT = + "X-Interactive-Request-Format"; + static constexpr const char* PROTOCOL_FORMAT = "proto"; + static constexpr const char* JSON_FORMAT = "json"; + static constexpr const char* ENCODER_FORMAT = "encoder"; hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency); ~hqps_ic_handler() override; @@ -99,17 +105,9 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base { #endif }; -class hqps_exit_handler : public seastar::httpd::handler_base { - public: - seastar::future> handle( - const seastar::sstring& path, - std::unique_ptr req, - std::unique_ptr rep) override; -}; - class hqps_http_handler { public: - hqps_http_handler(uint16_t http_port); + hqps_http_handler(uint16_t http_port, int32_t shard_num); ~hqps_http_handler(); void start(); @@ -131,11 +129,10 @@ class hqps_http_handler { private: const uint16_t http_port_; seastar::httpd::http_server_control server_; - std::atomic running_{false}; + std::atomic running_{false}, actors_running_{false}; - hqps_ic_handler *ic_handler_, *proc_handler_; - hqps_adhoc_query_handler* adhoc_query_handler_; - hqps_exit_handler* exit_handler_; + std::vector ic_handlers_; + std::vector adhoc_query_handlers_; }; } // namespace server diff --git a/flex/engines/http_server/service/hqps_service.cc b/flex/engines/http_server/service/hqps_service.cc index 5502094e39de..b7dc7e85d28a 100644 --- a/flex/engines/http_server/service/hqps_service.cc +++ b/flex/engines/http_server/service/hqps_service.cc @@ -64,7 +64,8 @@ void HQPSService::init(const ServiceConfig& config) { actor_sys_ = std::make_unique( config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool, config.external_thread_num, [this]() { set_exit_state(); }); - query_hdl_ = std::make_unique(config.query_port); + query_hdl_ = + std::make_unique(config.query_port, config.shard_num); if (config.start_admin_service) { admin_hdl_ = std::make_unique(config.admin_port); } diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java index 4d44a703f30e..14a304336076 100644 --- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java @@ -40,5 +40,9 @@ Result updateProcedure( Result callProcedure(String graphId, QueryRequest request); + Result callProcedure(QueryRequest request); + Result callProcedureRaw(String graphId, byte[] request); + + Result callProcedureRaw(byte[] request); } diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java index 9bd20cddde4a..acc36eeb1658 100644 --- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java +++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java @@ -43,6 +43,9 @@ public class DefaultSession implements Session { private static final int DEFAULT_READ_TIMEOUT = 30000; private static final int DEFAULT_WRITE_TIMEOUT = 30000; + private static String JSON_FORMAT_STRING = "json"; + private static String PROTO_FORMAT_STRING = "proto"; + private static String ENCODER_FORMAT_STRING = "encoder"; private final ApiClient client, queryClient; @@ -316,13 +319,6 @@ public Result updateProcedure( } } - private byte[] encodeString(String jsonStr, int lastByte) { - byte[] bytes = new byte[jsonStr.length() + 1]; - System.arraycopy(jsonStr.getBytes(), 0, bytes, 0, jsonStr.length()); - bytes[jsonStr.length()] = (byte) lastByte; - return bytes; - } - @Override public Result callProcedure( String graphName, QueryRequest request) { @@ -330,8 +326,34 @@ public Result callProcedure( // Interactive currently support four type of inputformat, see // flex/engines/graph_db/graph_db_session.h // Here we add byte of value 1 to denote the input format is in JSON format. - byte[] encodedStr = encodeString(request.toJson(), 1); - ApiResponse response = queryApi.procCallWithHttpInfo(graphName, encodedStr); + ApiResponse response = + queryApi.procCallWithHttpInfo( + graphName, JSON_FORMAT_STRING, request.toJson().getBytes()); + if (response.getStatusCode() != 200) { + return Result.fromException( + new ApiException(response.getStatusCode(), "Failed to call procedure")); + } + IrResult.CollectiveResults results = + IrResult.CollectiveResults.parseFrom(response.getData()); + return new Result<>(results); + } catch (ApiException e) { + e.printStackTrace(); + return Result.fromException(e); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + return Result.error(e.getMessage()); + } + } + + @Override + public Result callProcedure(QueryRequest request) { + try { + // Interactive currently support four type of inputformat, see + // flex/engines/graph_db/graph_db_session.h + // Here we add byte of value 1 to denote the input format is in JSON format. + ApiResponse response = + queryApi.procCallCurrentWithHttpInfo( + JSON_FORMAT_STRING, request.toJson().getBytes()); if (response.getStatusCode() != 200) { return Result.fromException( new ApiException(response.getStatusCode(), "Failed to call procedure")); @@ -355,10 +377,28 @@ public Result callProcedureRaw(String graphName, byte[] request) { // flex/engines/graph_db/graph_db_session.h // Here we add byte of value 0 to denote the input format is in raw encoder/decoder // format. - byte[] encodedStr = new byte[request.length + 1]; - System.arraycopy(request, 0, encodedStr, 0, request.length); - encodedStr[request.length] = 0; - ApiResponse response = queryApi.procCallWithHttpInfo(graphName, encodedStr); + ApiResponse response = + queryApi.procCallWithHttpInfo(graphName, ENCODER_FORMAT_STRING, request); + if (response.getStatusCode() != 200) { + return Result.fromException( + new ApiException(response.getStatusCode(), "Failed to call procedure")); + } + return new Result(response.getData()); + } catch (ApiException e) { + e.printStackTrace(); + return Result.fromException(e); + } + } + + @Override + public Result callProcedureRaw(byte[] request) { + try { + // Interactive currently support four type of inputformat, see + // flex/engines/graph_db/graph_db_session.h + // Here we add byte of value 0 to denote the input format is in raw encoder/decoder + // format. + ApiResponse response = + queryApi.procCallCurrentWithHttpInfo(ENCODER_FORMAT_STRING, request); if (response.getStatusCode() != 200) { return Result.fromException( new ApiException(response.getStatusCode(), "Failed to call procedure")); diff --git a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java index b72901bd766a..2c75b6ceb070 100644 --- a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java +++ b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java @@ -501,6 +501,24 @@ public void test9CallCppProcedure1() { @Test @Order(12) + public void test9CallCppProcedure1Current() { + QueryRequest request = new QueryRequest(); + request.setQueryName(cppProcedureId1); + request.addArgumentsItem( + new TypedValue() + .value(1) + .type( + new GSDataType( + new PrimitiveType() + .primitiveType( + PrimitiveType.PrimitiveTypeEnum + .SIGNED_INT32)))); + Result resp = session.callProcedure(request); + assertOk(resp); + } + + @Test + @Order(13) public void test9CallCppProcedure2() { byte[] bytes = new byte[4 + 1]; Encoder encoder = new Encoder(bytes); @@ -511,7 +529,7 @@ public void test9CallCppProcedure2() { } @Test - @Order(13) + @Order(14) public void test10CallCypherProcedureViaNeo4j() { String query = "CALL " + cypherProcedureId + "() YIELD *;"; org.neo4j.driver.Result result = neo4jSession.run(query); diff --git a/flex/interactive/sdk/python/interactive_sdk/client/session.py b/flex/interactive/sdk/python/interactive_sdk/client/session.py index 51cbd9c9217b..670d36c84196 100644 --- a/flex/interactive/sdk/python/interactive_sdk/client/session.py +++ b/flex/interactive/sdk/python/interactive_sdk/client/session.py @@ -235,11 +235,20 @@ def call_procedure( self, graph_id: StrictStr, params: QueryRequest ) -> Result[CollectiveResults]: raise NotImplementedError - + + @abstractmethod + def call_procedure_current( + self, params: QueryRequest + ) -> Result[CollectiveResults]: + raise NotImplementedError + @abstractmethod def call_procedure_raw(self, graph_id: StrictStr, params: str) -> Result[str]: raise NotImplementedError + @abstractmethod + def call_procedure_current_raw(self, params: str) -> Result[str]: + raise NotImplementedError class QueryServiceInterface: @abstractmethod @@ -291,6 +300,9 @@ class Session( class DefaultSession(Session): + PROTOCOL_FORMAT = "proto" + JSON_FORMAT = "json" + ENCODER_FORMAT = "encoder" def __init__(self, uri: str): self._client = ApiClient(Configuration(host=uri)) @@ -518,9 +530,29 @@ def call_procedure( try: # Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in json format - params_str = params.to_json() + chr(1) response = self._query_api.proc_call_with_http_info( - graph_id, params_str + graph_id = graph_id, + x_interactive_request_format = self.JSON_FORMAT, + body=params.to_json() + ) + result = CollectiveResults() + if response.status_code == 200: + result.ParseFromString(response.data) + return Result.ok(result) + else: + return Result(Status.from_response(response), result) + except Exception as e: + return Result.from_exception(e) + + def call_procedure_current( + self, params: QueryRequest + ) -> Result[CollectiveResults]: + try: + # Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # Here we add byte of value 1 to denote the input format is in json format + response = self._query_api.proc_call_current_with_http_info( + x_interactive_request_format = self.JSON_FORMAT, + body = params.to_json() ) result = CollectiveResults() if response.status_code == 200: @@ -535,9 +567,22 @@ def call_procedure_raw(self, graph_id: StrictStr, params: str) -> Result[str]: try: # Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in encoder/decoder format - params = params + chr(0) response = self._query_api.proc_call_with_http_info( - graph_id, params + graph_id = graph_id, + x_interactive_request_format = self.ENCODER_FORMAT, + body = params + ) + return Result.from_response(response) + except Exception as e: + return Result.from_exception(e) + + def call_procedure_current_raw(self, params: str) -> Result[str]: + try: + # Interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # Here we add byte of value 1 to denote the input format is in encoder/decoder format + response = self._query_api.proc_call_current_with_http_info( + x_interactive_request_format = self.ENCODER_FORMAT, + body = params ) return Result.from_response(response) except Exception as e: diff --git a/flex/interactive/sdk/python/test/test_driver.py b/flex/interactive/sdk/python/test/test_driver.py index 498ea13feca8..d0831d398c77 100644 --- a/flex/interactive/sdk/python/test/test_driver.py +++ b/flex/interactive/sdk/python/test/test_driver.py @@ -94,7 +94,7 @@ def test_example(self): self.createGraph() self.bulkLoading() self.waitJobFinish() - self.test_list_graph() + self.list_graph() self.runCypherQuery() self.runGremlinQuery() self.createCypherProcedure() @@ -102,6 +102,7 @@ def test_example(self): self.restart() self.callProcedure() self.callProcedureWithHttp() + self.callProcedureWithHttpCurrent() def createGraph(self): create_graph = CreateGraphRequest(name="test_graph", description="test graph") @@ -197,7 +198,7 @@ def waitJobFinish(self): time.sleep(1) print("job finished") - def test_list_graph(self): + def list_graph(self): resp = self._sess.list_graphs() assert resp.is_ok() print("list graph: ", resp.get_value()) @@ -276,10 +277,25 @@ def callProcedureWithHttp(self): ) ] ) - resp = self._sess.call_procedure(self._graph_id, req) + resp = self._sess.call_procedure(graph_id = self._graph_id, params = req) assert resp.is_ok() print("call procedure result: ", resp.get_value()) + def callProcedureWithHttpCurrent(self): + req = QueryRequest( + query_name=self._cpp_proc_name, + arguments=[ + TypedValue( + type=GSDataType( + PrimitiveType(primitive_type="DT_SIGNED_INT32") + ), + value = 1 + ) + ] + ) + resp = self._sess.call_procedure_current(params = req) + assert resp.is_ok() + print("call procedure result: ", resp.get_value()) if __name__ == "__main__": unittest.main() diff --git a/flex/openapi/openapi_interactive.yaml b/flex/openapi/openapi_interactive.yaml index 0f8d4275f5b4..d5177d0ff419 100644 --- a/flex/openapi/openapi_interactive.yaml +++ b/flex/openapi/openapi_interactive.yaml @@ -904,6 +904,43 @@ paths: required: true schema: type: string + - name: X-Interactive-Request-Format + in: header + required: true + schema: + type: string + enum: [proto, encoder, json] + requestBody: + content: + text/plain: + schema: + type: string + format: byte + responses: + '200': + description: Successfully runned. Empty if failed? + content: + text/plain: + schema: + type: string + format: byte + '500': + description: Server internal error + /v1/graph/current/query: + post: + tags: + - QueryService + summary: run queries on the running graph + operationId: proc_call_current + description: | + Submit a query to the running graph. + parameters: + - name: X-Interactive-Request-Format + in: header + required: true + schema: + type: string + enum: [proto, encoder, json] requestBody: content: text/plain: diff --git a/flex/tests/hqps/admin_http_test.cc b/flex/tests/hqps/admin_http_test.cc index d15792dba540..e9cb408ee690 100644 --- a/flex/tests/hqps/admin_http_test.cc +++ b/flex/tests/hqps/admin_http_test.cc @@ -132,8 +132,8 @@ void run_builtin_graph_test( for (auto& proc_id : plugin_ids) { procedure::Query query; query.mutable_query_name()->set_name(proc_id); - auto res = query_client.Post("/v1/query", query.SerializeAsString(), - "text/plain"); + auto res = query_client.Post("/v1/graph/current/query", + query.SerializeAsString(), "text/plain"); CHECK(res->status != 200); LOG(INFO) << "call procedure response: " << res->body; // find failed in res->body @@ -160,8 +160,8 @@ void run_builtin_graph_test( for (auto& plugin_id : plugin_ids) { procedure::Query query; query.mutable_query_name()->set_name(plugin_id); - auto res = query_client.Post("/v1/query", query.SerializeAsString(), - "text/plain"); + auto res = query_client.Post("/v1/graph/current/query", + query.SerializeAsString(), "text/plain"); CHECK(res->status == 200) << "call procedure should success: " << res->body << ", for query: " << query.DebugString(); @@ -298,8 +298,8 @@ void run_procedure_test(httplib::Client& client, httplib::Client& query_client, auto query_str = pair.second; procedure::Query query; query.mutable_query_name()->set_name(query_name); - auto res = query_client.Post("/v1/query", query.SerializeAsString(), - "text/plain"); + auto res = query_client.Post("/v1/graph/current/query", + query.SerializeAsString(), "text/plain"); CHECK(res->status != 200) << "call previous procedure on current graph should fail: " << res->body; @@ -309,7 +309,8 @@ void run_procedure_test(httplib::Client& client, httplib::Client& query_client, //----4. call procedures----------------------------------------------- for (auto& proc_id : plugin_ids) { auto call_proc_payload = generate_call_procedure_payload(graph_id, proc_id); - res = query_client.Post("/v1/query", call_proc_payload, "text/plain"); + res = query_client.Post("/v1/graph/current/query", call_proc_payload, + "text/plain"); CHECK(res->status == 200) << "call procedure failed: " << res->body << ", for query: " << call_proc_payload; } @@ -325,7 +326,8 @@ void run_procedure_test(httplib::Client& client, httplib::Client& query_client, if (procedures.size() > 0) { auto call_proc_payload = generate_call_procedure_payload(graph_id, plugin_ids[0]); - res = query_client.Post("/v1/query", call_proc_payload, "text/plain"); + res = query_client.Post("/v1/graph/current/query", call_proc_payload, + "text/plain"); CHECK(res->status == 200) << "call procedure failed: " << res->body << ", for query: " << call_proc_payload; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index 8fd8020e5ac8..2b10ebbc6506 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -55,8 +55,10 @@ public class HttpExecutionClient extends ExecutionClient { private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class); private static final String CONTENT_TYPE = "Content-Type"; + private static final String INTERACTIVE_REQUEST_FORMAT = "X-Interactive-Request-Format"; private static final String TEXT_PLAIN = "text/plain;charset=UTF-8"; - private static final String INTERACTIVE_QUERY_PATH = "/v1/query"; + private static final String PROTOCOL_FORMAT = "proto"; + private static final String INTERACTIVE_QUERY_PATH = "/v1/graph/current/query"; private static final String INTERACTIVE_ADHOC_QUERY_PATH = "/interactive/adhoc_query"; private final HttpClient httpClient; @@ -96,6 +98,7 @@ public void submit( HttpRequest.newBuilder() .uri(uri) .headers(CONTENT_TYPE, TEXT_PLAIN) + .headers(INTERACTIVE_REQUEST_FORMAT, PROTOCOL_FORMAT) .POST( HttpRequest.BodyPublishers.ofByteArray( (byte[])