diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h
index 232e397e5ce75..0dae0a972a6b8 100644
--- a/src/ray/common/asio/asio_util.h
+++ b/src/ray/common/asio/asio_util.h
@@ -16,8 +16,10 @@
 #include <boost/asio.hpp>
 #include <string>
+#include <thread>
 
 #include "ray/common/asio/instrumented_io_context.h"
+#include "ray/util/util.h"
 
 template <typename Duration>
 std::shared_ptr<boost::asio::deadline_timer> execute_after(
@@ -37,3 +39,47 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(
 
   return timer;
 }
+
+/**
+ * A class that manages an instrumented_io_context and a std::thread.
+ * The constructor takes a thread name and starts the thread.
+ * The destructor stops the io_service and joins the thread.
+ */
+class InstrumentedIOContextWithThread {
+ public:
+  /**
+   * Constructor.
+   * @param thread_name The name of the thread.
+   */
+  explicit InstrumentedIOContextWithThread(const std::string &thread_name)
+      : io_service_(), work_(io_service_) {
+    io_thread_ = std::thread([this, thread_name] {
+      SetThreadName(thread_name);
+      io_service_.run();
+    });
+  }
+
+  ~InstrumentedIOContextWithThread() { Stop(); }
+
+  // Non-movable and non-copyable.
+  InstrumentedIOContextWithThread(const InstrumentedIOContextWithThread &) = delete;
+  InstrumentedIOContextWithThread &operator=(const InstrumentedIOContextWithThread &) =
+      delete;
+  InstrumentedIOContextWithThread(InstrumentedIOContextWithThread &&) = delete;
+  InstrumentedIOContextWithThread &operator=(InstrumentedIOContextWithThread &&) = delete;
+
+  instrumented_io_context &GetIoService() { return io_service_; }
+
+  // Idempotent. Once it's stopped you can't restart it.
+  void Stop() {
+    io_service_.stop();
+    if (io_thread_.joinable()) {
+      io_thread_.join();
+    }
+  }
+
+ private:
+  instrumented_io_context io_service_;
+  boost::asio::io_service::work work_;  // to keep io_service_ running
+  std::thread io_thread_;
+};
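The class above packages the context / work-guard / thread triple that several call sites below previously assembled by hand. A minimal usage sketch, assuming only this header (the thread name and the posted handler are illustrative; the two-argument `post` is the named overload that instrumented_io_context adds for event stats):

    #include "ray/common/asio/asio_util.h"

    void Example() {
      // Starts a named thread that runs the event loop immediately.
      InstrumentedIOContextWithThread io("example_io");
      // Handlers posted here run on that thread, not on the caller's.
      io.GetIoService().post([] { /* work */ }, "example_handler");
      io.Stop();  // optional: the destructor also stops and joins
    }

Because the destructor calls Stop(), the io thread can never outlive the object that owns it, which is the invariant the rest of this diff leans on.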
diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc
index cb734bc1dca19..e46007f819ba0 100644
--- a/src/ray/gcs/gcs_client/gcs_client.cc
+++ b/src/ray/gcs/gcs_client/gcs_client.cc
@@ -18,6 +18,7 @@
 #include <chrono>
 #include <thread>
 
+#include "ray/common/asio/asio_util.h"
 #include "ray/common/ray_config.h"
 #include "ray/gcs/gcs_client/accessor.h"
 #include "ray/pubsub/subscriber.h"
@@ -717,38 +718,9 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
                     node_info.labels().end());
 }
 
-/// Creates a singleton thread that runs an io_service.
-/// All ConnectToGcsStandalone calls will share this io_service.
-class SingletonIoContext {
- public:
-  static SingletonIoContext &Instance() {
-    static SingletonIoContext instance;
-    return instance;
-  }
-
-  instrumented_io_context &GetIoService() { return io_service_; }
-
- private:
-  SingletonIoContext() : work_(io_service_) {
-    io_thread_ = std::thread([this] {
-      SetThreadName("singleton_io_context.gcs_client");
-      io_service_.run();
-    });
-  }
-  ~SingletonIoContext() {
-    io_service_.stop();
-    if (io_thread_.joinable()) {
-      io_thread_.join();
-    }
-  }
-
-  instrumented_io_context io_service_;
-  boost::asio::io_service::work work_;  // to keep io_service_ running
-  std::thread io_thread_;
-};
-
 Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) {
-  instrumented_io_context &io_service = SingletonIoContext::Instance().GetIoService();
+  static InstrumentedIOContextWithThread io_context("gcs_client_io_service");
+  instrumented_io_context &io_service = io_context.GetIoService();
   return gcs_client.Connect(io_service, timeout_ms);
 }
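The hand-rolled singleton is replaced by a function-local static, which C++11 guarantees is initialized exactly once even when several threads call ConnectOnSingletonIoContext concurrently. A standalone sketch of the guarantee being relied on (types and names here are illustrative, not from the diff):

    #include <string>

    struct ExpensiveService {
      explicit ExpensiveService(const std::string &name) { /* one-time setup */ }
    };

    ExpensiveService &GetService() {
      // Constructed on first call, under a compiler-generated guard if
      // needed; destroyed during static destruction at process exit.
      static ExpensiveService service("shared");
      return service;
    }

One behavior is unchanged from SingletonIoContext: the io thread is created lazily on first use and only joined at process exit, when the static is destroyed.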
diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc
index b42de3f955338..7b9f8af23da8d 100644
--- a/src/ray/gcs/gcs_server/gcs_job_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -247,22 +247,6 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
   // entrypoint script calls ray.init() multiple times).
   std::unordered_map<std::string, std::vector<int>> job_data_key_to_indices;
 
-  // Create a shared counter for the number of jobs processed
-  std::shared_ptr<int> num_processed_jobs = std::make_shared<int>(0);
-
-  // Create a shared boolean flag for the internal KV callback completion
-  std::shared_ptr<bool> kv_callback_done = std::make_shared<bool>(false);
-
-  // Function to send the reply once all jobs have been processed and KV callback
-  // completed
-  auto try_send_reply =
-      [num_processed_jobs, kv_callback_done, reply, send_reply_callback]() {
-        if (*num_processed_jobs == reply->job_info_list_size() && *kv_callback_done) {
-          RAY_LOG(DEBUG) << "Finished getting all job info.";
-          GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
-        }
-      };
-
   // Load the job table data into the reply.
   int i = 0;
   for (auto &data : result) {
@@ -286,28 +270,64 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
       job_api_data_keys.push_back(job_data_key);
       job_data_key_to_indices[job_data_key].push_back(i);
     }
+    i++;
+  }
 
-    if (!request.skip_is_running_tasks_field()) {
-      JobID job_id = data.first;
-      WorkerID worker_id =
-          WorkerID::FromBinary(data.second.driver_address().worker_id());
+  // Jobs are filtered. Now optionally populate is_running_tasks and job_info via
+  // async calls:
+  //
+  // - N outbound RPCs, one to each job's core worker, on GcsServer::main_service_.
+  // - One InternalKV MultiGet call on GcsServer::kv_service_.
+  //
+  // We then wait for all of them by checking an atomic num_finished_tasks counter,
+  // and reply once it reaches total_tasks. The counter is written from two
+  // different threads, so each completion performs an atomic read-and-increment
+  // and checks the returned value to ensure try_send_reply runs exactly once.
+
+  // Atomic counter of pending async tasks before sending the reply.
+  // Once it reaches total_tasks, the reply is sent.
+  std::shared_ptr<std::atomic<size_t>> num_finished_tasks =
+      std::make_shared<std::atomic<size_t>>(0);
+
+  // N tasks for the N jobs, plus 1 task for the KV MultiGet. If either step is
+  // skipped, its share of the counter is still incremented.
+  const size_t total_tasks = reply->job_info_list_size() + 1;
+  auto try_send_reply =
+      [reply, send_reply_callback, total_tasks](size_t finished_tasks) {
+        if (finished_tasks == total_tasks) {
+          RAY_LOG(DEBUG) << "Finished getting all job info.";
+          GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
+        }
+      };
 
-      // If job is not dead, get is_running_tasks from the core worker for the driver.
-      if (data.second.is_dead()) {
+  if (request.skip_is_running_tasks_field()) {
+    // Skipping the RPCs to workers; mark all per-job tasks as done.
+    const size_t job_count = reply->job_info_list_size();
+    size_t updated_finished_tasks =
+        num_finished_tasks->fetch_add(job_count) + job_count;
+    try_send_reply(updated_finished_tasks);
+  } else {
+    for (int i = 0; i < reply->job_info_list_size(); i++) {
+      const auto &data = reply->job_info_list(i);
+      auto job_id = JobID::FromBinary(data.job_id());
+      WorkerID worker_id = WorkerID::FromBinary(data.driver_address().worker_id());
+
+      // If the job is dead, there is no need to ask the driver.
+      if (data.is_dead()) {
         reply->mutable_job_info_list(i)->set_is_running_tasks(false);
         core_worker_clients_.Disconnect(worker_id);
-        (*num_processed_jobs)++;
-        try_send_reply();
+        size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
+        try_send_reply(updated_finished_tasks);
       } else {
         // Get is_running_tasks from the core worker for the driver.
-        auto client = core_worker_clients_.GetOrConnect(data.second.driver_address());
+        auto client = core_worker_clients_.GetOrConnect(data.driver_address());
         auto request = std::make_unique<rpc::NumPendingTasksRequest>();
         constexpr int64_t kNumPendingTasksRequestTimeoutMs = 1000;
         RAY_LOG(DEBUG) << "Send NumPendingTasksRequest to worker " << worker_id
                        << ", timeout " << kNumPendingTasksRequestTimeoutMs << " ms.";
         client->NumPendingTasks(
             std::move(request),
-            [job_id, worker_id, reply, i, num_processed_jobs, try_send_reply](
+            [job_id, worker_id, reply, i, num_finished_tasks, try_send_reply](
                 const Status &status,
                 const rpc::NumPendingTasksReply &num_pending_tasks_reply) {
               RAY_LOG(DEBUG).WithField(worker_id)
@@ -321,25 +341,25 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
                 bool is_running_tasks = num_pending_tasks_reply.num_pending_tasks() > 0;
                 reply->mutable_job_info_list(i)->set_is_running_tasks(is_running_tasks);
               }
-              (*num_processed_jobs)++;
-              try_send_reply();
+              size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
+              try_send_reply(updated_finished_tasks);
             },
             kNumPendingTasksRequestTimeoutMs);
       }
-    } else {
-      (*num_processed_jobs)++;
-      try_send_reply();
     }
-    i++;
   }
 
-  if (!request.skip_submission_job_info_field()) {
+  if (request.skip_submission_job_info_field()) {
+    // Skipping the KV MultiGet; just bump the counter.
+    size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
+    try_send_reply(updated_finished_tasks);
+  } else {
     // Load the JobInfo for jobs submitted via the Ray Job API.
     auto kv_multi_get_callback =
         [reply, send_reply_callback, job_data_key_to_indices,
-         kv_callback_done,
+         num_finished_tasks,
          try_send_reply](std::unordered_map<std::string, std::string> &&result) {
           for (const auto &data : result) {
             const std::string &job_data_key = data.first;
@@ -362,13 +382,10 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
               }
             }
           }
-          *kv_callback_done = true;
-          try_send_reply();
+          size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1;
+          try_send_reply(updated_finished_tasks);
         };
     internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback);
-  } else {
-    *kv_callback_done = true;
-    try_send_reply();
   }
 };
 Status status = gcs_table_storage_->JobTable().GetAll(on_done);
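The exactly-once property described in the comment above hinges on using the value returned by fetch_add rather than re-reading the counter afterwards; with a separate load, two late completions could both observe the final count and send two replies. A reduced, self-contained sketch of the same pattern (thread bodies are placeholders):

    #include <atomic>
    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <thread>
    #include <vector>

    // Invokes `done` exactly once, on whichever worker finishes last.
    void WhenAll(size_t n, std::function<void()> done) {
      auto finished = std::make_shared<std::atomic<size_t>>(0);
      std::vector<std::thread> workers;
      for (size_t i = 0; i < n; i++) {
        workers.emplace_back([finished, n, done] {
          // fetch_add returns the previous value, so exactly one thread
          // computes n here, no matter how the increments interleave.
          if (finished->fetch_add(1) + 1 == n) {
            done();
          }
        });
      }
      for (auto &w : workers) w.join();
    }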
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index 4affb289f42a9..174785681859b 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -54,6 +54,9 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
     : config_(config),
       storage_type_(GetStorageType()),
       main_service_(main_service),
+      pubsub_io_context_("pubsub_io_context"),
+      task_io_context_("task_io_context"),
+      ray_syncer_io_context_("ray_syncer_io_context"),
       rpc_server_(config.grpc_server_name,
                   config.grpc_server_port,
                   config.node_ip_address == "127.0.0.1",
@@ -65,7 +68,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
           RayConfig::instance().gcs_server_rpc_client_thread_num()),
       raylet_client_pool_(
           std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
-      pubsub_periodical_runner_(pubsub_io_service_),
+      pubsub_periodical_runner_(pubsub_io_context_.GetIoService()),
       periodical_runner_(main_service),
       is_started_(false),
       is_stopped_(false) {
@@ -264,13 +267,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
 void GcsServer::Stop() {
   if (!is_stopped_) {
     RAY_LOG(INFO) << "Stopping GCS server.";
-    ray_syncer_io_context_.stop();
-    ray_syncer_thread_->join();
-    ray_syncer_.reset();
-
-    gcs_task_manager_->Stop();
+    ray_syncer_io_context_.Stop();
+    task_io_context_.Stop();
+    pubsub_io_context_.Stop();
 
-    pubsub_handler_->Stop();
+    ray_syncer_.reset();
     pubsub_handler_.reset();
 
     // Shutdown the rpc server
@@ -531,16 +533,12 @@ GcsServer::StorageType GcsServer::GetStorageType() const {
 }
 
 void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
-  ray_syncer_ =
-      std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_, kGCSNodeID.Binary());
+  ray_syncer_ = std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_.GetIoService(),
+                                                    kGCSNodeID.Binary());
   ray_syncer_->Register(
       syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
   ray_syncer_->Register(
       syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get());
-  ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
-    boost::asio::io_service::work work(ray_syncer_io_context_);
-    ray_syncer_io_context_.run();
-  });
   ray_syncer_service_ = std::make_unique<syncer::RaySyncerService>(*ray_syncer_);
   rpc_server_.RegisterService(*ray_syncer_service_);
 }
@@ -587,10 +585,10 @@ void GcsServer::InitKVService() {
 }
 
 void GcsServer::InitPubSubHandler() {
-  pubsub_handler_ =
-      std::make_unique<InternalPubSubHandler>(pubsub_io_service_, gcs_publisher_);
-  pubsub_service_ = std::make_unique<rpc::InternalPubSubGrpcService>(pubsub_io_service_,
-                                                                     *pubsub_handler_);
+  pubsub_handler_ = std::make_unique<InternalPubSubHandler>(
+      pubsub_io_context_.GetIoService(), gcs_publisher_);
+  pubsub_service_ = std::make_unique<rpc::InternalPubSubGrpcService>(
+      pubsub_io_context_.GetIoService(), *pubsub_handler_);
   // Register service.
   rpc_server_.RegisterService(*pubsub_service_);
 }
@@ -684,10 +682,10 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data)
 }
 
 void GcsServer::InitGcsTaskManager() {
-  gcs_task_manager_ = std::make_unique<GcsTaskManager>();
+  gcs_task_manager_ = std::make_unique<GcsTaskManager>(task_io_context_.GetIoService());
   // Register service.
-  task_info_service_.reset(new rpc::TaskInfoGrpcService(gcs_task_manager_->GetIoContext(),
-                                                        *gcs_task_manager_));
+  task_info_service_.reset(
+      new rpc::TaskInfoGrpcService(task_io_context_.GetIoService(), *gcs_task_manager_));
   rpc_server_.RegisterService(*task_info_service_);
 }
 
@@ -841,9 +839,15 @@ void GcsServer::PrintAsioStats() {
   const auto event_stats_print_interval_ms =
       RayConfig::instance().event_stats_print_interval_ms();
   if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) {
-    RAY_LOG(INFO) << "Event stats:\n\n" << main_service_.stats().StatsString() << "\n\n";
-    RAY_LOG(INFO) << "GcsTaskManager Event stats:\n\n"
-                  << gcs_task_manager_->GetIoContext().stats().StatsString() << "\n\n";
+    RAY_LOG(INFO) << "main_service_ Event stats:\n\n"
+                  << main_service_.stats().StatsString() << "\n\n";
+    RAY_LOG(INFO) << "pubsub_io_context_ Event stats:\n\n"
+                  << pubsub_io_context_.GetIoService().stats().StatsString() << "\n\n";
+    RAY_LOG(INFO) << "task_io_context_ Event stats:\n\n"
+                  << task_io_context_.GetIoService().stats().StatsString() << "\n\n";
+    RAY_LOG(INFO) << "ray_syncer_io_context_ Event stats:\n\n"
+                  << ray_syncer_io_context_.GetIoService().stats().StatsString()
+                  << "\n\n";
   }
 }
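The rewritten Stop() encodes an ordering invariant: each dedicated event loop is stopped and its thread joined before the objects whose callbacks run on that loop (ray_syncer_, pubsub_handler_) are destroyed, so no callback can race with destruction. A reduced sketch of the invariant, with a placeholder handler type:

    #include <memory>
    #include "ray/common/asio/asio_util.h"

    struct Handler {};  // stand-in for ray_syncer_ / pubsub_handler_

    void OrderedShutdown(InstrumentedIOContextWithThread &io,
                         std::unique_ptr<Handler> &handler) {
      io.Stop();        // joins the io thread; no handler code runs after this
      handler.reset();  // now safe: nothing can touch *handler concurrently
    }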
diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h
index 99296ee6d84ba..0c6d47198f6fa 100644
--- a/src/ray/gcs/gcs_server/gcs_server.h
+++ b/src/ray/gcs/gcs_server/gcs_server.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include "ray/common/asio/asio_util.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/ray_syncer/ray_syncer.h"
 #include "ray/common/runtime_env_manager.h"
@@ -212,7 +213,11 @@ class GcsServer {
   /// The main io service to drive event posted from grpc threads.
   instrumented_io_context &main_service_;
   /// The io service used by Pubsub, for isolation from other workload.
-  instrumented_io_context pubsub_io_service_;
+  InstrumentedIOContextWithThread pubsub_io_context_;
+  /// The io service used by the task manager.
+  InstrumentedIOContextWithThread task_io_context_;
+  /// The io service used by the ray syncer.
+  InstrumentedIOContextWithThread ray_syncer_io_context_;
   /// The grpc server
   rpc::GrpcServer rpc_server_;
   /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
@@ -254,8 +259,6 @@ class GcsServer {
   /// Ray Syncer related fields.
   std::unique_ptr<syncer::RaySyncer> ray_syncer_;
   std::unique_ptr<syncer::RaySyncerService> ray_syncer_service_;
-  std::unique_ptr<std::thread> ray_syncer_thread_;
-  instrumented_io_context ray_syncer_io_context_;
 
   /// The node id of GCS.
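Declaration order here is load-bearing: members are constructed top-to-bottom and destroyed bottom-to-top, so placing the three io-context wrappers above rpc_server_ and the managers guarantees the event loops outlive everything scheduled on them. A minimal demonstration of that C++ rule (illustrative types only):

    #include <iostream>

    struct IoThread {
      IoThread() { std::cout << "io up\n"; }
      ~IoThread() { std::cout << "io down\n"; }
    };
    struct RpcServer {
      RpcServer() { std::cout << "rpc up\n"; }
      ~RpcServer() { std::cout << "rpc down\n"; }
    };

    struct Server {
      IoThread io;    // declared first: constructed first, destroyed last
      RpcServer rpc;  // may hold references into `io`
    };

    int main() { Server s; }  // prints: io up, rpc up, rpc down, io down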
   NodeID gcs_node_id_;
diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.cc b/src/ray/gcs/gcs_server/gcs_task_manager.cc
index 33a8bb6ded862..38b631a785450 100644
--- a/src/ray/gcs/gcs_server/gcs_task_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_task_manager.cc
@@ -21,13 +21,6 @@
 namespace ray {
 namespace gcs {
 
-void GcsTaskManager::Stop() {
-  io_service_.stop();
-  if (io_service_thread_->joinable()) {
-    io_service_thread_->join();
-  }
-}
-
 std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents()
     const {
   std::vector<rpc::TaskEvents> ret;
diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.h b/src/ray/gcs/gcs_server/gcs_task_manager.h
index 9c8e8c215d8d6..1e87baec43b36 100644
--- a/src/ray/gcs/gcs_server/gcs_task_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_task_manager.h
@@ -86,18 +86,13 @@ class FinishedTaskActorTaskGcPolicy : public TaskEventsGcPolicyInterface {
 class GcsTaskManager : public rpc::TaskInfoHandler {
  public:
   /// Create a GcsTaskManager.
-  GcsTaskManager()
-      : stats_counter_(),
+  explicit GcsTaskManager(instrumented_io_context &io_service)
+      : io_service_(io_service),
+        stats_counter_(),
         task_event_storage_(std::make_unique<GcsTaskManagerStorage>(
             RayConfig::instance().task_events_max_num_task_in_gcs(),
             stats_counter_,
             std::make_unique<FinishedTaskActorTaskGcPolicy>())),
-        io_service_thread_(std::make_unique<std::thread>([this] {
-          SetThreadName("task_events");
-          // Keep io_service_ alive.
-          boost::asio::io_service::work io_service_work_(io_service_);
-          io_service_.run();
-        })),
         periodical_runner_(io_service_) {
     periodical_runner_.RunFnPeriodically([this] { task_event_storage_->GcJobSummary(); },
                                          5 * 1000,
@@ -122,12 +117,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
                           rpc::GetTaskEventsReply *reply,
                           rpc::SendReplyCallback send_reply_callback) override;
 
-  /// Stops the event loop and the thread of the task event handler.
-  ///
-  /// After this is called, no more requests will be handled.
-  /// This function returns when the io thread is joined.
-  void Stop();
-
   /// Handler to be called when a job finishes. This marks all non-terminated tasks
   /// of the job as failed.
   ///
@@ -143,11 +132,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
   void OnWorkerDead(const WorkerID &worker_id,
                     const std::shared_ptr<rpc::WorkerTableData> &worker_failure_data);
 
-  /// Returns the io_service.
-  ///
-  /// \return Reference to its io_service.
-  instrumented_io_context &GetIoContext() { return io_service_; }
-
   /// Return string of debug state.
   ///
   /// \return Debug string
@@ -514,6 +498,9 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
   /// Test only
   size_t GetNumTaskEventsStored() { return stats_counter_.Get(kNumTaskEventsStored); }
 
+  /// Dedicated IO service, separate from the main service. Not owned.
+  instrumented_io_context &io_service_;
+
   // Mutex guarding the usage stats client
   absl::Mutex mutex_;
 
@@ -526,12 +513,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
   // the io_service_thread_. Access to it is *not* thread safe.
   std::unique_ptr<GcsTaskManagerStorage> task_event_storage_;
 
-  /// Its own separate IO service separated from the main service.
-  instrumented_io_context io_service_;
-
-  /// Its own IO thread from the main thread.
-  std::unique_ptr<std::thread> io_service_thread_;
-
   /// The runner to run function periodically.
   PeriodicalRunner periodical_runner_;
diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc
index a110ce49b956e..d926b051102ca 100644
--- a/src/ray/gcs/gcs_server/pubsub_handler.cc
+++ b/src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -22,12 +22,6 @@ InternalPubSubHandler::InternalPubSubHandler(
     const std::shared_ptr<GcsPublisher> &gcs_publisher)
     : io_service_(io_service), gcs_publisher_(gcs_publisher) {
   RAY_CHECK(gcs_publisher_);
-  io_service_thread_ = std::make_unique<std::thread>([this] {
-    SetThreadName("pubsub");
-    // Keep io_service_ alive.
-    boost::asio::io_service::work io_service_work_(io_service_);
-    io_service_.run();
-  });
 }
 
 void InternalPubSubHandler::HandleGcsPublish(rpc::GcsPublishRequest request,
@@ -129,12 +123,5 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) {
   sender_to_subscribers_.erase(iter);
 }
 
-void InternalPubSubHandler::Stop() {
-  io_service_.stop();
-  if (io_service_thread_->joinable()) {
-    io_service_thread_->join();
-  }
-}
-
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/pubsub_handler.h b/src/ray/gcs/gcs_server/pubsub_handler.h
index 71dded16967aa..a92209a6954cb 100644
--- a/src/ray/gcs/gcs_server/pubsub_handler.h
+++ b/src/ray/gcs/gcs_server/pubsub_handler.h
@@ -47,9 +47,6 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler {
                                      rpc::GcsUnregisterSubscriberReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) final;
 
-  // Stops the event loop and the thread of the pubsub handler.
-  void Stop();
-
   std::string DebugString() const;
 
   void RemoveSubscriberFrom(const std::string &sender_id);
@@ -57,7 +54,6 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler {
  private:
   /// Not owning the io service, to allow sharing it with pubsub::Publisher.
   instrumented_io_context &io_service_;
-  std::unique_ptr<std::thread> io_service_thread_;
   std::shared_ptr<GcsPublisher> gcs_publisher_;
   absl::flat_hash_map<std::string, absl::flat_hash_set<UniqueID>> sender_to_subscribers_;
 };
diff --git a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc
index 07d4ee6629661..b36bf2a65d24a 100644
--- a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc
@@ -18,6 +18,7 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
+#include "ray/common/asio/asio_util.h"
 #include "ray/gcs/pb_util.h"
 #include "ray/gcs/test/gcs_test_util.h"
 
@@ -36,9 +37,15 @@ class GcsTaskManagerTest : public ::testing::Test {
   )");
   }
 
-  virtual void SetUp() { task_manager.reset(new GcsTaskManager()); }
+  virtual void SetUp() {
+    io_context_ = std::make_unique<InstrumentedIOContextWithThread>("GcsTaskManagerTest");
+    task_manager = std::make_unique<GcsTaskManager>(io_context_->GetIoService());
+  }
 
-  virtual void TearDown() { task_manager->Stop(); }
+  virtual void TearDown() {
+    task_manager.reset();
+    io_context_.reset();
+  }
 
   std::vector<TaskID> GenTaskIDs(size_t num_tasks) {
     std::vector<TaskID> task_ids;
@@ -104,7 +111,7 @@ class GcsTaskManagerTest : public ::testing::Test {
     request.mutable_data()->CopyFrom(events_data);
 
     // Dispatch so that it runs in GcsTaskManager's io service.
-    task_manager->GetIoContext().dispatch(
+    io_context_->GetIoService().dispatch(
         [this, &promise, &request, &reply]() {
           task_manager->HandleAddTaskEventData(
               request,
@@ -161,7 +168,7 @@ class GcsTaskManagerTest : public ::testing::Test {
 
     request.mutable_filters()->set_exclude_driver(exclude_driver);
 
-    task_manager->GetIoContext().dispatch(
+    io_context_->GetIoService().dispatch(
        [this, &promise, &request, &reply]() {
          task_manager->HandleGetTaskEvents(
              request,
@@ -275,6 +282,7 @@ class GcsTaskManagerTest : public ::testing::Test {
   }
 
   std::unique_ptr<GcsTaskManager> task_manager = nullptr;
+  std::unique_ptr<InstrumentedIOContextWithThread> io_context_ = nullptr;
 };
 
 class GcsTaskManagerMemoryLimitedTest : public GcsTaskManagerTest {
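In the fixture above, the io context now runs on its own thread, so the test helpers synchronize with it through a promise captured by the dispatched lambda. A generic sketch of that pattern (the helper name is hypothetical, and the named `dispatch` overload mirrors the test's usage; treat the exact signature as an assumption):

    #include <future>
    #include "ray/common/asio/instrumented_io_context.h"

    template <typename Fn>
    void RunOnIoService(instrumented_io_context &io, Fn fn) {
      std::promise<void> done;
      io.dispatch(
          [&]() {
            fn();              // runs on the io context's thread
            done.set_value();  // wake the calling (test) thread
          },
          "test_dispatch");
      done.get_future().wait();  // block until the work has run
    }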
diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc
index efe775eb6bad2..6de20bfe34af8 100644
--- a/src/ray/gcs/redis_context.cc
+++ b/src/ray/gcs/redis_context.cc
@@ -199,8 +199,8 @@ void RedisRequestContext::RedisResponseFn(struct redisAsyncContext *async_contex
         },
         "RedisRequestContext.Callback");
     auto end_time = absl::Now();
-    ray::stats::GcsLatency().Record((end_time - request_cxt->start_time_) /
-                                    absl::Milliseconds(1));
+    ray::stats::GcsLatency().Record(
+        absl::ToDoubleMilliseconds(end_time - request_cxt->start_time_));
     delete request_cxt;
   }
 }
@@ -215,7 +215,7 @@ void RedisRequestContext::Run() {
   --pending_retries_;
 
   Status status = redis_context_->RedisAsyncCommandArgv(
-      *(RedisResponseFn), this, argv_.size(), argv_.data(), argc_.data());
+      RedisResponseFn, this, argv_.size(), argv_.data(), argc_.data());
 
   if (!status.ok()) {
     RedisResponseFn(redis_context_->GetRawRedisAsyncContext(), nullptr, this);
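This latency fix (and the matching ones in observable_store_client.cc below) is more than style: dividing one absl::Duration by another performs integer division, so sub-millisecond latencies were recorded as 0, while absl::ToDoubleMilliseconds keeps the fractional part. A small illustration:

    #include <iostream>
    #include "absl/time/time.h"

    int main() {
      absl::Duration d = absl::Microseconds(750);
      std::cout << d / absl::Milliseconds(1) << "\n";      // 0: integer division truncates
      std::cout << absl::ToDoubleMilliseconds(d) << "\n";  // 0.75: fraction preserved
    }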
diff --git a/src/ray/gcs/store_client/observable_store_client.cc b/src/ray/gcs/store_client/observable_store_client.cc
index c188e27e57b7f..147c9191a824a 100644
--- a/src/ray/gcs/store_client/observable_store_client.cc
+++ b/src/ray/gcs/store_client/observable_store_client.cc
@@ -29,19 +29,19 @@ Status ObservableStoreClient::AsyncPut(const std::string &table_name,
                                        std::function<void(bool)> callback) {
   auto start = absl::GetCurrentTimeNanos();
   STATS_gcs_storage_operation_count.Record(1, "Put");
-  return delegate_->AsyncPut(table_name,
-                             key,
-                             data,
-                             overwrite,
-                             [start, callback = std::move(callback)](auto result) {
-                               auto end = absl::GetCurrentTimeNanos();
-                               STATS_gcs_storage_operation_latency_ms.Record(
-                                   absl::Nanoseconds(end - start) / absl::Milliseconds(1),
-                                   "Put");
-                               if (callback) {
-                                 callback(std::move(result));
-                               }
-                             });
+  return delegate_->AsyncPut(
+      table_name,
+      key,
+      data,
+      overwrite,
+      [start, callback = std::move(callback)](auto result) {
+        auto end = absl::GetCurrentTimeNanos();
+        STATS_gcs_storage_operation_latency_ms.Record(
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Put");
+        if (callback) {
+          callback(std::move(result));
+        }
+      });
 }
 
 Status ObservableStoreClient::AsyncGet(
@@ -54,7 +54,7 @@ Status ObservableStoreClient::AsyncGet(
       table_name, key, [start, callback](auto status, auto result) {
         auto end = absl::GetCurrentTimeNanos();
         STATS_gcs_storage_operation_latency_ms.Record(
-            absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Get");
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Get");
         if (callback) {
           callback(status, std::move(result));
         }
@@ -69,7 +69,7 @@ Status ObservableStoreClient::AsyncGetAll(
   return delegate_->AsyncGetAll(table_name, [start, callback](auto result) {
     auto end = absl::GetCurrentTimeNanos();
     STATS_gcs_storage_operation_latency_ms.Record(
-        absl::Nanoseconds(end - start) / absl::Milliseconds(1), "GetAll");
+        absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "GetAll");
     if (callback) {
      callback(std::move(result));
    }
@@ -84,7 +84,7 @@ Status ObservableStoreClient::AsyncMultiGet(
   return delegate_->AsyncMultiGet(table_name, keys, [start, callback](auto result) {
     auto end = absl::GetCurrentTimeNanos();
     STATS_gcs_storage_operation_latency_ms.Record(
-        absl::Nanoseconds(end - start) / absl::Milliseconds(1), "MultiGet");
+        absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "MultiGet");
     if (callback) {
       callback(std::move(result));
     }
@@ -100,7 +100,7 @@ Status ObservableStoreClient::AsyncDelete(const std::string &table_name,
       table_name, key, [start, callback = std::move(callback)](auto result) {
         auto end = absl::GetCurrentTimeNanos();
         STATS_gcs_storage_operation_latency_ms.Record(
-            absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Delete");
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Delete");
         if (callback) {
           callback(std::move(result));
         }
@@ -116,7 +116,7 @@ Status ObservableStoreClient::AsyncBatchDelete(const std::string &table_name,
       table_name, keys, [start, callback = std::move(callback)](auto result) {
         auto end = absl::GetCurrentTimeNanos();
         STATS_gcs_storage_operation_latency_ms.Record(
-            absl::Nanoseconds(end - start) / absl::Milliseconds(1), "BatchDelete");
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "BatchDelete");
         if (callback) {
           callback(std::move(result));
         }
@@ -135,7 +135,7 @@ Status ObservableStoreClient::AsyncGetKeys(
       table_name, prefix, [start, callback = std::move(callback)](auto result) {
         auto end = absl::GetCurrentTimeNanos();
         STATS_gcs_storage_operation_latency_ms.Record(
-            absl::Nanoseconds(end - start) / absl::Milliseconds(1), "GetKeys");
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "GetKeys");
         if (callback) {
           callback(std::move(result));
         }
@@ -151,7 +151,7 @@ Status ObservableStoreClient::AsyncExists(const std::string &table_name,
       table_name, key, [start, callback = std::move(callback)](auto result) {
         auto end = absl::GetCurrentTimeNanos();
         STATS_gcs_storage_operation_latency_ms.Record(
-            absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Exists");
+            absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Exists");
         if (callback) {
           callback(std::move(result));
         }