Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Fix the ray_tasks{State="PENDING_ARGS_FETCH"} metric counting #47770

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ def add_node(self, wait: bool = True, **node_args):
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
"min_worker_port": 0,
"max_worker_port": 0,
"dashboard_port": None,
}
ray_params = ray._private.parameter.RayParams(**node_args)
ray_params.update_if_absent(**default_kwargs)
Expand Down Expand Up @@ -257,6 +256,10 @@ def add_node(self, wait: bool = True, **node_args):
ray_params.update_if_absent(include_log_monitor=False)
# Let grpc pick a port.
ray_params.update_if_absent(node_manager_port=0)
if "dashboard_agent_listen_port" not in node_args:
# Pick a random one to not conflict
# with the head node dashboard agent
ray_params.dashboard_agent_listen_port = None

node = ray._private.node.Node(
ray_params,
Expand Down
48 changes: 48 additions & 0 deletions python/ray/tests/test_task_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import os
import copy
import multiprocessing

import pytest

Expand Down Expand Up @@ -202,6 +203,53 @@ def f():
proc.kill()


def driver_for_test_task_fetch_args(head_info):
ray.init("auto")

@ray.remote(resources={"worker": 1})
def task1():
return [1] * 1024 * 1024

@ray.remote(resources={"head": 1})
def task2(obj):
pass

o1 = task1.remote()
o2 = task2.remote(o1)

wait_for_condition(
lambda: tasks_by_state(head_info).get("PENDING_ARGS_FETCH", 0.0) == 1.0
)

ray.cancel(o2)

wait_for_condition(
lambda: tasks_by_state(head_info).get("PENDING_ARGS_FETCH", 0.0) == 0.0
)


def test_task_fetch_args(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(
resources={"head": 1},
_system_config={
"metrics_report_interval_ms": 100,
"testing_asio_delay_us": "ObjectManagerService.grpc_server.Pull=5000000000:5000000000", # noqa: E501
},
)
head_info = ray.init(address=cluster.address)
cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()

multiprocessing.set_start_method("spawn")
p = multiprocessing.Process(
target=driver_for_test_task_fetch_args, args=(head_info,)
)
p.start()
p.join()
assert p.exitcode == 0


def test_task_wait_on_deps(shutdown_only):
info = ray.init(num_cpus=2, **METRIC_CONFIG)

Expand Down
1 change: 0 additions & 1 deletion release/benchmarks/object_store/test_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,4 @@ def data_len(self, arr):
"perf_metric_type": "LATENCY",
}
]
print(f"jjyao {results} {out_file}")
json.dump(results, out_file)
6 changes: 6 additions & 0 deletions src/ray/raylet/dependency_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ class DependencyManager : public TaskDependencyManagerInterface {
waiting_task_counter_map.Decrement(task_key);
}
}

~TaskDependencies() {
if (num_missing_dependencies > 0) {
waiting_task_counter_map.Decrement(task_key);
}
}
};

/// Stop tracking this object, if it is no longer needed by any worker or
Expand Down
23 changes: 23 additions & 0 deletions src/ray/raylet/dependency_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,29 @@ TEST_F(DependencyManagerTest, TestDuplicateTaskArgs) {
AssertNoLeaks();
}

/// Test that RemoveTaskDependencies is called before objects
/// becoming local (e.g. the task is cancelled).
TEST_F(DependencyManagerTest, TestRemoveTaskDependenciesBeforeLocal) {
int num_arguments = 3;
std::vector<ObjectID> arguments;
for (int i = 0; i < num_arguments; i++) {
arguments.push_back(ObjectID::FromRandom());
}
TaskID task_id = RandomTaskId();
bool ready = dependency_manager_.RequestTaskDependencies(
task_id, ObjectIdsToRefs(arguments), {"foo", false});
ASSERT_FALSE(ready);
ASSERT_EQ(NumWaiting("bar"), 0);
ASSERT_EQ(NumWaiting("foo"), 1);
ASSERT_EQ(NumWaitingTotal(), 1);

// The task is cancelled
dependency_manager_.RemoveTaskDependencies(task_id);
ASSERT_EQ(NumWaiting("foo"), 0);
ASSERT_EQ(NumWaitingTotal(), 0);
AssertNoLeaks();
}

} // namespace raylet

} // namespace ray
Expand Down