Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[State API] Add worker startup & initialization time to state API + use it for many_tasks #31916

Merged
merged 14 commits into from
Mar 24, 2023
Merged
2 changes: 2 additions & 0 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ async def list_workers(self, *, option: ListApiOptions) -> ListApiResponse:
data["ip"] = data["worker_address"]["ip_address"]
data["start_time_ms"] = int(data["start_time_ms"])
data["end_time_ms"] = int(data["end_time_ms"])
data["worker_launch_time_ms"] = int(data["worker_launch_time_ms"])
data["worker_launched_time_ms"] = int(data["worker_launched_time_ms"])
result.append(data)

num_after_truncation = len(result)
Expand Down
31 changes: 31 additions & 0 deletions python/ray/_private/state_api_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Callable, Dict, List, Optional
import ray
from ray.actor import ActorHandle
from ray.experimental.state.api import list_workers


@dataclass
Expand Down Expand Up @@ -306,3 +307,33 @@ def periodic_invoke_state_apis_with_actor(*args, **kwargs) -> ActorHandle:
print("State api actor is ready now.")
actor.start.remote()
return actor


def summarize_worker_startup_time():
workers = list_workers(detail=True, filters=[("worker_type", "=", "WORKER")])
time_to_launch = []
time_to_initialize = []
for worker in workers:
launch_time = worker.get("worker_launch_time_ms")
launched_time = worker.get("worker_launched_time_ms")
start_time = worker.get("start_time_ms")

if launched_time > 0:
time_to_launch.append(launched_time - launch_time)
if start_time:
time_to_initialize.append(start_time - launched_time)
time_to_launch.sort()
time_to_initialize.sort()

def print_latencies(latencies):
print(f"Avg: {round(sum(latencies) / len(latencies), 2)} ms")
print(f"P25: {round(latencies[int(len(latencies) * 0.25)], 2)} ms")
print(f"P50: {round(latencies[int(len(latencies) * 0.5)], 2)} ms")
print(f"P95: {round(latencies[int(len(latencies) * 0.95)], 2)} ms")
print(f"P99: {round(latencies[int(len(latencies) * 0.99)], 2)} ms")

print("Time to launch workers")
print_latencies(time_to_launch)
print("=======================")
print("Time to initialize workers")
print_latencies(time_to_initialize)
10 changes: 10 additions & 0 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,8 @@ def connect(
startup_token: int = 0,
ray_debugger_external: bool = False,
entrypoint: str = "",
worker_launch_time_ms: int = -1,
worker_launched_time_ms: int = -1,
):
"""Connect this worker to the raylet, to Plasma, and to GCS.

Expand All @@ -1916,6 +1918,12 @@ def connect(
node this worker is running on.
entrypoint: The name of the entrypoint script. Ignored unless the
mode != SCRIPT_MODE
worker_launch_time_ms: The time when the worker process for this worker
is launched. If the worker is not launched by raylet (e.g.,
driver), this must be -1 (default value).
worker_launched_time_ms: The time when the worker process for this worker
finshes launching. If the worker is not launched by raylet (e.g.,
driver), this must be -1 (default value).
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
Expand Down Expand Up @@ -2086,6 +2094,8 @@ def connect(
startup_token,
session_name,
"" if mode != SCRIPT_MODE else entrypoint,
worker_launch_time_ms,
worker_launched_time_ms,
)

# Notify raylet that the core worker is ready.
Expand Down
16 changes: 16 additions & 0 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64
import json
import time
import sys

import ray
import ray._private.node
Expand Down Expand Up @@ -144,6 +145,12 @@
required=False,
help="The address of web ui",
)
parser.add_argument(
"--worker-launch-time-ms",
required=True,
type=int,
help="The time worker process is launched from raylet",
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
)


if __name__ == "__main__":
Expand All @@ -154,6 +161,13 @@
args = parser.parse_args()
ray._private.ray_logging.setup_logger(args.logging_level, args.logging_format)

if sys.version_info >= (3, 7):
worker_launched_time_ms = time.time_ns() // 1e6
else:
# This value might be inaccurate in Python 3.6.
# We will anyway deprecate Python 3.6.
worker_launched_time = time.time() * 1000

if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "SPILL_WORKER":
Expand Down Expand Up @@ -214,6 +228,8 @@
runtime_env_hash=args.runtime_env_hash,
startup_token=args.startup_token,
ray_debugger_external=args.ray_debugger_external,
worker_launch_time_ms=args.worker_launch_time_ms,
worker_launched_time_ms=worker_launched_time_ms,
)

# Setup log file.
Expand Down
5 changes: 4 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,8 @@ cdef class CoreWorker:
node_ip_address, node_manager_port, raylet_ip_address,
local_mode, driver_name, stdout_file, stderr_file,
serialized_job_config, metrics_agent_port, runtime_env_hash,
startup_token, session_name, entrypoint):
startup_token, session_name, entrypoint,
worker_launch_time_ms, worker_launched_time_ms):
self.is_local_mode = local_mode

cdef CCoreWorkerOptions options = CCoreWorkerOptions()
Expand Down Expand Up @@ -1551,6 +1552,8 @@ cdef class CoreWorker:
options.startup_token = startup_token
options.session_name = session_name
options.entrypoint = entrypoint
options.worker_launch_time_ms = worker_launch_time_ms
options.worker_launched_time_ms = worker_launched_time_ms
CCoreWorkerProcess.Initialize(options)

self.cgname_to_eventloop_dict = None
Expand Down
13 changes: 13 additions & 0 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,23 @@ class WorkerState(StateSchema):
pid: int = state_column(filterable=True)
#: The exit detail of the worker if the worker is dead.
exit_detail: Optional[str] = state_column(detail=True, filterable=False)
#: The time worker is first launched.
#: 0 if the value doesn't exist.
#: The lifecycle of worker is as follow.
#: worker_launch_time_ms (process startup requested).
#: -> worker_launched_time_ms (process started).
#: -> start_time_ms (worker is ready to be used).
#: -> end_time_ms (worker is destroyed).
worker_launch_time_ms: int = state_column(filterable=False, detail=True)
#: The time worker is succesfully launched
#: 0 if the value doesn't exist.
worker_launched_time_ms: int = state_column(filterable=False, detail=True)
#: The time when the worker is started and initialized.
#: 0 if the value doesn't exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also -1 default it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me try

start_time_ms: int = state_column(filterable=False, detail=True)
#: The time when the worker exits. The timestamp could be delayed
#: if the worker is dead unexpectedly.
#: 0 if the value doesn't exist.
end_time_ms: int = state_column(filterable=False, detail=True)


Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int startup_token
c_string session_name
c_string entrypoint
int64_t worker_launch_time_ms
int64_t worker_launched_time_ms

cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
@staticmethod
Expand Down
1 change: 0 additions & 1 deletion python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
"ray_object_directory_lookups",
"ray_object_directory_added_locations",
"ray_object_directory_removed_locations",
"ray_process_startup_time_ms_sum",
"ray_internal_num_processes_started",
"ray_internal_num_spilled_tasks",
# "ray_unintentional_worker_failures_total",
Expand Down
32 changes: 29 additions & 3 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import json
import sys
import signal
from collections import Counter
from dataclasses import dataclass
from typing import List, Tuple
Expand Down Expand Up @@ -177,7 +178,14 @@ def generate_node_data(id):
)


def generate_worker_data(id, pid=1234):
def generate_worker_data(
id,
pid=1234,
worker_launch_time_ms=1,
worker_launched_time_ms=2,
start_time_ms=3,
end_time_ms=4,
):
return WorkerTableData(
worker_address=Address(
raylet_id=id, ip_address="127.0.0.1", port=124, worker_id=id
Expand All @@ -187,6 +195,10 @@ def generate_worker_data(id, pid=1234):
worker_type=WorkerType.WORKER,
pid=pid,
exit_type=None,
worker_launch_time_ms=worker_launch_time_ms,
worker_launched_time_ms=worker_launched_time_ms,
start_time_ms=start_time_ms,
end_time_ms=end_time_ms,
)


Expand Down Expand Up @@ -2073,10 +2085,12 @@ def test_list_get_workers(shutdown_only):
ray.init()

def verify():
workers = list_workers()
workers = list_workers(detail=True)
assert is_hex(workers[0]["worker_id"])
# +1 to take into account of drivers.
assert len(workers) == ray.cluster_resources()["CPU"] + 1
# End time should be 0 as it is not configured yet.
assert workers[0]["end_time_ms"] == 0

# Test get worker returns the same result
workers = list_workers(detail=True)
Expand All @@ -2087,7 +2101,19 @@ def verify():
return True

wait_for_condition(verify)
print(list_workers())

# Kill the worker
workers = list_workers()
os.kill(workers[-1]["pid"], signal.SIGKILL)

def verify():
workers = list_workers(detail=True, filters=[("is_alive", "=", "False")])
assert len(workers) == 1
assert workers[0]["end_time_ms"] != 0
return True

wait_for_condition(verify)
print(list_workers(detail=True))


@pytest.mark.skipif(
Expand Down
7 changes: 7 additions & 0 deletions release/benchmarks/distributed/test_many_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ray._private.state_api_test_utils import (
StateAPICallSpec,
periodic_invoke_state_apis_with_actor,
summarize_worker_startup_time,
)

sleep_time = 300
Expand Down Expand Up @@ -103,6 +104,12 @@ def not_none(res):
del monitor_actor
test_utils.wait_for_condition(no_resource_leaks)

try:
summarize_worker_startup_time()
except Exception as e:
print("Failed to summarize worker startup time.")
print(e)

rate = num_tasks / (end_time - start_time - sleep_time)
print(
f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
Expand Down
7 changes: 5 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options);

RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();
RegisterToGcs(options_.worker_launch_time_ms, options_.worker_launched_time_ms);

// Initialize the task state event buffer.
auto task_event_gcs_client = std::make_unique<gcs::GcsClient>(options_.gcs_options);
Expand Down Expand Up @@ -764,7 +764,8 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id,
}
}

void CoreWorker::RegisterToGcs() {
void CoreWorker::RegisterToGcs(int64_t worker_launch_time_ms,
int64_t worker_launched_time_ms) {
absl::flat_hash_map<std::string, std::string> worker_info;
const auto &worker_id = GetWorkerID();
worker_info.emplace("node_ip_address", options_.node_ip_address);
Expand Down Expand Up @@ -800,6 +801,8 @@ void CoreWorker::RegisterToGcs() {
worker_data->set_is_alive(true);
worker_data->set_pid(getpid());
worker_data->set_start_time_ms(current_sys_time_ms());
worker_data->set_worker_launch_time_ms(worker_launch_time_ms);
worker_data->set_worker_launched_time_ms(worker_launched_time_ms);

RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail);

/// Register this worker or driver to GCS.
void RegisterToGcs();
void RegisterToGcs(int64_t worker_launch_time_ms, int64_t worker_launched_time_ms);

/// (WORKER mode only) Check if the raylet has failed. If so, shutdown.
void ExitIfParentRayletDies();
Expand Down
6 changes: 5 additions & 1 deletion src/ray/core_worker/core_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ struct CoreWorkerOptions {
connect_on_start(true),
runtime_env_hash(0),
session_name(""),
entrypoint("") {}
entrypoint(""),
worker_launch_time_ms(-1),
worker_launched_time_ms(-1) {}

/// Type of this worker (i.e., DRIVER or WORKER).
WorkerType worker_type;
Expand Down Expand Up @@ -183,6 +185,8 @@ struct CoreWorkerOptions {
/// Session name (Cluster ID) of the cluster.
std::string session_name;
std::string entrypoint;
int64_t worker_launch_time_ms;
int64_t worker_launched_time_ms;
};
} // namespace core
} // namespace ray
18 changes: 17 additions & 1 deletion src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,26 @@ message WorkerTableData {
optional string exit_detail = 20;
// pid of the worker process.
uint32 pid = 21;
// The unix ms timestamp the worker was started at.

/* The below fields are worker lifecycle events
worker_launch_time_ms (process startup requested).
-> worker_launched_time_ms (process started).
-> start_time_ms (worker is ready to be used).
-> end_time_ms (worker is destroyed).
*/

// The unix ms timestamp the worker was started and finished initialization.
uint64 start_time_ms = 23;
// The unix ms timestamp the worker was ended at.
uint64 end_time_ms = 24;
// The time when this worker process is requested from raylet.
// The field exists only when the worker is launched
// by a raylet. (I.e., driver worker won't have this value).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we un-optional the fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added // If the value doesn't present, it is -1.!

uint64 worker_launch_time_ms = 25;
// The time when this worker process is successfully started.
// The field exists only when the worker is launched
// by a raylet. (I.e., driver worker won't have this value).
uint64 worker_launched_time_ms = 26;
}

// Fields to publish when worker fails.
Expand Down
7 changes: 3 additions & 4 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ WorkerPool::BuildProcessCommandArgs(const Language &language,
if (language == Language::PYTHON) {
worker_command_args.push_back("--startup-token=" +
std::to_string(worker_startup_token_counter_));
worker_command_args.push_back("--worker-launch-time-ms=" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm can we log into C++ metrics directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm we anyway need plumbing to calculate the initialization time although we use metrics
also metrics has cardinatliy issue + not practical at this point (until we have a default core dashboard, which we don't plan to have it for a while).

Having these values to worker state also makes sense given that's the direction we are going (add event information to state API)

std::to_string(current_sys_time_ms()));
} else if (language == Language::CPP) {
worker_command_args.push_back("--startup_token=" +
std::to_string(worker_startup_token_counter_));
Expand Down Expand Up @@ -460,12 +462,9 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
serialized_runtime_env_context,
state);

// Start a process and measure the startup time.
auto start = std::chrono::high_resolution_clock::now();
// Start a process and measure the startup time.
Process proc = StartProcess(worker_command_args, env);
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
stats::ProcessStartupTimeMs.Record(duration.count());
stats::NumWorkersStarted.Record(1);
RAY_LOG(INFO) << "Started worker process with pid " << proc.GetId() << ", the token is "
<< worker_startup_token_counter_;
Expand Down
6 changes: 0 additions & 6 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,6 @@ static Gauge ObjectDirectoryRemovedLocations(
"have been removed from this node.",
"removals");

/// Worker Pool
static Histogram ProcessStartupTimeMs("process_startup_time_ms",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it cuz I thought we don't really use it anyway. I am open to just keep it as well

"Time to start up a worker process.",
"ms",
{1, 10, 100, 1000, 10000});

static Sum NumWorkersStarted(
"internal_num_processes_started",
"The total number of worker processes the worker pool has created.",
Expand Down