Skip to content

Commit

Permalink
refactor(interactive): Refine Interactive SDK call_proc API design (#…
Browse files Browse the repository at this point in the history
…3881)

- Introduce a customized header `X-Interactive-Request-Format` with value
`[proto, encoder, json]` to distinguish between different formats.
- There will be no byte appending on the SDK side. The format byte will be
appended in `hqps_ic_handler`.
- Support calling a procedure without specifying the graph id, by
submitting queries to the running graph with the
URL `/v1/graph/current/query`.
- Remove `hqps_exit_handler` since not used.
- Create handlers for each shard.
  • Loading branch information
zhanglei1949 committed Jun 5, 2024
1 parent a1b83bb commit 7a55a9b
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 127 deletions.
1 change: 1 addition & 0 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class GraphDBSession {
static constexpr int32_t MAX_RETRY = 3;
static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8)
#ifdef BUILD_HQPS
static constexpr const char* kCppEncoder = "\x00";
static constexpr const char* kCypherJson = "\x01";
static constexpr const char* kCypherInternalAdhoc = "\x02";
static constexpr const char* kCypherInternalProcedure = "\x03";
Expand Down
193 changes: 111 additions & 82 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,49 @@
#include "flex/engines/http_server/types.h"
#include "flex/otel/otel.h"

#include <seastar/core/when_all.hh>

namespace seastar {
namespace httpd {
// The seastar::httpd::param_matcher will fail to match if param is not
// specified.
/**
 * A URL matcher for a path parameter that is allowed to be missing.
 *
 * Where a plain parameter matcher would reject the url when the parameter is
 * not specified, this matcher still succeeds and stores an empty string under
 * the parameter name.
 */
class optional_param_matcher : public matcher {
 public:
  /**
   * Constructor
   * @param name the name of the parameter, will be used as the key
   * in the parameters object
   */
  explicit optional_param_matcher(const sstring& name) : _name(name) {}

  /**
   * Try to read the parameter from url, starting at position ind.
   *
   * The candidate value is the text from ind up to (not including) the next
   * '/' found after ind. If no such '/' exists, the candidate is the last
   * piece of the url and the parameter is considered absent.
   *
   * @param url the url being matched
   * @param ind the position in url where matching starts
   * @param param the parameters object that receives the value
   * @return the position of the next '/' when a value was captured, or ind
   * unchanged when the parameter is considered absent
   */
  size_t match(const sstring& url, size_t ind, parameters& param) override {
    const size_t last = find_end_param(url, ind);
    if (last == url.size()) {
      // Means we didn't find the parameter, but we still report a match:
      // store an empty value and consume nothing, so that whatever matcher
      // comes next gets to look at the text starting at ind.
      param.set(_name, "");
      return ind;
    }
    // Captured text is url[ind, last), i.e. it starts with the character at
    // ind itself.
    param.set(_name, url.substr(ind, last - ind));
    return last;
  }

 private:
  /**
   * Find where the path piece starting at ind ends.
   * @return the position of the first '/' after ind, or url.length() when
   * there is none
   */
  size_t find_end_param(const sstring& url, size_t ind) const {
    const size_t pos = url.find('/', ind + 1);
    return pos == sstring::npos ? url.length() : pos;
  }

  sstring _name;
};
} // namespace httpd
} // namespace seastar

namespace server {

hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
Expand Down Expand Up @@ -118,45 +161,45 @@ bool hqps_ic_handler::is_running_graph(const seastar::sstring& graph_id) const {
return running_graph_res.value() == graph_id_str;
}

// Handles both /v1/graph/{graph_id}/query and /v1/query/
// Handles both /v1/graph/{graph_id}/query and /v1/graph/current/query/
seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(
const seastar::sstring& path, std::unique_ptr<seastar::httpd::request> req,
std::unique_ptr<seastar::httpd::reply> rep) {
auto dst_executor = executor_idx_;
executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
if (req->content.size() <= 0) {
// At least one input format byte is needed
// TODO(zhanglei): choose read or write based on the request, after the
// read/write info is supported in physical plan
auto request_format = req->get_header(INTERACTIVE_REQUEST_FORMAT);
if (request_format.empty()) {
// If no format is specified, we use the default format: proto
request_format = PROTOCOL_FORMAT;
}
if (request_format == JSON_FORMAT) {
req->content.append(gs::GraphDBSession::kCypherJson, 1);
} else if (request_format == PROTOCOL_FORMAT) {
req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1);
} else if (request_format == ENCODER_FORMAT) {
req->content.append(gs::GraphDBSession::kCppEncoder, 1);
} else {
LOG(ERROR) << "Unsupported request format: " << request_format;
rep->set_status(seastar::httpd::reply::status_type::internal_server_error);
rep->write_body("bin", seastar::sstring("Empty request!"));
rep->write_body("bin", seastar::sstring("Unsupported request format!"));
rep->done();
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}
uint8_t input_format =
req->content.back(); // see graph_db_session.h#parse_query_type
// TODO(zhanglei): choose read or write based on the request, after the
// read/write info is supported in physical plan
if (req->param.exists("graph_id")) {
if (path != "/v1/graph/current/query" && req->param.exists("graph_id")) {
// TODO(zhanglei): get from graph_db.
if (!is_running_graph(req->param["graph_id"])) {
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
rep->write_body("bin", seastar::sstring("Failed to get running graph!"));
rep->write_body("bin",
seastar::sstring("The querying query is not running:" +
req->param["graph_id"]));
rep->done();
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}
} else {
req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1);
// This handler with accept two kinds of queries, /v1/graph/{graph_id}/query
// and /v1/query/ The former one will have a graph_id in the request, and
// the latter one will not. For the first one, the input format is the last
// byte of the request content, and is added at client side; For the second
// one, the request is send from compiler, and currently compiler will not
// add extra bytes to the request content. So we need to add the input
// format here. Finally, we should REMOVE this adhoc appended byte, i.e. the
// input format byte should be added at compiler side
// TODO(zhanglei): remove this adhoc appended byte, add the byte at compiler
// side. Or maybe we should refine the protocol.
}
#ifdef HAVE_OPENTELEMETRY_CPP
auto tracer = otel::get_tracer("hqps_procedure_query_handler");
Expand All @@ -173,31 +216,30 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(

return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([input_format
.then([request_format
#ifdef HAVE_OPENTELEMETRY_CPP
,
this, outer_span = outer_span
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (output.content.size() < 4) {
LOG(ERROR) << "Invalid output size: " << output.content.size();
#ifdef HAVE_OPENTELEMETRY_CPP
outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
"Invalid output size");
outer_span->End();
std::map<std::string, std::string> labels = {{"status", "fail"}};
total_counter_->Add(1, labels);
#endif // HAVE_OPENTELEMETRY_CPP
return seastar::make_ready_future<query_param>(std::move(output));
}
if (input_format == static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCppEncoder)) {
if (request_format == ENCODER_FORMAT) {
return seastar::make_ready_future<query_param>(
std::move(output.content));
} else {
// For cypher input format, the results are written with
// output.put_string(), which will add extra 4 bytes. So we need to
// remove the first 4 bytes here.
if (output.content.size() < 4) {
LOG(ERROR) << "Invalid output size: " << output.content.size();
#ifdef HAVE_OPENTELEMETRY_CPP
outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
"Invalid output size");
outer_span->End();
std::map<std::string, std::string> labels = {{"status", "fail"}};
total_counter_->Add(1, labels);
#endif // HAVE_OPENTELEMETRY_CPP
return seastar::make_ready_future<query_param>(std::move(output));
}
return seastar::make_ready_future<query_param>(
std::move(output.content.substr(4)));
}
Expand Down Expand Up @@ -491,26 +533,10 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path,
});
}

seastar::future<std::unique_ptr<seastar::httpd::reply>>
hqps_exit_handler::handle(const seastar::sstring& path,
std::unique_ptr<seastar::httpd::request> req,
std::unique_ptr<seastar::httpd::reply> rep) {
HQPSService::get().set_exit_state();
rep->write_body("bin", seastar::sstring{"HQPS service is exiting ..."});
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}

hqps_http_handler::hqps_http_handler(uint16_t http_port)
hqps_http_handler::hqps_http_handler(uint16_t http_port, int32_t shard_num)
: http_port_(http_port) {
ic_handler_ = new hqps_ic_handler(ic_query_group_id, max_group_id,
group_inc_step, shard_query_concurrency);
proc_handler_ = new hqps_ic_handler(proc_query_group_id, max_group_id,
group_inc_step, shard_query_concurrency);
adhoc_query_handler_ = new hqps_adhoc_query_handler(
ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step,
shard_adhoc_concurrency);
exit_handler_ = new hqps_exit_handler();
ic_handlers_.resize(shard_num);
adhoc_query_handlers_.resize(shard_num);
}

hqps_http_handler::~hqps_http_handler() {
Expand All @@ -526,9 +552,7 @@ uint16_t hqps_http_handler::get_port() const { return http_port_; }
bool hqps_http_handler::is_running() const { return running_.load(); }

bool hqps_http_handler::is_actors_running() const {
return !ic_handler_->is_current_scope_cancelled() &&
!adhoc_query_handler_->is_current_scope_cancelled() &&
!proc_handler_->is_current_scope_cancelled();
return actors_running_.load();
}

void hqps_http_handler::start() {
Expand Down Expand Up @@ -562,43 +586,48 @@ void hqps_http_handler::stop() {

seastar::future<> hqps_http_handler::stop_query_actors() {
// First cancel the scope.
return ic_handler_->cancel_current_scope()
return ic_handlers_[hiactor::local_shard_id()]
->cancel_current_scope()
.then([this] {
LOG(INFO) << "Cancel ic scope";
return adhoc_query_handler_->cancel_current_scope();
LOG(INFO) << "Cancelled ic scope";
return adhoc_query_handlers_[hiactor::local_shard_id()]
->cancel_current_scope();
})
.then([this] {
LOG(INFO) << "Cancel adhoc scope";
return proc_handler_->cancel_current_scope();
})
.then([] {
LOG(INFO) << "Cancel proc scope";
LOG(INFO) << "Cancelled proc scope";
actors_running_.store(false);
return seastar::make_ready_future<>();
});
}

void hqps_http_handler::start_query_actors() {
ic_handler_->create_actors();
adhoc_query_handler_->create_actors();
proc_handler_->create_actors();
LOG(INFO) << "Restart all actors";
ic_handlers_[hiactor::local_shard_id()]->create_actors();
adhoc_query_handlers_[hiactor::local_shard_id()]->create_actors();
actors_running_.store(true);
}

seastar::future<> hqps_http_handler::set_routes() {
return server_.set_routes([this](seastar::httpd::routes& r) {
auto ic_handler =
new hqps_ic_handler(ic_query_group_id, max_group_id, group_inc_step,
shard_query_concurrency);
auto adhoc_query_handler = new hqps_adhoc_query_handler(
ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step,
shard_adhoc_concurrency);

auto rule_proc = new seastar::httpd::match_rule(ic_handler);
rule_proc->add_str("/v1/graph")
.add_matcher(new seastar::httpd::optional_param_matcher("graph_id"))
.add_str("/query");

r.add(rule_proc, seastar::httpd::operation_type::POST);

r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/query"), ic_handler_);
{
auto rule_proc = new seastar::httpd::match_rule(proc_handler_);
rule_proc->add_str("/v1/graph").add_param("graph_id").add_str("/query");
// Get All procedures
r.add(rule_proc, seastar::httpd::operation_type::POST);
}
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/interactive/adhoc_query"),
adhoc_query_handler_);
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/interactive/exit"), exit_handler_);
seastar::httpd::url("/interactive/adhoc_query"), adhoc_query_handler);

ic_handlers_[hiactor::local_shard_id()] = ic_handler;
adhoc_query_handlers_[hiactor::local_shard_id()] = adhoc_query_handler;

return seastar::make_ready_future<>();
});
}
Expand Down
23 changes: 10 additions & 13 deletions flex/engines/http_server/handler/hqps_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ namespace server {

class hqps_ic_handler : public seastar::httpd::handler_base {
public:
// extra headers
static constexpr const char* INTERACTIVE_REQUEST_FORMAT =
"X-Interactive-Request-Format";
static constexpr const char* PROTOCOL_FORMAT = "proto";
static constexpr const char* JSON_FORMAT = "json";
static constexpr const char* ENCODER_FORMAT = "encoder";
hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
uint32_t group_inc_step, uint32_t shard_concurrency);
~hqps_ic_handler() override;
Expand Down Expand Up @@ -99,17 +105,9 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
#endif
};

class hqps_exit_handler : public seastar::httpd::handler_base {
public:
seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
const seastar::sstring& path,
std::unique_ptr<seastar::httpd::request> req,
std::unique_ptr<seastar::httpd::reply> rep) override;
};

class hqps_http_handler {
public:
hqps_http_handler(uint16_t http_port);
hqps_http_handler(uint16_t http_port, int32_t shard_num);
~hqps_http_handler();

void start();
Expand All @@ -131,11 +129,10 @@ class hqps_http_handler {
private:
const uint16_t http_port_;
seastar::httpd::http_server_control server_;
std::atomic<bool> running_{false};
std::atomic<bool> running_{false}, actors_running_{false};

hqps_ic_handler *ic_handler_, *proc_handler_;
hqps_adhoc_query_handler* adhoc_query_handler_;
hqps_exit_handler* exit_handler_;
std::vector<hqps_ic_handler*> ic_handlers_;
std::vector<hqps_adhoc_query_handler*> adhoc_query_handlers_;
};

} // namespace server
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void HQPSService::init(const ServiceConfig& config) {
actor_sys_ = std::make_unique<actor_system>(
config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool,
config.external_thread_num, [this]() { set_exit_state(); });
query_hdl_ = std::make_unique<hqps_http_handler>(config.query_port);
query_hdl_ =
std::make_unique<hqps_http_handler>(config.query_port, config.shard_num);
if (config.start_admin_service) {
admin_hdl_ = std::make_unique<admin_http_handler>(config.admin_port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,9 @@ Result<String> updateProcedure(

Result<IrResult.CollectiveResults> callProcedure(String graphId, QueryRequest request);

Result<IrResult.CollectiveResults> callProcedure(QueryRequest request);

Result<byte[]> callProcedureRaw(String graphId, byte[] request);

Result<byte[]> callProcedureRaw(byte[] request);
}
Loading

0 comments on commit 7a55a9b

Please sign in to comment.