Skip to content

Commit

Permalink
[Core] Fix the ray_tasks{State="PENDING_ARGS_FETCH"} metric counting (r…
Browse files Browse the repository at this point in the history
…ay-project#47770)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
  • Loading branch information
jjyao authored and ujjawal-khare committed Oct 15, 2024
1 parent 56cfa5c commit bfeaec3
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
5 changes: 4 additions & 1 deletion python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ def add_node(self, wait: bool = True, **node_args):
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
"min_worker_port": 0,
"max_worker_port": 0,
"dashboard_port": None,
}
ray_params = ray._private.parameter.RayParams(**node_args)
ray_params.update_if_absent(**default_kwargs)
Expand Down Expand Up @@ -257,6 +256,10 @@ def add_node(self, wait: bool = True, **node_args):
ray_params.update_if_absent(include_log_monitor=False)
# Let grpc pick a port.
ray_params.update_if_absent(node_manager_port=0)
if "dashboard_agent_listen_port" not in node_args:
# Pick a random one to not conflict
# with the head node dashboard agent
ray_params.dashboard_agent_listen_port = None

node = ray._private.node.Node(
ray_params,
Expand Down
48 changes: 48 additions & 0 deletions python/ray/tests/test_task_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import os
import copy
import multiprocessing

import pytest

Expand Down Expand Up @@ -202,6 +203,53 @@ def f():
proc.kill()


def driver_for_test_task_fetch_args(head_info):
ray.init("auto")

@ray.remote(resources={"worker": 1})
def task1():
return [1] * 1024 * 1024

@ray.remote(resources={"head": 1})
def task2(obj):
pass

o1 = task1.remote()
o2 = task2.remote(o1)

wait_for_condition(
lambda: tasks_by_state(head_info).get("PENDING_ARGS_FETCH", 0.0) == 1.0
)

ray.cancel(o2)

wait_for_condition(
lambda: tasks_by_state(head_info).get("PENDING_ARGS_FETCH", 0.0) == 0.0
)


def test_task_fetch_args(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(
resources={"head": 1},
_system_config={
"metrics_report_interval_ms": 100,
"testing_asio_delay_us": "ObjectManagerService.grpc_server.Pull=5000000000:5000000000", # noqa: E501
},
)
head_info = ray.init(address=cluster.address)
cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()

multiprocessing.set_start_method("spawn")
p = multiprocessing.Process(
target=driver_for_test_task_fetch_args, args=(head_info,)
)
p.start()
p.join()
assert p.exitcode == 0


def test_task_wait_on_deps(shutdown_only):
info = ray.init(num_cpus=2, **METRIC_CONFIG)

Expand Down
1 change: 0 additions & 1 deletion release/benchmarks/object_store/test_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,4 @@ def data_len(self, arr):
"perf_metric_type": "LATENCY",
}
]
print(f"jjyao {results} {out_file}")
json.dump(results, out_file)
6 changes: 6 additions & 0 deletions src/ray/raylet/dependency_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ class DependencyManager : public TaskDependencyManagerInterface {
waiting_task_counter_map.Decrement(task_key);
}
}

~TaskDependencies() {
if (num_missing_dependencies > 0) {
waiting_task_counter_map.Decrement(task_key);
}
}
};

/// Stop tracking this object, if it is no longer needed by any worker or
Expand Down
23 changes: 23 additions & 0 deletions src/ray/raylet/dependency_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,29 @@ TEST_F(DependencyManagerTest, TestDuplicateTaskArgs) {
AssertNoLeaks();
}

/// Test that RemoveTaskDependencies is called before objects
/// becoming local (e.g. the task is cancelled).
TEST_F(DependencyManagerTest, TestRemoveTaskDependenciesBeforeLocal) {
int num_arguments = 3;
std::vector<ObjectID> arguments;
for (int i = 0; i < num_arguments; i++) {
arguments.push_back(ObjectID::FromRandom());
}
TaskID task_id = RandomTaskId();
bool ready = dependency_manager_.RequestTaskDependencies(
task_id, ObjectIdsToRefs(arguments), {"foo", false});
ASSERT_FALSE(ready);
ASSERT_EQ(NumWaiting("bar"), 0);
ASSERT_EQ(NumWaiting("foo"), 1);
ASSERT_EQ(NumWaitingTotal(), 1);

// The task is cancelled
dependency_manager_.RemoveTaskDependencies(task_id);
ASSERT_EQ(NumWaiting("foo"), 0);
ASSERT_EQ(NumWaitingTotal(), 0);
AssertNoLeaks();
}

} // namespace raylet

} // namespace ray
Expand Down

0 comments on commit bfeaec3

Please sign in to comment.