Skip to content

Commit

Permalink
feat(interactive): Enable starting compiler as a subprocess of intera…
Browse files Browse the repository at this point in the history
…ctive server (alibaba#3650)

Since our `AdminService` is able to switch services between graphs, we
should also be able to switch graphs on the compiler service. For now, we
make `GraphServer` a subprocess of `InteractiveServer`.
  • Loading branch information
zhanglei1949 authored Mar 26, 2024
1 parent 577a231 commit 27b78f2
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 17 deletions.
16 changes: 15 additions & 1 deletion docs/flex/interactive/development/admin_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,18 @@ curl -X GET -H "Content-Type: application/json" "http://[host]/v1/node/status"

#### Status Codes
- `200 OK`: Request successful.
- `500 Internal Error`: Server internal Error.
- `500 Internal Error`: Server internal Error.

## Enable AdminService in development

To start the admin service in development, use the command line argument `--enable-admin-service true`. `${ENGINE_CONFIG}` specifies the configuration for the interactive query engine, see [engine-configuration](https://graphscope.io/docs/flex/interactive/configuration). `${WORKSPACE}` points to the directory where Interactive-related data is maintained.

```bash
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true
```

### Start Compiler Service
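The Compiler service can be started as a subprocess of the AdminService. This ensures that when the AdminService switches graphs, the Compiler service also switches to the corresponding graph's schema. This is the default behavior in the current Interactive; note, however, that the `--start-compiler` flag of `interactive_server` itself defaults to `false`, so it must be passed explicitly when launching the binary by hand: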

```bash
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true --start-compiler true
5 changes: 4 additions & 1 deletion flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ int main(int argc, char** argv) {
bpo::value<unsigned>()->default_value(2),
"worker thread number")(
"enable-trace", bpo::value<bool>()->default_value(false),
"whether to enable opentelemetry tracing");
"whether to enable opentelemetry tracing")(
"start-compiler", bpo::value<bool>()->default_value(false),
"whether or not to start compiler");

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand Down Expand Up @@ -179,6 +181,7 @@ int main(int argc, char** argv) {
server::ServiceConfig service_config = node.as<server::ServiceConfig>();
service_config.engine_config_path = engine_config_file;
service_config.start_admin_service = vm["enable-admin-service"].as<bool>();
service_config.start_compiler = vm["start-compiler"].as<bool>();

auto& db = gs::GraphDB::get();

Expand Down
56 changes: 45 additions & 11 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
max_group_id_(max_group_id),
group_inc_step_(group_inc_step),
shard_concurrency_(shard_concurrency),
executor_idx_(0) {
executor_idx_(0),
is_cancelled_(false) {
executor_refs_.reserve(shard_concurrency_);
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
Expand All @@ -87,10 +88,15 @@ seastar::future<> hqps_ic_handler::cancel_current_scope() {
LOG(INFO) << "Cancel IC scope successfully!";
// clear the actor refs
executor_refs_.clear();
is_cancelled_ = true;
return seastar::make_ready_future<>();
});
}

// Reports whether the IC actor scope is currently marked as cancelled.
// The flag is raised once cancel_current_scope() completes and lowered again
// by create_actors().
bool hqps_ic_handler::is_current_scope_cancelled() const {
  return is_cancelled_;
}

bool hqps_ic_handler::create_actors() {
if (executor_refs_.size() > 0) {
LOG(ERROR) << "The actors have been already created!";
Expand All @@ -114,6 +120,7 @@ bool hqps_ic_handler::create_actors() {
for (unsigned i = 0; i < shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
}
is_cancelled_ = false; // locked outside
return true;
}

Expand Down Expand Up @@ -166,7 +173,8 @@ hqps_adhoc_query_handler::hqps_adhoc_query_handler(
max_group_id_(max_group_id),
group_inc_step_(group_inc_step),
shard_concurrency_(shard_concurrency),
executor_idx_(0) {
executor_idx_(0),
is_cancelled_(false) {
executor_refs_.reserve(shard_concurrency_);
{
hiactor::scope_builder builder;
Expand Down Expand Up @@ -213,10 +221,15 @@ seastar::future<> hqps_adhoc_query_handler::cancel_current_scope() {
executor_refs_.clear();
codegen_actor_refs_.clear();
LOG(INFO) << "Clear actor refs successfully!";
is_cancelled_ = true;
return seastar::make_ready_future<>();
});
}

// Reports whether the adhoc-query actor scope is currently marked as
// cancelled. The flag is raised once cancel_current_scope() completes and
// lowered again by create_actors().
bool hqps_adhoc_query_handler::is_current_scope_cancelled() const {
  return is_cancelled_;
}

bool hqps_adhoc_query_handler::create_actors() {
if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
LOG(ERROR) << "The actors have been already created!";
Expand Down Expand Up @@ -255,6 +268,7 @@ bool hqps_adhoc_query_handler::create_actors() {
hiactor::scope<hiactor::actor_group>(cur_codegen_group_id_));
codegen_actor_refs_.emplace_back(builder.build_ref<codegen_actor_ref>(0));
}
is_cancelled_ = false;
return true;
}

Expand Down Expand Up @@ -378,6 +392,11 @@ uint16_t hqps_http_handler::get_port() const { return http_port_; }

// Returns the current value of the running_ flag.
bool hqps_http_handler::is_running() const {
  return running_.load();
}

// The query actors count as running only while neither the IC handler's scope
// nor the adhoc-query handler's scope has been cancelled.
bool hqps_http_handler::is_actors_running() const {
  // De Morgan form of (!ic_cancelled && !adhoc_cancelled); the IC handler is
  // still consulted first and the adhoc handler only when IC is not cancelled.
  return !(ic_handler_->is_current_scope_cancelled() ||
           adhoc_query_handler_->is_current_scope_cancelled());
}

void hqps_http_handler::start() {
auto fut = seastar::alien::submit_to(
*seastar::alien::internal::default_instance, 0, [this] {
Expand Down Expand Up @@ -409,15 +428,30 @@ void hqps_http_handler::stop() {

// Cancels the actor scopes of the IC handler and then the adhoc-query handler,
// skipping any scope that is already marked as cancelled.
//
// Returns a future that resolves once every cancellation that had to be issued
// has finished (or immediately when both scopes were already cancelled).
seastar::future<> hqps_http_handler::stop_query_actors() {
  if (!ic_handler_->is_current_scope_cancelled()) {
    // IC scope is still alive: cancel it first, then deal with the adhoc scope.
    return ic_handler_->cancel_current_scope()
        .then([this] {
          LOG(INFO) << "Cancel ic scope";
          if (!adhoc_query_handler_->is_current_scope_cancelled()) {
            return adhoc_query_handler_->cancel_current_scope();
          }
          LOG(INFO) << "Adhoc scope has been cancelled!";
          return seastar::make_ready_future<>();
        })
        .then([] {
          LOG(INFO) << "Cancel adhoc scope";
          return seastar::make_ready_future<>();
        });
  }

  // IC scope was cancelled earlier; only the adhoc scope may need work.
  LOG(INFO) << "IC scope has been cancelled!";
  if (!adhoc_query_handler_->is_current_scope_cancelled()) {
    return adhoc_query_handler_->cancel_current_scope();
  }
  LOG(INFO) << "Adhoc scope has been cancelled!";
  return seastar::make_ready_future<>();
}

void hqps_http_handler::start_query_actors() {
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/http_server/handler/hqps_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class hqps_ic_handler : public seastar::httpd::handler_base {

seastar::future<> cancel_current_scope();

bool is_current_scope_cancelled() const;

seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
const seastar::sstring& path,
std::unique_ptr<seastar::httpd::request> req,
Expand All @@ -45,6 +47,7 @@ class hqps_ic_handler : public seastar::httpd::handler_base {
const uint32_t shard_concurrency_;
uint32_t executor_idx_;
std::vector<executor_ref> executor_refs_;
bool is_cancelled_;
};

class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
Expand All @@ -58,6 +61,8 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {

seastar::future<> cancel_current_scope();

bool is_current_scope_cancelled() const;

bool create_actors();

seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
Expand All @@ -72,6 +77,7 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
uint32_t executor_idx_;
std::vector<executor_ref> executor_refs_;
std::vector<codegen_actor_ref> codegen_actor_refs_;
bool is_cancelled_;
};

class hqps_exit_handler : public seastar::httpd::handler_base {
Expand All @@ -94,6 +100,8 @@ class hqps_http_handler {

bool is_running() const;

bool is_actors_running() const;

seastar::future<> stop_query_actors();

void start_query_actors();
Expand Down
119 changes: 118 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ ServiceConfig::ServiceConfig()
dpdk_mode(false),
enable_thread_resource_pool(true),
external_thread_num(2),
start_admin_service(true) {}
start_admin_service(true),
start_compiler(false) {}

const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph";
const std::string HQPSService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
const std::string HQPSService::COMPILER_SERVER_CLASS_NAME =
"com.alibaba.graphscope.GraphServer";

HQPSService& HQPSService::get() {
static HQPSService instance;
Expand All @@ -48,12 +52,16 @@ void HQPSService::init(const ServiceConfig& config) {
initialized_.store(true);
service_config_ = config;
gs::init_cpu_usage_watch();
if (config.start_compiler) {
start_compiler_subprocess();
}
}

// Tears the service down: terminates the actor system when one exists, then
// stops the compiler subprocess (a no-op when none is running).
HQPSService::~HQPSService() {
  if (actor_sys_) {
    actor_sys_->terminate();
  }
  // Always attempted, regardless of whether an actor system was present.
  stop_compiler_subprocess();
}

const ServiceConfig& HQPSService::get_service_config() const {
Expand Down Expand Up @@ -111,7 +119,15 @@ void HQPSService::run_and_wait_for_exit() {

// Clears the running_ flag by storing false into it.
void HQPSService::set_exit_state() {
  running_.store(false);
}

// Forwards to the query handler's is_actors_running(); reports false when no
// query handler has been created.
bool HQPSService::is_actors_running() const {
  return query_hdl_ ? query_hdl_->is_actors_running() : false;
}

seastar::future<> HQPSService::stop_query_actors() {
std::unique_lock<std::mutex> lock(mtx_);
if (query_hdl_) {
return query_hdl_->stop_query_actors();
} else {
Expand All @@ -122,11 +138,112 @@ seastar::future<> HQPSService::stop_query_actors() {
}

// Asks the query handler to (re)start its query actors while holding mtx_.
// When no query handler exists, an error is printed to stderr and nothing else
// happens.
void HQPSService::start_query_actors() {
  std::unique_lock<std::mutex> lock(mtx_);
  if (!query_hdl_) {
    std::cerr << "Query handler has not been inited!" << std::endl;
    return;
  }
  query_hdl_->start_query_actors();
}

bool HQPSService::start_compiler_subprocess(
const std::string& graph_schema_path) {
LOG(INFO) << "Start compiler subprocess";
stop_compiler_subprocess();
auto java_bin_path = boost::process::search_path("java");
if (java_bin_path.empty()) {
std::cerr << "Java binary not found in PATH!" << std::endl;
return false;
}
// try to find compiler jar from env.
auto interactive_class_path = find_interactive_class_path();
if (interactive_class_path.empty()) {
std::cerr << "Interactive home not found!" << std::endl;
return false;
}
std::stringstream ss;
ss << "java -cp " << interactive_class_path;
if (!graph_schema_path.empty()) {
ss << " -Dgraph.schema=" << graph_schema_path;
}
ss << " " << COMPILER_SERVER_CLASS_NAME;
ss << " " << service_config_.engine_config_path;
auto cmd_str = ss.str();
LOG(INFO) << "Start compiler with command: " << cmd_str;
auto compiler_log = WorkDirManipulator::GetCompilerLogFile();

compiler_process_ =
boost::process::child(cmd_str, boost::process::std_out > compiler_log,
boost::process::std_err > compiler_log);
LOG(INFO) << "Compiler process started with pid: " << compiler_process_.id();
// sleep for a while to wait for the compiler to start
std::this_thread::sleep_for(std::chrono::seconds(4));
// check if the compiler process is still running
if (!compiler_process_.running()) {
LOG(ERROR) << "Compiler process failed to start!";
return false;
}
return true;
}

bool HQPSService::stop_compiler_subprocess() {
if (compiler_process_.running()) {
LOG(INFO) << "Terminate previous compiler process with pid: "
<< compiler_process_.id();
compiler_process_.terminate();
}
return true;
}

std::string HQPSService::find_interactive_class_path() {
std::string interactive_home = DEFAULT_INTERACTIVE_HOME;
if (std::getenv("INTERACTIVE_HOME")) {
// try to use DEFAULT_INTERACTIVE_HOME
interactive_home = std::getenv("INTERACTIVE_HOME");
}

// check compiler*.jar in DEFAULT_INTERACTIVE_HOME/lib/
LOG(INFO) << "try to find compiler*.jar in " << interactive_home << "/lib/";
auto lib_path = interactive_home + "/lib/";
if (boost::filesystem::exists(lib_path)) {
for (auto& p : boost::filesystem::directory_iterator(lib_path)) {
if (p.path().filename().string().find("compiler") != std::string::npos &&
p.path().extension() == ".jar") {
return lib_path + "* -Djna.library.path=" + lib_path;
}
}
}
// if not, try the relative path from current binary's path
auto current_binary_path = boost::filesystem::canonical("/proc/self/exe");
auto current_binary_dir = current_binary_path.parent_path();
auto ir_core_lib_path =
current_binary_dir /
"../../../interactive_engine/executor/ir/target/release/";
if (!boost::filesystem::exists(ir_core_lib_path)) {
LOG(ERROR) << "ir_core_lib_path not found";
return "";
}
// compiler*.jar in
// current_binary_dir/../../interactive_engine/compiler/target/
auto compiler_path =
current_binary_dir / "../../../interactive_engine/compiler/target/";
LOG(INFO) << "try to find compiler*.jar in " << compiler_path;
if (boost::filesystem::exists(compiler_path)) {
for (auto& p : boost::filesystem::directory_iterator(compiler_path)) {
if (p.path().filename().string().find("compiler") != std::string::npos &&
p.path().extension() == ".jar") {
auto libs_path = compiler_path / "libs";
// combine the path with the libs folder
if (boost::filesystem::exists(libs_path)) {
return p.path().string() + ":" + libs_path.string() +
"/* -Djna.library.path=" + ir_core_lib_path.string();
}
}
}
}
LOG(ERROR) << "Compiler jar not found";
return "";
}

} // namespace server
Loading

0 comments on commit 27b78f2

Please sign in to comment.