Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] ray list tasks filter state and name on gcs side #46270

Merged
merged 8 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/ray/util/state/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ async def get_all_task_info(
req_filters.job_id = JobID(hex_to_binary(value)).binary()
elif key == "task_id":
req_filters.task_ids.append(TaskID(hex_to_binary(value)).binary())
elif key == "name":
req_filters.name = value
elif key == "state":
req_filters.state = value
Comment on lines +310 to +313
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main change 1

else:
continue

Expand Down
24 changes: 23 additions & 1 deletion src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void GcsTaskManager::GcsTaskManagerStorage::MarkTaskAttemptFailedIfNeeded(
// We could mark the task as failed even if it might not have state updates yet (i.e. only
// profiling events are reported).
auto state_updates = task_events.mutable_state_updates();
state_updates->set_failed_ts(failed_ts);
(*state_updates->mutable_state_ts())[ray::rpc::TaskStatus::FAILED] = failed_ts;
state_updates->mutable_error_info()->CopyFrom(error_info);
}

Expand Down Expand Up @@ -423,6 +423,28 @@ void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request,
return false;
}

if (filters.has_state()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main change 2

const google::protobuf::EnumDescriptor *task_status_descriptor =
ray::rpc::TaskStatus_descriptor();

// Figure out the latest state of a task.
ray::rpc::TaskStatus state = ray::rpc::TaskStatus::NIL;
if (task_event.has_state_updates()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are also doing something similar at the client when aggregating later, i wonder if this would go out of sync with that routine.

I wonder if it makes sense to add a current_state to the task event when returned (even when there's no state filter), so we don't need to do that at the client.

This probably could go in another PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we could do that. But based on the current logic, it shouldn't go out of sync, since we just iterate through TaskStatus and find the last state that has a timestamp.

Having a current_state or define the common function in cython is a good follow-up to do.

for (int i = task_status_descriptor->value_count() - 1; i >= 0; --i) {
if (task_event.state_updates().state_ts().contains(
task_status_descriptor->value(i)->number())) {
state = static_cast<ray::rpc::TaskStatus>(
task_status_descriptor->value(i)->number());
break;
}
}
}

if (filters.state() != task_status_descriptor->FindValueByNumber(state)->name()) {
return false;
}
}

return true;
};

Expand Down
39 changes: 6 additions & 33 deletions src/ray/gcs/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ inline bool IsTaskTerminated(const rpc::TaskEvents &task_event) {
}

const auto &state_updates = task_event.state_updates();
return state_updates.has_finished_ts() || state_updates.has_failed_ts();
return state_updates.state_ts().contains(rpc::TaskStatus::FINISHED) ||
state_updates.state_ts().contains(rpc::TaskStatus::FAILED);
}

inline size_t NumProfileEvents(const rpc::TaskEvents &task_event) {
Expand Down Expand Up @@ -335,7 +336,7 @@ inline bool IsTaskFinished(const rpc::TaskEvents &task_event) {
}

const auto &state_updates = task_event.state_updates();
return state_updates.has_finished_ts();
return state_updates.state_ts().contains(rpc::TaskStatus::FINISHED);
}

/// Fill the rpc::TaskStateUpdate with the timestamps according to the status change.
Expand All @@ -346,39 +347,11 @@ inline bool IsTaskFinished(const rpc::TaskEvents &task_event) {
inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status,
                                     int64_t timestamp,
                                     ray::rpc::TaskStateUpdate *state_updates) {
  // \param task_status The new status the task transitioned to.
  // \param timestamp   Time (in the caller's clock/units) of the transition.
  // \param state_updates[out] Proto to record the transition into.
  if (task_status == rpc::TaskStatus::NIL) {
    // NIL means no status change; there is nothing to record.
    return;
  }
  // state_ts is keyed by the integer value of the TaskStatus enum
  // (protobuf map keys cannot be enums); overwrites any prior timestamp
  // recorded for the same status.
  (*state_updates->mutable_state_ts())[task_status] = timestamp;
}

inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) {
Expand Down
15 changes: 3 additions & 12 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,6 @@ message TaskLogInfo {
message TaskStateUpdate {
// Node that runs the task.
optional bytes node_id = 1;
// Timestamp when status changes to PENDING_ARGS_AVAIL.
optional int64 pending_args_avail_ts = 2;
// Timestamp when status changes to PENDING_NODE_ASSIGNMENT.
optional int64 pending_node_assignment_ts = 3;
// Timestamp when status changes to SUBMITTED_TO_WORKER.
optional int64 submitted_to_worker_ts = 4;
// Timestamp when status changes to RUNNING.
optional int64 running_ts = 5;
// Timestamp when status changes to FINISHED.
optional int64 finished_ts = 6;
// Timestamp when status changes to FAILED.
optional int64 failed_ts = 7;
// Worker that runs the task.
optional bytes worker_id = 8;
// Task failure info.
Expand All @@ -244,6 +232,9 @@ message TaskStateUpdate {
optional int32 worker_pid = 12;
// Is task paused by debugger.
optional bool is_debugger_paused = 13;
// Key is the integer value of TaskStatus enum (protobuf doesn't support Enum as key).
// Value is the timestamp when status changes to the target status indicated by the key.
map<int32, int64> state_ts = 14;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main change 3

}

// Represents events and state changes from a single task run.
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,8 @@ message GetTaskEventsRequest {
optional string name = 4;
// True if task events from driver (only profiling events) should be excluded.
optional bool exclude_driver = 5;
// Latest state of the task.
optional string state = 6;
jjyao marked this conversation as resolved.
Show resolved Hide resolved
}

// Maximum number of TaskEvents to return.
Expand Down
Loading