Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Move GCS InternalKV workload to dedicated thread. #47736

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
47 changes: 20 additions & 27 deletions src/ray/common/asio/periodical_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace ray {

PeriodicalRunner::PeriodicalRunner(instrumented_io_context &io_service)
: io_service_(io_service), mutex_(), stopped_(std::make_shared<bool>(false)) {}
: io_service_(io_service) {}

PeriodicalRunner::~PeriodicalRunner() {
RAY_LOG(DEBUG) << "PeriodicalRunner is destructed";
Expand All @@ -38,7 +38,7 @@ void PeriodicalRunner::Clear() {

void PeriodicalRunner::RunFnPeriodically(std::function<void()> fn,
uint64_t period_ms,
const std::string name) {
const std::string &name) {
*stopped_ = false;
if (period_ms > 0) {
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
Expand Down Expand Up @@ -74,28 +74,27 @@ void PeriodicalRunner::DoRunFnPeriodically(
fn();
absl::MutexLock lock(&mutex_);
timer->expires_from_now(period);
timer->async_wait(
[this, stopped = stopped_, fn = std::move(fn), period, timer = std::move(timer)](
const boost::system::error_code &error) {
if (*stopped) {
return;
}
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `timer` is canceled or destroyed.
// The Monitor's lifetime may be shorter than that of the object that uses it
// (e.g. gcs_server).
return;
}
RAY_CHECK(!error) << error.message();
DoRunFnPeriodically(fn, period, timer);
});
timer->async_wait([this, stopped = stopped_, fn, period, timer = std::move(timer)](
const boost::system::error_code &error) {
if (*stopped) {
return;
}
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `timer` is canceled or destroyed.
// The Monitor's lifetime may be shorter than that of the object that uses it
// (e.g. gcs_server).
return;
}
RAY_CHECK(!error) << error.message();
DoRunFnPeriodically(fn, period, timer);
});
}

void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
const std::function<void()> &fn,
boost::posix_time::milliseconds period,
std::shared_ptr<boost::asio::deadline_timer> timer,
const std::string name) {
const std::string &name) {
fn();
absl::MutexLock lock(&mutex_);
timer->expires_from_now(period);
Expand All @@ -104,7 +103,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
// event loop.
auto stats_handle = io_service_.stats().RecordStart(name, period.total_nanoseconds());
timer->async_wait([this,
fn = std::move(fn),
fn,
stopped = stopped_,
period,
timer = std::move(timer),
Expand All @@ -114,13 +113,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
return;
}
io_service_.stats().RecordExecution(
[this,
stopped = stopped_,
fn = std::move(fn),
error,
period,
timer = std::move(timer),
name]() {
[this, stopped = stopped, fn, error, period, timer, name]() {
if (*stopped) {
return;
}
Expand All @@ -133,7 +126,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
RAY_CHECK(!error) << error.message();
DoRunFnPeriodicallyInstrumented(fn, period, timer, name);
},
std::move(stats_handle));
stats_handle);
});
}

Expand Down
10 changes: 6 additions & 4 deletions src/ray/common/asio/periodical_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ namespace ray {
/// All registered functions will stop running once this object is destructed.
class PeriodicalRunner {
public:
PeriodicalRunner(instrumented_io_context &io_service);
explicit PeriodicalRunner(instrumented_io_context &io_service);

~PeriodicalRunner();

void Clear();

void RunFnPeriodically(std::function<void()> fn,
uint64_t period_ms,
const std::string name) ABSL_LOCKS_EXCLUDED(mutex_);
const std::string &name) ABSL_LOCKS_EXCLUDED(mutex_);

private:
void DoRunFnPeriodically(const std::function<void()> &fn,
Expand All @@ -49,14 +49,16 @@ class PeriodicalRunner {
void DoRunFnPeriodicallyInstrumented(const std::function<void()> &fn,
boost::posix_time::milliseconds period,
std::shared_ptr<boost::asio::deadline_timer> timer,
const std::string name)
const std::string &name)
ABSL_LOCKS_EXCLUDED(mutex_);

instrumented_io_context &io_service_;
mutable absl::Mutex mutex_;
std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_
ABSL_GUARDED_BY(mutex_);
std::shared_ptr<bool> stopped_;
// `stopped_` is copied to the timer callback, and may outlive `this`.
std::shared_ptr<std::atomic<bool>> stopped_ =
std::make_shared<std::atomic<bool>>(false);
};

} // namespace ray
26 changes: 17 additions & 9 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
storage_type_(GetStorageType()),
main_service_(main_service),
pubsub_io_context_("pubsub_io_context"),
kv_io_context_("kv_io_context"),
task_io_context_("task_io_context"),
ray_syncer_io_context_("ray_syncer_io_context"),
rpc_server_(config.grpc_server_name,
Expand All @@ -72,14 +73,16 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
periodical_runner_(main_service),
is_started_(false),
is_stopped_(false) {
// Init GCS table storage.
// Init GCS table storage. Note this is on main_service_, not kv_io_context_, to avoid
// congestion on the kv_io_context_.
RAY_LOG(INFO) << "GCS storage type is " << storage_type_;
switch (storage_type_) {
case StorageType::IN_MEMORY:
gcs_table_storage_ = std::make_shared<InMemoryGcsTableStorage>(main_service_);
break;
case StorageType::REDIS_PERSIST:
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(GetOrConnectRedis());
gcs_table_storage_ =
std::make_shared<gcs::RedisGcsTableStorage>(GetOrConnectRedis(main_service_));
break;
default:
RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_;
Expand Down Expand Up @@ -270,6 +273,7 @@ void GcsServer::Stop() {

ray_syncer_io_context_.Stop();
task_io_context_.Stop();
kv_io_context_.Stop();
pubsub_io_context_.Stop();

ray_syncer_.reset();
Expand Down Expand Up @@ -561,13 +565,13 @@ void GcsServer::InitKVManager() {
std::unique_ptr<InternalKVInterface> instance;
switch (storage_type_) {
case (StorageType::REDIS_PERSIST):
instance = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(GetOrConnectRedis()));
instance = std::make_unique<StoreClientInternalKV>(std::make_unique<RedisStoreClient>(
GetOrConnectRedis(kv_io_context_.GetIoService())));
break;
case (StorageType::IN_MEMORY):
instance =
std::make_unique<StoreClientInternalKV>(std::make_unique<ObservableStoreClient>(
std::make_unique<InMemoryStoreClient>(main_service_)));
std::make_unique<InMemoryStoreClient>(kv_io_context_.GetIoService())));
break;
default:
RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_;
Expand All @@ -579,7 +583,8 @@ void GcsServer::InitKVManager() {

void GcsServer::InitKVService() {
RAY_CHECK(kv_manager_);
kv_service_ = std::make_unique<rpc::InternalKVGrpcService>(main_service_, *kv_manager_);
kv_service_ = std::make_unique<rpc::InternalKVGrpcService>(
kv_io_context_.GetIoService(), *kv_manager_);
// Register service.
rpc_server_.RegisterService(*kv_service_, false /* token_auth */);
}
Expand Down Expand Up @@ -818,15 +823,16 @@ std::string GcsServer::GetDebugState() const {
return stream.str();
}

std::shared_ptr<RedisClient> GcsServer::GetOrConnectRedis() {
std::shared_ptr<RedisClient> GcsServer::GetOrConnectRedis(
instrumented_io_context &io_service) {
if (redis_client_ == nullptr) {
redis_client_ = std::make_shared<RedisClient>(GetRedisClientOptions());
auto status = redis_client_->Connect(main_service_);
auto status = redis_client_->Connect(io_service);
RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status;

// Init redis failure detector.
gcs_redis_failure_detector_ =
std::make_shared<GcsRedisFailureDetector>(main_service_, redis_client_, []() {
std::make_shared<GcsRedisFailureDetector>(io_service, redis_client_, []() {
RAY_LOG(FATAL) << "Redis connection failed. Shutdown GCS.";
});
gcs_redis_failure_detector_->Start();
Expand All @@ -843,6 +849,8 @@ void GcsServer::PrintAsioStats() {
<< main_service_.stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "pubsub_io_context_ Event stats:\n\n"
<< pubsub_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "kv_io_context_ Event stats:\n\n"
<< kv_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "task_io_context_ Event stats:\n\n"
<< task_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "ray_syncer_io_context_ Event stats:\n\n"
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class GcsServer {
void PrintAsioStats();

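/// Get the Redis client, connecting to the Redis server if no client exists yet.
/// NOTE(review): the connection (and the Redis failure detector) is only set up
/// when no client exists, so `io_service` appears to matter only on the first
/// call; later calls with a different io_service would reuse that client — confirm.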
std::shared_ptr<RedisClient> GetOrConnectRedis();
std::shared_ptr<RedisClient> GetOrConnectRedis(instrumented_io_context &io_service);

void TryGlobalGC();

Expand All @@ -214,6 +214,8 @@ class GcsServer {
instrumented_io_context &main_service_;
/// The io service used by Pubsub, for isolation from other workload.
InstrumentedIOContextWithThread pubsub_io_context_;
// The io service used by the internal KV service and its StoreClient. Note that
// the GCS table storage is initialized on main_service_, not on this io context.
InstrumentedIOContextWithThread kv_io_context_;
// The io service used by task manager.
InstrumentedIOContextWithThread task_io_context_;
// The io service used by ray syncer.
Expand Down