From 39035b4cafb5822cbeef2c1784251a512bf3f9df Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Fri, 30 Jun 2023 11:37:16 -0700
Subject: [PATCH] [core] Add extra metrics for workers (#36973)

This PR adds two extra metrics for workers:
- It distinguishes owned objects from owned actors for the worker
- It adds the number of actively executing tasks for the worker
---
 python/ray/tests/test_metrics.py             | 103 +++++++++++++++++--
 src/ray/core_worker/core_worker.cc           |   5 +-
 src/ray/core_worker/reference_count.cc       |  19 +++-
 src/ray/core_worker/reference_count.h        |   9 +-
 src/ray/object_manager/plasma/store.h        |   5 +-
 src/ray/object_manager/plasma/store_runner.h |   2 +-
 src/ray/protobuf/common.proto                |   7 +-
 7 files changed, 129 insertions(+), 21 deletions(-)

diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py
index 368fb2888338..d770af8c5347 100644
--- a/python/ray/tests/test_metrics.py
+++ b/python/ray/tests/test_metrics.py
@@ -114,9 +114,13 @@ def get_owner_info(node_ids):
         node_stats = ray._private.internal_api.node_stats(
             node_addrs[node_id][0], node_addrs[node_id][1], False
         )
-        owner_stats[node_id] = sum(
+        num_owned_objects = sum(
             [stats.num_owned_objects for stats in node_stats.core_workers_stats]
         )
+        num_owned_actors = sum(
+            [stats.num_owned_actors for stats in node_stats.core_workers_stats]
+        )
+        owner_stats[node_id] = (num_owned_objects, num_owned_actors)
         primary_copy_stats[
             node_id
         ] = node_stats.store_stats.num_object_store_primary_copies
@@ -131,7 +135,7 @@ def get_owner_info(node_ids):
     return owner_stats, primary_copy_stats
 
 
-def test_node_object_metrics(ray_start_cluster, monkeypatch):
+def test_node_object_metrics(ray_start_cluster):
     NUM_NODES = 3
     cluster = ray_start_cluster
     for i in range(NUM_NODES):
@@ -152,7 +156,9 @@ def get_node_id():
     # x is owned by node_0
     # x is stored at node_0
     x = ray.put([1])  # noqa: F841
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([1, 0, 0], [1, 0, 0]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(1, 0), (0, 0), (0, 0)], [1, 0, 0])
+    )
 
     # Test nested with put
     @ray.remote(resources={"node_1": 1})
@@ -166,7 +172,9 @@ def big_obj():
     # big_obj is owned by node_0
     # big_obj is stored in memory (no primary copy)
     big_obj_ref = big_obj.remote()  # noqa: F841
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([2, 1, 0], [1, 1, 0]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(2, 0), (1, 0), (0, 0)], [1, 1, 0])
+    )
 
     # Test nested with task (small output)
     @ray.remote(resources={"node_1": 1})
@@ -185,11 +193,15 @@ def task():
     # nest_ref is owned by node_0
     # nest_ref is stored in memory (no primary copy)
     nest_ref = nest_task.remote(1)  # noqa: F841
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([3, 2, 0], [1, 1, 0]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(3, 0), (2, 0), (0, 0)], [1, 1, 0])
+    )
 
     big_nest = nest_task.remote(1024 * 1024 * 10)  # noqa: F841
 
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([4, 3, 0], [1, 1, 1]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(4, 0), (3, 0), (0, 0)], [1, 1, 1])
+    )
 
     # Test with assigned owned
    @ray.remote(resources={"node_2": 0.5}, num_cpus=0)
@@ -207,14 +219,16 @@ def gen(self):
     # o is owned by actor (node_2)
     # o is stored in object store of node_0
     o = ray.put(1, _owner=actor)  # noqa: F841
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(4, 1), (3, 0), (1, 0)], [2, 1, 1])
+    )
 
     # Test with detached owned
     # detached actor is owned by GCS. So it's not counted in the owner stats
     detached_actor = A.options(lifetime="detached", name="A").remote()
     ray.get(detached_actor.ready.remote())
     for i in range(3):
-        assert get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1])
+        assert get_owner_info(node_ids) == ([(4, 1), (3, 0), (1, 0)], [2, 1, 1])
         import time
 
         time.sleep(1)
@@ -222,7 +236,78 @@ def gen(self):
     # the inner object is owned by A (node_2)
     # the inner object is stored in object store of node_2
     gen_obj = detached_actor.gen.remote()  # noqa: F841
-    wait_for_condition(lambda: get_owner_info(node_ids) == ([6, 3, 2], [2, 1, 2]))
+    wait_for_condition(
+        lambda: get_owner_info(node_ids) == ([(5, 1), (3, 0), (2, 0)], [2, 1, 2])
+    )
+
+
+def test_running_tasks(ray_start_cluster):
+    NUM_NODES = 3
+    cluster = ray_start_cluster
+    for i in range(NUM_NODES):
+        cluster.add_node(True, resources={f"node_{i}": 1})
+        if i == 0:
+            ray.init(address=cluster.address)
+    node_ids = []
+
+    for i in range(NUM_NODES):
+
+        @ray.remote(resources={f"node_{i}": 1})
+        def get_node_id():
+            return ray.get_runtime_context().get_node_id()
+
+        node_ids.append(ray.get(get_node_id.remote()))
+
+    @ray.remote
+    def f(t):
+        import time
+
+        time.sleep(t)
+
+    tasks = [
+        f.options(resources={"node_0": 1}).remote(0),
+        f.options(resources={"node_1": 1}).remote(100000),
+        f.options(resources={"node_2": 1}).remote(100000),
+    ]
+
+    ready, pending = ray.wait(tasks)
+    assert len(ready) == 1
+    assert len(pending) == 2
+
+    node_addrs = {
+        n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"])
+        for n in ray.nodes()
+    }
+
+    def check():
+        for i in range(NUM_NODES):
+            node_stats = ray._private.internal_api.node_stats(
+                node_addrs[node_ids[i]][0], node_addrs[node_ids[i]][1], False
+            )
+
+            if i == 0:
+                assert (
+                    sum(
+                        [
+                            stats.num_running_tasks
+                            for stats in node_stats.core_workers_stats
+                        ]
+                    )
+                    == 0
+                )
+            else:
+                assert (
+                    sum(
+                        [
+                            stats.num_running_tasks
+                            for stats in node_stats.core_workers_stats
+                        ]
+                    )
+                    == 1
+                )
+        return True
+
+    wait_for_condition(check)
 
 
 def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 1f58a50e21ba..0164df3235ff 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -1942,7 +1942,6 @@ Status CoreWorker::CreateActor(const RayFunction &function,
   const JobID job_id = worker_context_.GetCurrentJobID();
   // Propagate existing environment variable overrides, but override them with any new
   // ones
-  std::vector<ObjectID> return_ids;
   TaskSpecBuilder builder;
   auto new_placement_resources =
       AddPlacementGroupConstraint(actor_creation_options.placement_resources,
@@ -3600,7 +3599,8 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
   stats->set_task_queue_length(task_queue_length_);
   stats->set_num_executed_tasks(num_executed_tasks_);
   stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
-  stats->set_num_owned_objects(reference_counter_->NumObjectOwnedByUs());
+  stats->set_num_owned_objects(reference_counter_->NumObjectsOwnedByUs());
+  stats->set_num_owned_actors(reference_counter_->NumActorsOwnedByUs());
   stats->set_ip_address(rpc_address_.ip_address());
   stats->set_port(rpc_address_.port());
   stats->set_pid(getpid());
@@ -3609,6 +3609,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
   stats->set_worker_id(worker_context_.GetWorkerID().Binary());
   stats->set_actor_id(actor_id_.Binary());
   stats->set_worker_type(worker_context_.GetWorkerType());
+  stats->set_num_running_tasks(current_tasks_.size());
   auto used_resources_map = stats->mutable_used_resources();
   for (auto const &it : *resource_ids_) {
     rpc::ResourceAllocations allocations;
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc
index cadb8b40e588..e99738fd8b73 100644
--- a/src/ray/core_worker/reference_count.cc
+++ b/src/ray/core_worker/reference_count.cc
@@ -285,7 +285,11 @@ bool ReferenceCounter::AddOwnedObjectInternal(
   if (object_id_refs_.count(object_id) != 0) {
     return false;
   }
-  num_objects_owned_by_us_++;
+  if (ObjectID::IsActorID(object_id)) {
+    num_actors_owned_by_us_++;
+  } else {
+    num_objects_owned_by_us_++;
+  }
   RAY_LOG(DEBUG) << "Adding owned object " << object_id;
   // If the entry doesn't exist, we initialize the direct reference count to zero
   // because this corresponds to a submitted task whose return ObjectID will be created
@@ -702,7 +706,11 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
   }
   freed_objects_.erase(it->first);
   if (it->second.owned_by_us) {
-    num_objects_owned_by_us_--;
+    if (ObjectID::IsActorID(it->first)) {
+      num_actors_owned_by_us_--;
+    } else {
+      num_objects_owned_by_us_--;
+    }
   }
   object_id_refs_.erase(it);
   ShutdownIfNeeded();
@@ -849,11 +857,16 @@ size_t ReferenceCounter::NumObjectIDsInScope() const {
   return object_id_refs_.size();
 }
 
-size_t ReferenceCounter::NumObjectOwnedByUs() const {
+size_t ReferenceCounter::NumObjectsOwnedByUs() const {
   absl::MutexLock lock(&mutex_);
   return num_objects_owned_by_us_;
 }
 
+size_t ReferenceCounter::NumActorsOwnedByUs() const {
+  absl::MutexLock lock(&mutex_);
+  return num_actors_owned_by_us_;
+}
+
 std::unordered_set<ObjectID> ReferenceCounter::GetAllInScopeObjectIDs() const {
   absl::MutexLock lock(&mutex_);
   std::unordered_set<ObjectID> in_scope_object_ids;
diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h
index 894b426a9d97..3243532a66cc 100644
--- a/src/ray/core_worker/reference_count.h
+++ b/src/ray/core_worker/reference_count.h
@@ -337,7 +337,11 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// Returns the total number of ObjectIDs currently in scope.
   size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);
 
-  size_t NumObjectOwnedByUs() const LOCKS_EXCLUDED(mutex_);
+  /// Returns the total number of objects owned by this worker.
+  size_t NumObjectsOwnedByUs() const LOCKS_EXCLUDED(mutex_);
+
+  /// Returns the total number of actors owned by this worker.
+  size_t NumActorsOwnedByUs() const LOCKS_EXCLUDED(mutex_);
 
   /// Returns a set of all ObjectIDs currently in scope (i.e., nonzero reference count).
   std::unordered_set<ObjectID> GetAllInScopeObjectIDs() const LOCKS_EXCLUDED(mutex_);
@@ -1041,6 +1045,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
 
   /// Keep track of objects owned by this worker.
   size_t num_objects_owned_by_us_ GUARDED_BY(mutex_) = 0;
+
+  /// Keep track of actors owned by this worker.
+  size_t num_actors_owned_by_us_ GUARDED_BY(mutex_) = 0;
 };
 
 }  // namespace core
diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h
index bb6ae9e086dc..9fc65ca01197 100644
--- a/src/ray/object_manager/plasma/store.h
+++ b/src/ray/object_manager/plasma/store.h
@@ -86,8 +86,7 @@ class PlasmaStore {
 
   /// Get the available memory for new objects to be created. This includes
   /// memory that is currently being used for created but unsealed objects.
-  void GetAvailableMemory(std::function<void(size_t)> callback) const
-      LOCKS_EXCLUDED(mutex_) {
+  size_t GetAvailableMemory() const LOCKS_EXCLUDED(mutex_) {
     absl::MutexLock lock(&mutex_);
     RAY_CHECK((object_lifecycle_mgr_.GetNumBytesUnsealed() > 0 &&
                object_lifecycle_mgr_.GetNumObjectsUnsealed() > 0) ||
@@ -105,7 +104,7 @@ class PlasmaStore {
     if (num_bytes_in_use < allocator_.GetFootprintLimit()) {
       available = allocator_.GetFootprintLimit() - num_bytes_in_use;
     }
-    callback(available);
+    return available;
   }
 
  private:
diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h
index 22e31b014684..672a69becbcb 100644
--- a/src/ray/object_manager/plasma/store_runner.h
+++ b/src/ray/object_manager/plasma/store_runner.h
@@ -30,7 +30,7 @@ class PlasmaStoreRunner {
   int64_t GetFallbackAllocated() const;
 
   void GetAvailableMemoryAsync(std::function<void(size_t)> callback) const {
-    main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); },
+    main_service_.post([this, callback]() { callback(store_->GetAvailableMemory()); },
                        "PlasmaStoreRunner.GetAvailableMemory");
   }
 
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 499e5137b884..68b879192c1e 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -764,8 +764,7 @@ message ResourceAllocations {
 
 // Debug info returned from the core worker.
 message CoreWorkerStats {
-  // Debug string of the currently executing task.
-  string current_task_desc = 1;
+  reserved 1;
   // Number of pending normal and actor tasks.
   int32 num_pending_tasks = 2;
   // Number of object refs in local scope.
@@ -808,6 +807,10 @@ message CoreWorkerStats {
   int64 objects_total = 24;
   // Number of objects owned by the worker.
   int64 num_owned_objects = 25;
+  // Number of actors owned by the worker.
+  int64 num_owned_actors = 26;
+  // Number of running tasks.
+  int64 num_running_tasks = 27;
 }
 
 // Resource usage reported by the node reporter.
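
Usage sketch (editorial note, not part of the patch): the new num_owned_objects, num_owned_actors, and num_running_tasks fields are reported per core worker and can be aggregated per node through the same internal node_stats API that the test above uses. The snippet below is a minimal sketch assuming a cluster is already running; the node/address lookup mirrors the test code, and ray._private.internal_api is an internal interface that may change between Ray versions.

import ray

ray.init(address="auto")

# Map each node to its raylet address, exactly as the test does.
node_addrs = {
    n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"]) for n in ray.nodes()
}

for node_id, (ip, port) in node_addrs.items():
    # Ask the raylet at (ip, port) for its node stats; each entry in
    # core_workers_stats is one CoreWorkerStats message from the proto above.
    node_stats = ray._private.internal_api.node_stats(ip, port, False)
    owned_objects = sum(s.num_owned_objects for s in node_stats.core_workers_stats)
    owned_actors = sum(s.num_owned_actors for s in node_stats.core_workers_stats)
    running_tasks = sum(s.num_running_tasks for s in node_stats.core_workers_stats)
    print(node_id, owned_objects, owned_actors, running_tasks)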