Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Enable starting compiler as a subprocess of interactive server #3650

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion docs/flex/interactive/development/admin_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,18 @@ curl -X GET -H "Content-Type: application/json" "http://[host]/v1/node/status"

#### Status Codes
- `200 OK`: Request successful.
- `500 Internal Error`: Server internal Error.
- `500 Internal Error`: Server internal Error.

## Enable AdminService in development

To start admin service in development, use the command line argument `--enable-admin-service true`. `${ENGINE_CONFIG}` specifies the configuration for interactive query engine, see [engine-configuration](https://graphscope.io/docs/flex/interactive/configuration). `${WORKSPACE}` points to the directory where interactive related data is maintaned.

```bash
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true
```

### Start Compiler Service
The Compiler service could be started as a subprocess of the AdminService. This ensures that when switching graphs in the AdminService, the Compiler service also switches to the corresponding graph's schema. This is the default behavior in the current Interactive.

```bash
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true --start-compiler true
5 changes: 4 additions & 1 deletion flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ int main(int argc, char** argv) {
bpo::value<unsigned>()->default_value(2),
"worker thread number")(
"enable-trace", bpo::value<bool>()->default_value(false),
"whether to enable opentelemetry tracing");
"whether to enable opentelemetry tracing")(
"start-compiler", bpo::value<bool>()->default_value(false),
"whether or not to start compiler");

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand Down Expand Up @@ -179,6 +181,7 @@ int main(int argc, char** argv) {
server::ServiceConfig service_config = node.as<server::ServiceConfig>();
service_config.engine_config_path = engine_config_file;
service_config.start_admin_service = vm["enable-admin-service"].as<bool>();
service_config.start_compiler = vm["start-compiler"].as<bool>();

auto& db = gs::GraphDB::get();

Expand Down
56 changes: 45 additions & 11 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
max_group_id_(max_group_id),
group_inc_step_(group_inc_step),
shard_concurrency_(shard_concurrency),
executor_idx_(0) {
executor_idx_(0),
is_cancelled_(false) {
executor_refs_.reserve(shard_concurrency_);
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
Expand All @@ -87,10 +88,15 @@ seastar::future<> hqps_ic_handler::cancel_current_scope() {
LOG(INFO) << "Cancel IC scope successfully!";
// clear the actor refs
executor_refs_.clear();
is_cancelled_ = true;
return seastar::make_ready_future<>();
});
}

bool hqps_ic_handler::is_current_scope_cancelled() const {
return is_cancelled_;
}

bool hqps_ic_handler::create_actors() {
if (executor_refs_.size() > 0) {
LOG(ERROR) << "The actors have been already created!";
Expand All @@ -114,6 +120,7 @@ bool hqps_ic_handler::create_actors() {
for (unsigned i = 0; i < shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
}
is_cancelled_ = false; // locked outside
return true;
}

Expand Down Expand Up @@ -166,7 +173,8 @@ hqps_adhoc_query_handler::hqps_adhoc_query_handler(
max_group_id_(max_group_id),
group_inc_step_(group_inc_step),
shard_concurrency_(shard_concurrency),
executor_idx_(0) {
executor_idx_(0),
is_cancelled_(false) {
executor_refs_.reserve(shard_concurrency_);
{
hiactor::scope_builder builder;
Expand Down Expand Up @@ -213,10 +221,15 @@ seastar::future<> hqps_adhoc_query_handler::cancel_current_scope() {
executor_refs_.clear();
codegen_actor_refs_.clear();
LOG(INFO) << "Clear actor refs successfully!";
is_cancelled_ = true;
return seastar::make_ready_future<>();
});
}

bool hqps_adhoc_query_handler::is_current_scope_cancelled() const {
return is_cancelled_;
}

bool hqps_adhoc_query_handler::create_actors() {
if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
LOG(ERROR) << "The actors have been already created!";
Expand Down Expand Up @@ -255,6 +268,7 @@ bool hqps_adhoc_query_handler::create_actors() {
hiactor::scope<hiactor::actor_group>(cur_codegen_group_id_));
codegen_actor_refs_.emplace_back(builder.build_ref<codegen_actor_ref>(0));
}
is_cancelled_ = false;
return true;
}

Expand Down Expand Up @@ -378,6 +392,11 @@ uint16_t hqps_http_handler::get_port() const { return http_port_; }

bool hqps_http_handler::is_running() const { return running_.load(); }

bool hqps_http_handler::is_actors_running() const {
return !ic_handler_->is_current_scope_cancelled() &&
!adhoc_query_handler_->is_current_scope_cancelled();
}

void hqps_http_handler::start() {
auto fut = seastar::alien::submit_to(
*seastar::alien::internal::default_instance, 0, [this] {
Expand Down Expand Up @@ -409,15 +428,30 @@ void hqps_http_handler::stop() {

seastar::future<> hqps_http_handler::stop_query_actors() {
// First cancel the scope.
return ic_handler_->cancel_current_scope()
.then([this] {
LOG(INFO) << "Cancel ic scope";
return adhoc_query_handler_->cancel_current_scope();
})
.then([] {
LOG(INFO) << "Cancel adhoc scope";
return seastar::make_ready_future<>();
});
if (ic_handler_->is_current_scope_cancelled()) {
LOG(INFO) << "IC scope has been cancelled!";
if (adhoc_query_handler_->is_current_scope_cancelled()) {
LOG(INFO) << "Adhoc scope has been cancelled!";
return seastar::make_ready_future<>();
} else {
return adhoc_query_handler_->cancel_current_scope();
}
} else {
return ic_handler_->cancel_current_scope()
.then([this] {
LOG(INFO) << "Cancel ic scope";
if (adhoc_query_handler_->is_current_scope_cancelled()) {
LOG(INFO) << "Adhoc scope has been cancelled!";
return seastar::make_ready_future<>();
} else {
return adhoc_query_handler_->cancel_current_scope();
}
})
.then([] {
LOG(INFO) << "Cancel adhoc scope";
return seastar::make_ready_future<>();
});
}
}

void hqps_http_handler::start_query_actors() {
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/http_server/handler/hqps_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class hqps_ic_handler : public seastar::httpd::handler_base {

seastar::future<> cancel_current_scope();

bool is_current_scope_cancelled() const;

seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
const seastar::sstring& path,
std::unique_ptr<seastar::httpd::request> req,
Expand All @@ -45,6 +47,7 @@ class hqps_ic_handler : public seastar::httpd::handler_base {
const uint32_t shard_concurrency_;
uint32_t executor_idx_;
std::vector<executor_ref> executor_refs_;
bool is_cancelled_;
};

class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
Expand All @@ -58,6 +61,8 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {

seastar::future<> cancel_current_scope();

bool is_current_scope_cancelled() const;

bool create_actors();

seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
Expand All @@ -72,6 +77,7 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
uint32_t executor_idx_;
std::vector<executor_ref> executor_refs_;
std::vector<codegen_actor_ref> codegen_actor_refs_;
bool is_cancelled_;
};

class hqps_exit_handler : public seastar::httpd::handler_base {
Expand All @@ -94,6 +100,8 @@ class hqps_http_handler {

bool is_running() const;

bool is_actors_running() const;

seastar::future<> stop_query_actors();

void start_query_actors();
Expand Down
119 changes: 118 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ ServiceConfig::ServiceConfig()
dpdk_mode(false),
enable_thread_resource_pool(true),
external_thread_num(2),
start_admin_service(true) {}
start_admin_service(true),
start_compiler(false) {}

const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph";
const std::string HQPSService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
const std::string HQPSService::COMPILER_SERVER_CLASS_NAME =
"com.alibaba.graphscope.GraphServer";

HQPSService& HQPSService::get() {
static HQPSService instance;
Expand All @@ -48,12 +52,16 @@ void HQPSService::init(const ServiceConfig& config) {
initialized_.store(true);
service_config_ = config;
gs::init_cpu_usage_watch();
if (config.start_compiler) {
start_compiler_subprocess();
}
}

HQPSService::~HQPSService() {
if (actor_sys_) {
actor_sys_->terminate();
}
stop_compiler_subprocess();
}

const ServiceConfig& HQPSService::get_service_config() const {
Expand Down Expand Up @@ -111,7 +119,15 @@ void HQPSService::run_and_wait_for_exit() {

void HQPSService::set_exit_state() { running_.store(false); }

bool HQPSService::is_actors_running() const {
if (query_hdl_) {
return query_hdl_->is_actors_running();
} else
return false;
}

seastar::future<> HQPSService::stop_query_actors() {
std::unique_lock<std::mutex> lock(mtx_);
if (query_hdl_) {
return query_hdl_->stop_query_actors();
} else {
Expand All @@ -122,11 +138,112 @@ seastar::future<> HQPSService::stop_query_actors() {
}

void HQPSService::start_query_actors() {
std::unique_lock<std::mutex> lock(mtx_);
if (query_hdl_) {
query_hdl_->start_query_actors();
} else {
std::cerr << "Query handler has not been inited!" << std::endl;
return;
}
}

bool HQPSService::start_compiler_subprocess(
const std::string& graph_schema_path) {
LOG(INFO) << "Start compiler subprocess";
stop_compiler_subprocess();
auto java_bin_path = boost::process::search_path("java");
if (java_bin_path.empty()) {
std::cerr << "Java binary not found in PATH!" << std::endl;
return false;
}
// try to find compiler jar from env.
auto interactive_class_path = find_interactive_class_path();
if (interactive_class_path.empty()) {
std::cerr << "Interactive home not found!" << std::endl;
return false;
}
std::stringstream ss;
ss << "java -cp " << interactive_class_path;
if (!graph_schema_path.empty()) {
ss << " -Dgraph.schema=" << graph_schema_path;
}
ss << " " << COMPILER_SERVER_CLASS_NAME;
ss << " " << service_config_.engine_config_path;
auto cmd_str = ss.str();
LOG(INFO) << "Start compiler with command: " << cmd_str;
auto compiler_log = WorkDirManipulator::GetCompilerLogFile();

compiler_process_ =
boost::process::child(cmd_str, boost::process::std_out > compiler_log,
boost::process::std_err > compiler_log);
LOG(INFO) << "Compiler process started with pid: " << compiler_process_.id();
// sleep for a while to wait for the compiler to start
std::this_thread::sleep_for(std::chrono::seconds(4));
// check if the compiler process is still running
if (!compiler_process_.running()) {
LOG(ERROR) << "Compiler process failed to start!";
return false;
}
return true;
}

bool HQPSService::stop_compiler_subprocess() {
if (compiler_process_.running()) {
LOG(INFO) << "Terminate previous compiler process with pid: "
<< compiler_process_.id();
compiler_process_.terminate();
}
return true;
}

std::string HQPSService::find_interactive_class_path() {
std::string interactive_home = DEFAULT_INTERACTIVE_HOME;
if (std::getenv("INTERACTIVE_HOME")) {
// try to use DEFAULT_INTERACTIVE_HOME
interactive_home = std::getenv("INTERACTIVE_HOME");
}

// check compiler*.jar in DEFAULT_INTERACTIVE_HOME/lib/
LOG(INFO) << "try to find compiler*.jar in " << interactive_home << "/lib/";
auto lib_path = interactive_home + "/lib/";
if (boost::filesystem::exists(lib_path)) {
for (auto& p : boost::filesystem::directory_iterator(lib_path)) {
if (p.path().filename().string().find("compiler") != std::string::npos &&
p.path().extension() == ".jar") {
return lib_path + "* -Djna.library.path=" + lib_path;
}
}
}
// if not, try the relative path from current binary's path
auto current_binary_path = boost::filesystem::canonical("/proc/self/exe");
auto current_binary_dir = current_binary_path.parent_path();
auto ir_core_lib_path =
current_binary_dir /
"../../../interactive_engine/executor/ir/target/release/";
if (!boost::filesystem::exists(ir_core_lib_path)) {
LOG(ERROR) << "ir_core_lib_path not found";
return "";
}
// compiler*.jar in
// current_binary_dir/../../interactive_engine/compiler/target/
auto compiler_path =
current_binary_dir / "../../../interactive_engine/compiler/target/";
LOG(INFO) << "try to find compiler*.jar in " << compiler_path;
if (boost::filesystem::exists(compiler_path)) {
for (auto& p : boost::filesystem::directory_iterator(compiler_path)) {
if (p.path().filename().string().find("compiler") != std::string::npos &&
p.path().extension() == ".jar") {
auto libs_path = compiler_path / "libs";
// combine the path with the libs folder
if (boost::filesystem::exists(libs_path)) {
return p.path().string() + ":" + libs_path.string() +
"/* -Djna.library.path=" + ir_core_lib_path.string();
}
}
}
}
LOG(ERROR) << "Compiler jar not found";
return "";
}

} // namespace server
Loading
Loading