Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core]Make GCS InternalKV workload configurable to the Policy. #47736

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/ray/common/asio/asio_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

#include <boost/asio.hpp>
#include <chrono>
#include <thread>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/util/util.h"

template <typename Duration>
std::shared_ptr<boost::asio::deadline_timer> execute_after(
Expand All @@ -37,3 +39,47 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(

return timer;
}

/**
* A class that manages an instrumented_io_context and a std::thread.
* The constructor takes a thread name and starts the thread.
* The destructor stops the io_service and joins the thread.
*/
class InstrumentedIoContextWithThread {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be IO instead of Io? lol

public:
/**
* Constructor.
* @param thread_name The name of the thread.
*/
explicit InstrumentedIoContextWithThread(const std::string &thread_name)
: io_service_(), work_(io_service_) {
io_thread_ = std::thread([this, thread_name] {
SetThreadName(thread_name);
io_service_.run();
});
}

~InstrumentedIoContextWithThread() { Stop(); }

// Non-movable and non-copyable.
InstrumentedIoContextWithThread(const InstrumentedIoContextWithThread &) = delete;
InstrumentedIoContextWithThread &operator=(const InstrumentedIoContextWithThread &) =
delete;
InstrumentedIoContextWithThread(InstrumentedIoContextWithThread &&) = delete;
InstrumentedIoContextWithThread &operator=(InstrumentedIoContextWithThread &&) = delete;

instrumented_io_context &GetIoService() { return io_service_; }

// Idempotent. Once it's stopped you can't restart it.
void Stop() {
io_service_.stop();
if (io_thread_.joinable()) {
io_thread_.join();
}
}

private:
instrumented_io_context io_service_;
boost::asio::io_service::work work_; // to keep io_service_ running
std::thread io_thread_;
};
65 changes: 26 additions & 39 deletions src/ray/common/asio/periodical_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace ray {

PeriodicalRunner::PeriodicalRunner(instrumented_io_context &io_service)
: io_service_(io_service), mutex_(), stopped_(std::make_shared<bool>(false)) {}
: io_service_(io_service) {}

PeriodicalRunner::~PeriodicalRunner() {
RAY_LOG(DEBUG) << "PeriodicalRunner is destructed";
Expand All @@ -29,7 +29,7 @@ PeriodicalRunner::~PeriodicalRunner() {

void PeriodicalRunner::Clear() {
absl::MutexLock lock(&mutex_);
*stopped_ = true;
stopped_ = true;
for (const auto &timer : timers_) {
timer->cancel();
}
Expand All @@ -38,22 +38,17 @@ void PeriodicalRunner::Clear() {

void PeriodicalRunner::RunFnPeriodically(std::function<void()> fn,
uint64_t period_ms,
const std::string name) {
*stopped_ = false;
const std::string &name) {
stopped_ = false;
if (period_ms > 0) {
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
{
absl::MutexLock lock(&mutex_);
timers_.push_back(timer);
}
io_service_.post(
[this,
stopped = stopped_,
fn = std::move(fn),
period_ms,
name,
timer = std::move(timer)]() {
if (*stopped) {
[this, fn = std::move(fn), period_ms, name, timer = std::move(timer)]() {
if (this->stopped_) {
return;
}
if (RayConfig::instance().event_stats()) {
Expand All @@ -74,28 +69,27 @@ void PeriodicalRunner::DoRunFnPeriodically(
fn();
absl::MutexLock lock(&mutex_);
timer->expires_from_now(period);
timer->async_wait(
[this, stopped = stopped_, fn = std::move(fn), period, timer = std::move(timer)](
const boost::system::error_code &error) {
if (*stopped) {
return;
}
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `timer` is canceled or destroyed.
// The Monitor lifetime may be short than the object who use it. (e.g.
// gcs_server)
return;
}
RAY_CHECK(!error) << error.message();
DoRunFnPeriodically(fn, period, timer);
});
timer->async_wait([this, fn, period, timer = std::move(timer)](
const boost::system::error_code &error) {
if (stopped_) {
return;
}
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `timer` is canceled or destroyed.
// The Monitor lifetime may be short than the object who use it. (e.g.
// gcs_server)
return;
}
RAY_CHECK(!error) << error.message();
DoRunFnPeriodically(fn, period, timer);
});
}

void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
const std::function<void()> &fn,
boost::posix_time::milliseconds period,
std::shared_ptr<boost::asio::deadline_timer> timer,
const std::string name) {
const std::string &name) {
fn();
absl::MutexLock lock(&mutex_);
timer->expires_from_now(period);
Expand All @@ -104,24 +98,17 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
// event loop.
auto stats_handle = io_service_.stats().RecordStart(name, period.total_nanoseconds());
timer->async_wait([this,
fn = std::move(fn),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing move?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because that fn move did not work, because fn is a const& and move is no-op.

stopped = stopped_,
fn,
period,
timer = std::move(timer),
stats_handle = std::move(stats_handle),
name](const boost::system::error_code &error) {
if (*stopped) {
if (this->stopped_) {
return;
}
io_service_.stats().RecordExecution(
[this,
stopped = stopped_,
fn = std::move(fn),
error,
period,
timer = std::move(timer),
name]() {
if (*stopped) {
[this, fn, error, period, timer, name]() {
if (this->stopped_) {
return;
}
if (error == boost::asio::error::operation_aborted) {
Expand All @@ -133,7 +120,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
RAY_CHECK(!error) << error.message();
DoRunFnPeriodicallyInstrumented(fn, period, timer, name);
},
std::move(stats_handle));
stats_handle);
});
}

Expand Down
8 changes: 4 additions & 4 deletions src/ray/common/asio/periodical_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ namespace ray {
/// All registered functions will stop running once this object is destructed.
class PeriodicalRunner {
public:
PeriodicalRunner(instrumented_io_context &io_service);
explicit PeriodicalRunner(instrumented_io_context &io_service);

~PeriodicalRunner();

void Clear();

void RunFnPeriodically(std::function<void()> fn,
uint64_t period_ms,
const std::string name) ABSL_LOCKS_EXCLUDED(mutex_);
const std::string &name) ABSL_LOCKS_EXCLUDED(mutex_);

private:
void DoRunFnPeriodically(const std::function<void()> &fn,
Expand All @@ -49,14 +49,14 @@ class PeriodicalRunner {
void DoRunFnPeriodicallyInstrumented(const std::function<void()> &fn,
boost::posix_time::milliseconds period,
std::shared_ptr<boost::asio::deadline_timer> timer,
const std::string name)
const std::string &name)
ABSL_LOCKS_EXCLUDED(mutex_);

instrumented_io_context &io_service_;
mutable absl::Mutex mutex_;
std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_
ABSL_GUARDED_BY(mutex_);
std::shared_ptr<bool> stopped_;
std::atomic<bool> stopped_ = false;
};

} // namespace ray
34 changes: 3 additions & 31 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <thread>
#include <utility>

#include "ray/common/asio/asio_util.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/pubsub/subscriber.h"
Expand Down Expand Up @@ -717,38 +718,9 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
node_info.labels().end());
}

/// Creates a singleton thread that runs an io_service.
/// All ConnectToGcsStandalone calls will share this io_service.
class SingletonIoContext {
public:
static SingletonIoContext &Instance() {
static SingletonIoContext instance;
return instance;
}

instrumented_io_context &GetIoService() { return io_service_; }

private:
SingletonIoContext() : work_(io_service_) {
io_thread_ = std::thread([this] {
SetThreadName("singleton_io_context.gcs_client");
io_service_.run();
});
}
~SingletonIoContext() {
io_service_.stop();
if (io_thread_.joinable()) {
io_thread_.join();
}
}

instrumented_io_context io_service_;
boost::asio::io_service::work work_; // to keep io_service_ running
std::thread io_thread_;
};

Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) {
instrumented_io_context &io_service = SingletonIoContext::Instance().GetIoService();
static InstrumentedIoContextWithThread io_context("gcs_client_io_service");
instrumented_io_context &io_service = io_context.GetIoService();
return gcs_client.Connect(io_service, timeout_ms);
}

Expand Down
Loading