
[core] Move GCS InternalKV workload to dedicated thread. #47736

Open · wants to merge 15 commits into base: master
46 changes: 46 additions & 0 deletions src/ray/common/asio/asio_util.h
@@ -16,8 +16,10 @@

#include <boost/asio.hpp>
#include <chrono>
#include <thread>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/util/util.h"

template <typename Duration>
std::shared_ptr<boost::asio::deadline_timer> execute_after(
@@ -37,3 +39,47 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(

return timer;
}

/**
* A class that manages an instrumented_io_context and a std::thread.
* The constructor takes a thread name and starts the thread.
* The destructor stops the io_service and joins the thread.
*/
class InstrumentedIoContextWithThread {
Review comment (Collaborator): should be IO instead of Io? lol

public:
/**
* Constructor.
* @param thread_name The name of the thread.
*/
explicit InstrumentedIoContextWithThread(const std::string &thread_name)
: io_service_(), work_(io_service_) {
io_thread_ = std::thread([this, thread_name] {
SetThreadName(thread_name);
io_service_.run();
});
}

~InstrumentedIoContextWithThread() { Stop(); }

// Non-movable and non-copyable.
InstrumentedIoContextWithThread(const InstrumentedIoContextWithThread &) = delete;
InstrumentedIoContextWithThread &operator=(const InstrumentedIoContextWithThread &) =
delete;
InstrumentedIoContextWithThread(InstrumentedIoContextWithThread &&) = delete;
InstrumentedIoContextWithThread &operator=(InstrumentedIoContextWithThread &&) = delete;

instrumented_io_context &GetIoService() { return io_service_; }

  // Idempotent. Once stopped, it cannot be restarted.
void Stop() {
io_service_.stop();
if (io_thread_.joinable()) {
io_thread_.join();
}
}

private:
instrumented_io_context io_service_;
boost::asio::io_service::work work_; // to keep io_service_ running
std::thread io_thread_;
};
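
Illustrative usage (not part of this diff): a minimal sketch of how a caller might own a dedicated IO thread and hand work to it. The `boost::asio::post` call and the `main` scaffolding are assumptions for the example, not code from this PR.

#include "ray/common/asio/asio_util.h"

int main() {
  // Spawns a thread named "kv_io" that runs the io_context's event loop.
  InstrumentedIoContextWithThread kv_io_context("kv_io");

  // Handlers posted here execute on the dedicated thread, not the caller's.
  boost::asio::post(kv_io_context.GetIoService(),
                    [] { /* e.g. serve an InternalKV request */ });

  // Stop() (also called by the destructor) stops the loop and joins the
  // thread. A real caller would ensure pending work completes before this.
  kv_io_context.Stop();
  return 0;
}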
34 changes: 3 additions & 31 deletions src/ray/gcs/gcs_client/gcs_client.cc
@@ -18,6 +18,7 @@
#include <thread>
#include <utility>

#include "ray/common/asio/asio_util.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/pubsub/subscriber.h"
@@ -717,38 +718,9 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
node_info.labels().end());
}

/// Creates a singleton thread that runs an io_service.
/// All ConnectToGcsStandalone calls will share this io_service.
class SingletonIoContext {
public:
static SingletonIoContext &Instance() {
static SingletonIoContext instance;
return instance;
}

instrumented_io_context &GetIoService() { return io_service_; }

private:
SingletonIoContext() : work_(io_service_) {
io_thread_ = std::thread([this] {
SetThreadName("singleton_io_context.gcs_client");
io_service_.run();
});
}
~SingletonIoContext() {
io_service_.stop();
if (io_thread_.joinable()) {
io_thread_.join();
}
}

instrumented_io_context io_service_;
boost::asio::io_service::work work_; // to keep io_service_ running
std::thread io_thread_;
};

Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) {
instrumented_io_context &io_service = SingletonIoContext::Instance().GetIoService();
static InstrumentedIoContextWithThread io_context("gcs_client_io_service");
instrumented_io_context &io_service = io_context.GetIoService();
return gcs_client.Connect(io_service, timeout_ms);
}
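
A side note on the pattern above (illustration, not from the PR): the function-local static relies on C++11 "magic statics", so the context and its thread are created lazily, exactly once, even when the function is first called from several threads at the same time. A minimal sketch of the same idiom, with a hypothetical accessor name:

instrumented_io_context &SharedGcsClientIoService() {
  // C++11 guarantees this initialization runs exactly once, even when
  // multiple threads enter this function concurrently.
  static InstrumentedIoContextWithThread io_context("gcs_client_io_service");
  return io_context.GetIoService();
}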

91 changes: 52 additions & 39 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -247,22 +247,6 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
// entrypoint script calls ray.init() multiple times).
std::unordered_map<std::string, std::vector<int>> job_data_key_to_indices;

// Create a shared counter for the number of jobs processed
std::shared_ptr<int> num_processed_jobs = std::make_shared<int>(0);

// Create a shared boolean flag for the internal KV callback completion
std::shared_ptr<bool> kv_callback_done = std::make_shared<bool>(false);

// Function to send the reply once all jobs have been processed and KV callback
// completed
auto try_send_reply =
[num_processed_jobs, kv_callback_done, reply, send_reply_callback]() {
if (*num_processed_jobs == reply->job_info_list_size() && *kv_callback_done) {
RAY_LOG(DEBUG) << "Finished getting all job info.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
};

// Load the job table data into the reply.
int i = 0;
for (auto &data : result) {
@@ -286,28 +270,60 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
job_api_data_keys.push_back(job_data_key);
job_data_key_to_indices[job_data_key].push_back(i);
}
i++;
}

  // Jobs are filtered. Now, optionally populate is_running_tasks and job_info.
  // We would use an `asyncio.gather` here, but in C++ we use atomic counters
  // instead.

// Atomic counter of pending async tasks before sending the reply.
// Once it reaches total_tasks, the reply is sent.
std::shared_ptr<std::atomic<size_t>> num_finished_tasks =
std::make_shared<std::atomic<size_t>>(0);

if (!request.skip_is_running_tasks_field()) {
JobID job_id = data.first;
WorkerID worker_id =
WorkerID::FromBinary(data.second.driver_address().worker_id());
  // N tasks for N jobs, plus 1 task for the MultiKVGet. If either part is
  // skipped, the counter is still incremented.
const size_t total_tasks = reply->job_info_list_size() + 1;

// If job is not dead, get is_running_tasks from the core worker for the driver.
if (data.second.is_dead()) {
  // The async tasks atomically read-and-increment the counter, so this callback
  // can't read the atomic variable directly. Instead, it takes the updated count
  // as a regular argument from the caller that performed the read-and-increment.
auto try_send_reply =
[reply, send_reply_callback, total_tasks](size_t finished_tasks) {
if (finished_tasks == total_tasks) {
RAY_LOG(DEBUG) << "Finished getting all job info.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
};

if (request.skip_is_running_tasks_field()) {
// Skipping RPCs to workers, just mark all job tasks as done.
const size_t job_count = reply->job_info_list_size();
size_t updated_finished_tasks =
num_finished_tasks->fetch_add(job_count) + job_count;
try_send_reply(updated_finished_tasks);
} else {
for (int i = 0; i < reply->job_info_list_size(); i++) {
const auto &data = reply->job_info_list(i);
auto job_id = JobID::FromBinary(data.job_id());
WorkerID worker_id = WorkerID::FromBinary(data.driver_address().worker_id());

// If job is dead, no need to get.
if (data.is_dead()) {
reply->mutable_job_info_list(i)->set_is_running_tasks(false);
core_worker_clients_.Disconnect(worker_id);
(*num_processed_jobs)++;
try_send_reply();
size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
try_send_reply(updated_finished_tasks);
} else {
// Get is_running_tasks from the core worker for the driver.
auto client = core_worker_clients_.GetOrConnect(data.second.driver_address());
auto client = core_worker_clients_.GetOrConnect(data.driver_address());
auto request = std::make_unique<rpc::NumPendingTasksRequest>();
constexpr int64_t kNumPendingTasksRequestTimeoutMs = 1000;
RAY_LOG(DEBUG) << "Send NumPendingTasksRequest to worker " << worker_id
<< ", timeout " << kNumPendingTasksRequestTimeoutMs << " ms.";
client->NumPendingTasks(
std::move(request),
[job_id, worker_id, reply, i, num_processed_jobs, try_send_reply](
[job_id, worker_id, reply, i, num_finished_tasks, try_send_reply](
const Status &status,
const rpc::NumPendingTasksReply &num_pending_tasks_reply) {
RAY_LOG(DEBUG).WithField(worker_id)
Expand All @@ -321,25 +337,25 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
bool is_running_tasks = num_pending_tasks_reply.num_pending_tasks() > 0;
reply->mutable_job_info_list(i)->set_is_running_tasks(is_running_tasks);
}
(*num_processed_jobs)++;
try_send_reply();
size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
try_send_reply(updated_finished_tasks);
},
kNumPendingTasksRequestTimeoutMs);
}
} else {
(*num_processed_jobs)++;
try_send_reply();
}
i++;
}

if (!request.skip_submission_job_info_field()) {
if (request.skip_submission_job_info_field()) {
// Skipping MultiKVGet, just mark the counter.
size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
try_send_reply(updated_finished_tasks);
} else {
// Load the JobInfo for jobs submitted via the Ray Job API.
auto kv_multi_get_callback =
[reply,
send_reply_callback,
job_data_key_to_indices,
kv_callback_done,
num_finished_tasks,
try_send_reply](std::unordered_map<std::string, std::string> &&result) {
for (const auto &data : result) {
const std::string &job_data_key = data.first;
Expand All @@ -362,13 +378,10 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
}
}
}
*kv_callback_done = true;
try_send_reply();
size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
try_send_reply(updated_finished_tasks);
};
internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback);
} else {
*kv_callback_done = true;
try_send_reply();
}
};
Status status = gcs_table_storage_->JobTable().GetAll(on_done);
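
For illustration (not part of the diff): a self-contained sketch, under hypothetical names, of the join pattern used above. An atomic counter tracks N async tasks plus one extra task, and the completion callback fires exactly once, on whichever task finishes last.

#include <atomic>
#include <cstdio>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

// Runs `n` async tasks plus one "extra" task; `on_all_done` fires exactly once.
// All names here are assumptions for the example, not Ray APIs.
void GatherWithAtomicCounter(size_t n, std::function<void()> on_all_done) {
  const size_t total_tasks = n + 1;
  auto num_finished = std::make_shared<std::atomic<size_t>>(0);

  // fetch_add returns the previous value, so "+ 1" yields this task's count.
  auto finish_one = [num_finished, total_tasks, on_all_done]() {
    size_t finished = num_finished->fetch_add(1) + 1;
    if (finished == total_tasks) {
      on_all_done();
    }
  };

  std::vector<std::thread> workers;
  for (size_t i = 0; i < n; i++) {
    workers.emplace_back(finish_one);  // stand-in for a NumPendingTasks RPC
  }
  workers.emplace_back(finish_one);    // stand-in for the MultiKVGet callback
  for (auto &t : workers) t.join();
}

int main() {
  GatherWithAtomicCounter(3, [] { std::puts("all tasks finished"); });
  return 0;
}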