[core] Add extra metrics for workers (#36973)
This PR adds two extra metrics for workers:

- It distinguishes owned objects from owned actors for the worker.
- It adds the number of currently running tasks for the worker (see the usage sketch below).
fishbone authored Jun 30, 2023
1 parent dcb0a71 commit 39035b4
Showing 7 changed files with 129 additions and 21 deletions.
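
For illustration, a minimal sketch of how the new per-worker counters can be read back, mirroring what the updated tests below do. This is an assumption-laden example, not part of the PR: it relies on ray._private.internal_api.node_stats, a private and unstable API, and the field names (num_owned_objects, num_owned_actors, num_running_tasks) come from the common.proto change in this commit.

import ray

ray.init()

# Query the per-worker stats reported by the first node's raylet.
# node_stats(host, port, include_memory_info) is a private API; subject to change.
node = ray.nodes()[0]
node_stats = ray._private.internal_api.node_stats(
    node["NodeManagerAddress"], node["NodeManagerPort"], False
)

# Aggregate the new counters across all core workers on the node.
num_owned_objects = sum(s.num_owned_objects for s in node_stats.core_workers_stats)
num_owned_actors = sum(s.num_owned_actors for s in node_stats.core_workers_stats)
num_running_tasks = sum(s.num_running_tasks for s in node_stats.core_workers_stats)
print(num_owned_objects, num_owned_actors, num_running_tasks)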
103 changes: 94 additions & 9 deletions python/ray/tests/test_metrics.py
@@ -114,9 +114,13 @@ def get_owner_info(node_ids):
node_stats = ray._private.internal_api.node_stats(
node_addrs[node_id][0], node_addrs[node_id][1], False
)
owner_stats[node_id] = sum(
num_owned_objects = sum(
[stats.num_owned_objects for stats in node_stats.core_workers_stats]
)
num_owned_actors = sum(
[stats.num_owned_actors for stats in node_stats.core_workers_stats]
)
owner_stats[node_id] = (num_owned_objects, num_owned_actors)
primary_copy_stats[
node_id
] = node_stats.store_stats.num_object_store_primary_copies
@@ -131,7 +135,7 @@ def get_owner_info(node_ids):
return owner_stats, primary_copy_stats


def test_node_object_metrics(ray_start_cluster, monkeypatch):
def test_node_object_metrics(ray_start_cluster):
NUM_NODES = 3
cluster = ray_start_cluster
for i in range(NUM_NODES):
@@ -152,7 +156,9 @@ def get_node_id():
# x is owned by node_0
# x is stored at node_0
x = ray.put([1]) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([1, 0, 0], [1, 0, 0]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(1, 0), (0, 0), (0, 0)], [1, 0, 0])
)

# Test nested with put
@ray.remote(resources={"node_1": 1})
@@ -166,7 +172,9 @@ def big_obj():
# big_obj is owned by node_0
# big_obj is stored in memory (no primary copy)
big_obj_ref = big_obj.remote() # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([2, 1, 0], [1, 1, 0]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(2, 0), (1, 0), (0, 0)], [1, 1, 0])
)

# Test nested with task (small output)
@ray.remote(resources={"node_1": 1})
@@ -185,11 +193,15 @@ def task():
# nest_ref is owned by node_0
# nest_ref is stored in memory (no primary copy)
nest_ref = nest_task.remote(1) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([3, 2, 0], [1, 1, 0]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(3, 0), (2, 0), (0, 0)], [1, 1, 0])
)

big_nest = nest_task.remote(1024 * 1024 * 10) # noqa: F841

wait_for_condition(lambda: get_owner_info(node_ids) == ([4, 3, 0], [1, 1, 1]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(4, 0), (3, 0), (0, 0)], [1, 1, 1])
)

# Test with assigned owner
@ray.remote(resources={"node_2": 0.5}, num_cpus=0)
@@ -207,22 +219,95 @@ def gen(self):
# o is owned by actor (node_2)
# o is stored in object store of node_0
o = ray.put(1, _owner=actor) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(4, 1), (3, 0), (1, 0)], [2, 1, 1])
)

# Test with detached owned
# detached actor is owned by GCS. So it's not counted in the owner stats
detached_actor = A.options(lifetime="detached", name="A").remote()
ray.get(detached_actor.ready.remote())
for i in range(3):
assert get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1])
assert get_owner_info(node_ids) == ([(4, 1), (3, 0), (1, 0)], [2, 1, 1])
import time

time.sleep(1)
# gen_obj is owned by node_0
# the inner object is owned by A (node_2)
# the inner object is stored in object store of node_2
gen_obj = detached_actor.gen.remote() # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([6, 3, 2], [2, 1, 2]))
wait_for_condition(
lambda: get_owner_info(node_ids) == ([(5, 1), (3, 0), (2, 0)], [2, 1, 2])
)


def test_running_tasks(ray_start_cluster):
NUM_NODES = 3
cluster = ray_start_cluster
for i in range(NUM_NODES):
cluster.add_node(True, resources={f"node_{i}": 1})
if i == 0:
ray.init(address=cluster.address)
node_ids = []

for i in range(NUM_NODES):

@ray.remote(resources={f"node_{i}": 1})
def get_node_id():
return ray.get_runtime_context().get_node_id()

node_ids.append(ray.get(get_node_id.remote()))

@ray.remote
def f(t):
import time

time.sleep(t)

tasks = [
f.options(resources={"node_0": 1}).remote(0),
f.options(resources={"node_1": 1}).remote(100000),
f.options(resources={"node_2": 1}).remote(100000),
]

ready, pending = ray.wait(tasks)
assert len(ready) == 1
assert len(pending) == 2

node_addrs = {
n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"])
for n in ray.nodes()
}

def check():
for i in range(NUM_NODES):
node_stats = ray._private.internal_api.node_stats(
node_addrs[node_ids[i]][0], node_addrs[node_ids[i]][1], False
)

if i == 0:
assert (
sum(
[
stats.num_running_tasks
for stats in node_stats.core_workers_stats
]
)
== 0
)
else:
assert (
sum(
[
stats.num_running_tasks
for stats in node_stats.core_workers_stats
]
)
== 1
)
return True

wait_for_condition(check)


def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
5 changes: 3 additions & 2 deletions src/ray/core_worker/core_worker.cc
@@ -1942,7 +1942,6 @@ Status CoreWorker::CreateActor(const RayFunction &function,
const JobID job_id = worker_context_.GetCurrentJobID();
// Propagate existing environment variable overrides, but override them with any new
// ones
std::vector<ObjectID> return_ids;
TaskSpecBuilder builder;
auto new_placement_resources =
AddPlacementGroupConstraint(actor_creation_options.placement_resources,
@@ -3600,7 +3599,8 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
stats->set_task_queue_length(task_queue_length_);
stats->set_num_executed_tasks(num_executed_tasks_);
stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
stats->set_num_owned_objects(reference_counter_->NumObjectOwnedByUs());
stats->set_num_owned_objects(reference_counter_->NumObjectsOwnedByUs());
stats->set_num_owned_actors(reference_counter_->NumActorsOwnedByUs());
stats->set_ip_address(rpc_address_.ip_address());
stats->set_port(rpc_address_.port());
stats->set_pid(getpid());
@@ -3609,6 +3609,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
stats->set_worker_id(worker_context_.GetWorkerID().Binary());
stats->set_actor_id(actor_id_.Binary());
stats->set_worker_type(worker_context_.GetWorkerType());
stats->set_num_running_tasks(current_tasks_.size());
auto used_resources_map = stats->mutable_used_resources();
for (auto const &it : *resource_ids_) {
rpc::ResourceAllocations allocations;
19 changes: 16 additions & 3 deletions src/ray/core_worker/reference_count.cc
@@ -285,7 +285,11 @@ bool ReferenceCounter::AddOwnedObjectInternal(
if (object_id_refs_.count(object_id) != 0) {
return false;
}
num_objects_owned_by_us_++;
if (ObjectID::IsActorID(object_id)) {
num_actors_owned_by_us_++;
} else {
num_objects_owned_by_us_++;
}
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
// If the entry doesn't exist, we initialize the direct reference count to zero
// because this corresponds to a submitted task whose return ObjectID will be created
@@ -702,7 +706,11 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
}
freed_objects_.erase(it->first);
if (it->second.owned_by_us) {
num_objects_owned_by_us_--;
if (ObjectID::IsActorID(it->first)) {
num_actors_owned_by_us_--;
} else {
num_objects_owned_by_us_--;
}
}
object_id_refs_.erase(it);
ShutdownIfNeeded();
@@ -849,11 +857,16 @@ size_t ReferenceCounter::NumObjectIDsInScope() const {
return object_id_refs_.size();
}

size_t ReferenceCounter::NumObjectOwnedByUs() const {
size_t ReferenceCounter::NumObjectsOwnedByUs() const {
absl::MutexLock lock(&mutex_);
return num_objects_owned_by_us_;
}

size_t ReferenceCounter::NumActorsOwnedByUs() const {
absl::MutexLock lock(&mutex_);
return num_actors_owned_by_us_;
}

std::unordered_set<ObjectID> ReferenceCounter::GetAllInScopeObjectIDs() const {
absl::MutexLock lock(&mutex_);
std::unordered_set<ObjectID> in_scope_object_ids;
9 changes: 8 additions & 1 deletion src/ray/core_worker/reference_count.h
@@ -337,7 +337,11 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Returns the total number of ObjectIDs currently in scope.
size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);

size_t NumObjectOwnedByUs() const LOCKS_EXCLUDED(mutex_);
/// Returns the total number of objects owned by this worker.
size_t NumObjectsOwnedByUs() const LOCKS_EXCLUDED(mutex_);

/// Returns the total number of actors owned by this worker.
size_t NumActorsOwnedByUs() const LOCKS_EXCLUDED(mutex_);

/// Returns a set of all ObjectIDs currently in scope (i.e., nonzero reference count).
std::unordered_set<ObjectID> GetAllInScopeObjectIDs() const LOCKS_EXCLUDED(mutex_);
@@ -1041,6 +1045,9 @@ class ReferenceCounter : public ReferenceCounterInterface,

/// Keep track of objects owned by this worker.
size_t num_objects_owned_by_us_ GUARDED_BY(mutex_) = 0;

/// Keep track of actors owned by this worker.
size_t num_actors_owned_by_us_ GUARDED_BY(mutex_) = 0;
};

} // namespace core
5 changes: 2 additions & 3 deletions src/ray/object_manager/plasma/store.h
@@ -86,8 +86,7 @@ class PlasmaStore {

/// Get the available memory for new objects to be created. This includes
/// memory that is currently being used for created but unsealed objects.
void GetAvailableMemory(std::function<void(size_t)> callback) const
LOCKS_EXCLUDED(mutex_) {
size_t GetAvailableMemory() const LOCKS_EXCLUDED(mutex_) {
absl::MutexLock lock(&mutex_);
RAY_CHECK((object_lifecycle_mgr_.GetNumBytesUnsealed() > 0 &&
object_lifecycle_mgr_.GetNumObjectsUnsealed() > 0) ||
@@ -105,7 +104,7 @@ class PlasmaStore {
if (num_bytes_in_use < allocator_.GetFootprintLimit()) {
available = allocator_.GetFootprintLimit() - num_bytes_in_use;
}
callback(available);
return available;
}

private:
2 changes: 1 addition & 1 deletion src/ray/object_manager/plasma/store_runner.h
@@ -30,7 +30,7 @@ class PlasmaStoreRunner {
int64_t GetFallbackAllocated() const;

void GetAvailableMemoryAsync(std::function<void(size_t)> callback) const {
main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); },
main_service_.post([this, callback]() { callback(store_->GetAvailableMemory()); },
"PlasmaStoreRunner.GetAvailableMemory");
}

7 changes: 5 additions & 2 deletions src/ray/protobuf/common.proto
@@ -764,8 +764,7 @@ message ResourceAllocations {

// Debug info returned from the core worker.
message CoreWorkerStats {
// Debug string of the currently executing task.
string current_task_desc = 1;
reserved 1;
// Number of pending normal and actor tasks.
int32 num_pending_tasks = 2;
// Number of object refs in local scope.
@@ -808,6 +807,10 @@ message CoreWorkerStats {
int64 objects_total = 24;
// Number of objects owned by the worker.
int64 num_owned_objects = 25;
// Number of actors owned by the worker.
int64 num_owned_actors = 26;
// Number of running tasks on the worker.
int64 num_running_tasks = 27;
}

// Resource usage reported by the node reporter.
