diff --git a/dashboard/state_aggregator.py b/dashboard/state_aggregator.py
index 96e0593a74995..e3aa6f45ba4f3 100644
--- a/dashboard/state_aggregator.py
+++ b/dashboard/state_aggregator.py
@@ -332,6 +332,8 @@ async def list_workers(self, *, option: ListApiOptions) -> ListApiResponse:
             data["ip"] = data["worker_address"]["ip_address"]
             data["start_time_ms"] = int(data["start_time_ms"])
             data["end_time_ms"] = int(data["end_time_ms"])
+            data["worker_launch_time_ms"] = int(data["worker_launch_time_ms"])
+            data["worker_launched_time_ms"] = int(data["worker_launched_time_ms"])
             result.append(data)
 
         num_after_truncation = len(result)
diff --git a/python/ray/_private/state_api_test_utils.py b/python/ray/_private/state_api_test_utils.py
index 8ac0f91007360..d142816ccb650 100644
--- a/python/ray/_private/state_api_test_utils.py
+++ b/python/ray/_private/state_api_test_utils.py
@@ -12,6 +12,7 @@
 from ray.experimental.state.api import list_tasks
 import ray
 from ray.actor import ActorHandle
+from ray.experimental.state.api import list_workers
 
 
 @dataclass
@@ -309,6 +310,41 @@ def periodic_invoke_state_apis_with_actor(*args, **kwargs) -> ActorHandle:
     return actor
 
 
+def summarize_worker_startup_time():
+    workers = list_workers(
+        detail=True,
+        filters=[("worker_type", "=", "WORKER")],
+        limit=10000,
+        raise_on_missing_output=False,
+    )
+    time_to_launch = []
+    time_to_initialize = []
+    for worker in workers:
+        launch_time = worker.get("worker_launch_time_ms")
+        launched_time = worker.get("worker_launched_time_ms")
+        start_time = worker.get("start_time_ms")
+
+        if launched_time > 0:
+            time_to_launch.append(launched_time - launch_time)
+        if start_time:
+            time_to_initialize.append(start_time - launched_time)
+    time_to_launch.sort()
+    time_to_initialize.sort()
+
+    def print_latencies(latencies):
+        print(f"Avg: {round(sum(latencies) / len(latencies), 2)} ms")
+        print(f"P25: {round(latencies[int(len(latencies) * 0.25)], 2)} ms")
+        print(f"P50: {round(latencies[int(len(latencies) * 0.5)], 2)} ms")
+        print(f"P95: {round(latencies[int(len(latencies) * 0.95)], 2)} ms")
+        print(f"P99: {round(latencies[int(len(latencies) * 0.99)], 2)} ms")
+
+    print("Time to launch workers")
+    print_latencies(time_to_launch)
+    print("=======================")
+    print("Time to initialize workers")
+    print_latencies(time_to_initialize)
+
+
 def verify_failed_task(name: str, error_type: str) -> bool:
     """
     Check if a task with 'name' has failed with the exact error type 'error_type'
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index e3eb939e3490c..fc7af4fea2021 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -1983,6 +1983,8 @@ def connect(
     startup_token: int = 0,
     ray_debugger_external: bool = False,
     entrypoint: str = "",
+    worker_launch_time_ms: int = -1,
+    worker_launched_time_ms: int = -1,
 ):
     """Connect this worker to the raylet, to Plasma, and to GCS.
 
@@ -2004,6 +2006,12 @@ def connect(
             node this worker is running on.
         entrypoint: The name of the entrypoint script. Ignored if the
             mode != SCRIPT_MODE
+        worker_launch_time_ms: The time when the worker process for this worker
+            is launched. If the worker is not launched by raylet (e.g.,
+            driver), this must be -1 (default value).
+        worker_launched_time_ms: The time when the worker process for this worker
+            finishes launching. If the worker is not launched by raylet (e.g.,
+            driver), this must be -1 (default value).
     """
     # Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?" @@ -2168,6 +2176,8 @@ def connect( startup_token, session_name, "" if mode != SCRIPT_MODE else entrypoint, + worker_launch_time_ms, + worker_launched_time_ms, ) # Notify raylet that the core worker is ready. diff --git a/python/ray/_private/workers/default_worker.py b/python/ray/_private/workers/default_worker.py index 55ccd60b2201c..c57ea8e42d5ab 100644 --- a/python/ray/_private/workers/default_worker.py +++ b/python/ray/_private/workers/default_worker.py @@ -2,6 +2,7 @@ import base64 import json import time +import sys import ray import ray._private.node @@ -144,6 +145,12 @@ required=False, help="The address of web ui", ) +parser.add_argument( + "--worker-launch-time-ms", + required=True, + type=int, + help="The time when raylet starts to launch the worker process.", +) if __name__ == "__main__": @@ -154,6 +161,13 @@ args = parser.parse_args() ray._private.ray_logging.setup_logger(args.logging_level, args.logging_format) + if sys.version_info >= (3, 7): + worker_launched_time_ms = time.time_ns() // 1e6 + else: + # This value might be inaccurate in Python 3.6. + # We will anyway deprecate Python 3.6. + worker_launched_time_ms = time.time() * 1000 + if args.worker_type == "WORKER": mode = ray.WORKER_MODE elif args.worker_type == "SPILL_WORKER": @@ -214,6 +228,8 @@ runtime_env_hash=args.runtime_env_hash, startup_token=args.startup_token, ray_debugger_external=args.ray_debugger_external, + worker_launch_time_ms=args.worker_launch_time_ms, + worker_launched_time_ms=worker_launched_time_ms, ) # Setup log file. diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 01b535b0f45d6..c8265f46b83de 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1514,7 +1514,8 @@ cdef class CoreWorker: node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, serialized_job_config, metrics_agent_port, runtime_env_hash, - startup_token, session_name, entrypoint): + startup_token, session_name, entrypoint, + worker_launch_time_ms, worker_launched_time_ms): self.is_local_mode = local_mode cdef CCoreWorkerOptions options = CCoreWorkerOptions() @@ -1566,6 +1567,8 @@ cdef class CoreWorker: options.startup_token = startup_token options.session_name = session_name options.entrypoint = entrypoint + options.worker_launch_time_ms = worker_launch_time_ms + options.worker_launched_time_ms = worker_launched_time_ms CCoreWorkerProcess.Initialize(options) self.cgname_to_eventloop_dict = None diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index ce7b7ddcd2eb8..efb6eeb23b7cd 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -490,10 +490,23 @@ class WorkerState(StateSchema): pid: int = state_column(filterable=True) #: The exit detail of the worker if the worker is dead. exit_detail: Optional[str] = state_column(detail=True, filterable=False) + #: The time worker is first launched. + #: -1 if the value doesn't exist. + #: The lifecycle of worker is as follow. + #: worker_launch_time_ms (process startup requested). + #: -> worker_launched_time_ms (process started). + #: -> start_time_ms (worker is ready to be used). + #: -> end_time_ms (worker is destroyed). + worker_launch_time_ms: int = state_column(filterable=False, detail=True) + #: The time worker is succesfully launched + #: -1 if the value doesn't exist. 
+    worker_launched_time_ms: int = state_column(filterable=False, detail=True)
     #: The time when the worker is started and initialized.
+    #: 0 if the value doesn't exist.
     start_time_ms: int = state_column(filterable=False, detail=True)
     #: The time when the worker exits. The timestamp could be delayed
     #: if the worker is dead unexpectedly.
+    #: 0 if the value doesn't exist.
     end_time_ms: int = state_column(filterable=False, detail=True)
 
 
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index 07d95fdb49add..51ec07bff9151 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -335,6 +335,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
         int startup_token
         c_string session_name
         c_string entrypoint
+        int64_t worker_launch_time_ms
+        int64_t worker_launched_time_ms
 
     cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
         @staticmethod
diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py
index 8f336fc41124d..036fc92a5a1c8 100644
--- a/python/ray/tests/test_metrics_agent.py
+++ b/python/ray/tests/test_metrics_agent.py
@@ -51,7 +51,6 @@
     "ray_object_directory_lookups",
     "ray_object_directory_added_locations",
     "ray_object_directory_removed_locations",
-    "ray_process_startup_time_ms_sum",
     "ray_internal_num_processes_started",
     "ray_internal_num_spilled_tasks",
     # "ray_unintentional_worker_failures_total",
diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py
index d8d58156484d0..f8962ce20dd27 100644
--- a/python/ray/tests/test_state_api.py
+++ b/python/ray/tests/test_state_api.py
@@ -2,6 +2,7 @@
 import time
 import json
 import sys
+import signal
 from collections import Counter
 from dataclasses import dataclass
 from typing import List, Tuple
@@ -177,7 +178,14 @@ def generate_node_data(id):
     )
 
 
-def generate_worker_data(id, pid=1234):
+def generate_worker_data(
+    id,
+    pid=1234,
+    worker_launch_time_ms=1,
+    worker_launched_time_ms=2,
+    start_time_ms=3,
+    end_time_ms=4,
+):
     return WorkerTableData(
         worker_address=Address(
             raylet_id=id, ip_address="127.0.0.1", port=124, worker_id=id
@@ -187,6 +195,10 @@ def generate_worker_data(id, pid=1234):
         worker_type=WorkerType.WORKER,
         pid=pid,
         exit_type=None,
+        worker_launch_time_ms=worker_launch_time_ms,
+        worker_launched_time_ms=worker_launched_time_ms,
+        start_time_ms=start_time_ms,
+        end_time_ms=end_time_ms,
     )
 
 
@@ -2083,10 +2095,12 @@ def test_list_get_workers(shutdown_only):
     ray.init()
 
     def verify():
-        workers = list_workers()
+        workers = list_workers(detail=True)
         assert is_hex(workers[0]["worker_id"])
         # +1 to take into account of drivers.
         assert len(workers) == ray.cluster_resources()["CPU"] + 1
+        # End time should be 0 as the worker has not exited yet.
+        assert workers[0]["end_time_ms"] == 0
 
         # Test get worker returns the same result
         workers = list_workers(detail=True)
@@ -2097,7 +2111,19 @@ def verify():
         return True
 
     wait_for_condition(verify)
-    print(list_workers())
+
+    # Kill the worker
+    workers = list_workers()
+    os.kill(workers[-1]["pid"], signal.SIGKILL)
+
+    def verify():
+        workers = list_workers(detail=True, filters=[("is_alive", "=", "False")])
+        assert len(workers) == 1
+        assert workers[0]["end_time_ms"] != 0
+        return True
+
+    wait_for_condition(verify)
+    print(list_workers(detail=True))
 
 
 @pytest.mark.skipif(
diff --git a/release/benchmarks/distributed/test_many_actors.py b/release/benchmarks/distributed/test_many_actors.py
index a40d8a583b095..bcecbd0b9ba2c 100644
--- a/release/benchmarks/distributed/test_many_actors.py
+++ b/release/benchmarks/distributed/test_many_actors.py
@@ -5,6 +5,7 @@
 import tqdm
 
 from dashboard_test import DashboardTestAtScale
+from ray._private.state_api_test_utils import summarize_worker_startup_time
 
 is_smoke_test = True
 if "SMOKE_TEST" in os.environ:
@@ -40,6 +41,11 @@ def no_resource_leaks():
 
 addr = ray.init(address="auto")
 test_utils.wait_for_condition(no_resource_leaks)
+try:
+    summarize_worker_startup_time()
+except Exception as e:
+    print("Failed to summarize worker startup time.")
+    print(e)
 monitor_actor = test_utils.monitor_memory_usage()
 dashboard_test = DashboardTestAtScale(addr)
 
diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py
index 365d27e355ab3..bb7639700534c 100644
--- a/release/benchmarks/distributed/test_many_tasks.py
+++ b/release/benchmarks/distributed/test_many_tasks.py
@@ -9,6 +9,7 @@
 from ray._private.state_api_test_utils import (
     StateAPICallSpec,
     periodic_invoke_state_apis_with_actor,
+    summarize_worker_startup_time,
 )
 
 sleep_time = 300
@@ -94,6 +95,12 @@ def not_none(res):
     del monitor_actor
     test_utils.wait_for_condition(no_resource_leaks)
 
+    try:
+        summarize_worker_startup_time()
+    except Exception as e:
+        print("Failed to summarize worker startup time.")
+        print(e)
+
     rate = num_tasks / (end_time - start_time - sleep_time)
     print(
         f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 171bf3725a108..167746e974469 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -219,7 +219,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
   gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options, GetWorkerID());
 
   RAY_CHECK_OK(gcs_client_->Connect(io_service_));
-  RegisterToGcs();
+  RegisterToGcs(options_.worker_launch_time_ms, options_.worker_launched_time_ms);
 
   // Initialize the task state event buffer.
   auto task_event_gcs_client = std::make_unique<gcs::GcsClient>(options_.gcs_options);
@@ -811,7 +811,8 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id,
   }
 }
 
-void CoreWorker::RegisterToGcs() {
+void CoreWorker::RegisterToGcs(int64_t worker_launch_time_ms,
+                               int64_t worker_launched_time_ms) {
   absl::flat_hash_map<std::string, std::string> worker_info;
   const auto &worker_id = GetWorkerID();
   worker_info.emplace("node_ip_address", options_.node_ip_address);
@@ -847,6 +848,8 @@
   worker_data->set_is_alive(true);
   worker_data->set_pid(getpid());
   worker_data->set_start_time_ms(current_sys_time_ms());
+  worker_data->set_worker_launch_time_ms(worker_launch_time_ms);
+  worker_data->set_worker_launched_time_ms(worker_launched_time_ms);
   RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
 }
 
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 33aa89729e1be..12e4c40165016 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -1213,7 +1213,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   void ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail);
 
   /// Register this worker or driver to GCS.
-  void RegisterToGcs();
+  void RegisterToGcs(int64_t worker_launch_time_ms, int64_t worker_launched_time_ms);
 
   /// (WORKER mode only) Check if the raylet has failed. If so, shutdown.
   void ExitIfParentRayletDies();
diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h
index 97ab66d4c5a8d..f1fa61d9c0919 100644
--- a/src/ray/core_worker/core_worker_options.h
+++ b/src/ray/core_worker/core_worker_options.h
@@ -86,7 +86,9 @@ struct CoreWorkerOptions {
         connect_on_start(true),
         runtime_env_hash(0),
         session_name(""),
-        entrypoint("") {}
+        entrypoint(""),
+        worker_launch_time_ms(-1),
+        worker_launched_time_ms(-1) {}
 
   /// Type of this worker (i.e., DRIVER or WORKER).
   WorkerType worker_type;
@@ -183,6 +185,8 @@ struct CoreWorkerOptions {
   /// Session name (Cluster ID) of the cluster.
   std::string session_name;
   std::string entrypoint;
+  int64_t worker_launch_time_ms;
+  int64_t worker_launched_time_ms;
 };
 }  // namespace core
 }  // namespace ray
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index 597aeea0f27cc..a3350285974af 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -401,10 +401,28 @@ message WorkerTableData {
   optional string exit_detail = 20;
   // pid of the worker process.
   uint32 pid = 21;
-  // The unix ms timestamp the worker was started at.
+
+  /* The fields below are worker lifecycle events:
+     worker_launch_time_ms (process startup requested).
+     -> worker_launched_time_ms (process started).
+     -> start_time_ms (worker is ready to be used).
+     -> end_time_ms (worker is destroyed).
+  */
+
+  // The unix ms timestamp when the worker started and finished initialization.
   uint64 start_time_ms = 23;
   // The unix ms timestamp the worker was ended at.
   uint64 end_time_ms = 24;
+  // The time when this worker process is requested from the raylet.
+  // The field exists only when the worker is launched
+  // by a raylet (i.e., a driver worker won't have this value).
+  // If the value isn't present, it is -1.
+  uint64 worker_launch_time_ms = 25;
+  // The time when this worker process is successfully started.
+  // The field exists only when the worker is launched
+  // by a raylet (i.e., a driver worker won't have this value).
+  // If the value isn't present, it is -1.
+  uint64 worker_launched_time_ms = 26;
 }
 
 // Fields to publish when worker fails.
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index 093abb41841c6..c96a144659ca0 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -322,6 +322,8 @@ WorkerPool::BuildProcessCommandArgs(const Language &language,
   if (language == Language::PYTHON) {
     worker_command_args.push_back("--startup-token=" +
                                   std::to_string(worker_startup_token_counter_));
+    worker_command_args.push_back("--worker-launch-time-ms=" +
+                                  std::to_string(current_sys_time_ms()));
   } else if (language == Language::CPP) {
     worker_command_args.push_back("--startup_token=" +
                                   std::to_string(worker_startup_token_counter_));
@@ -462,12 +464,9 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
       serialized_runtime_env_context,
       state);
 
-  // Start a process and measure the startup time.
   auto start = std::chrono::high_resolution_clock::now();
+  // Start a process and measure the startup time.
   Process proc = StartProcess(worker_command_args, env);
-  auto end = std::chrono::high_resolution_clock::now();
-  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
-  stats::ProcessStartupTimeMs.Record(duration.count());
   stats::NumWorkersStarted.Record(1);
   RAY_LOG(INFO) << "Started worker process with pid " << proc.GetId() << ", the token is "
                 << worker_startup_token_counter_;
diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h
index dd5350ba60683..bb1beeaa33455 100644
--- a/src/ray/stats/metric_defs.h
+++ b/src/ray/stats/metric_defs.h
@@ -202,12 +202,6 @@ static Gauge ObjectDirectoryRemovedLocations(
     "have been removed from this node.",
     "removals");
 
-/// Worker Pool
-static Histogram ProcessStartupTimeMs("process_startup_time_ms",
-                                      "Time to start up a worker process.",
-                                      "ms",
-                                      {1, 10, 100, 1000, 10000});
-
 static Sum NumWorkersStarted(
     "internal_num_processes_started",
     "The total number of worker processes the worker pool has created.",
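
Usage note (not part of the patch): the sketch below shows one way the new worker lifecycle timestamps could be consumed through the Python state API once this change is in place. It assumes a Ray cluster started with this patch is already running and is attached to with ray.init(address="auto"); the fields, list_workers() arguments, and sentinel values (-1/0 when a timestamp is absent) are taken from the WorkerState schema and summarize_worker_startup_time() above.

# Hedged sketch: read the new worker lifecycle timestamps via the state API.
# Assumes a running Ray cluster that includes this change.
import ray
from ray.experimental.state.api import list_workers

ray.init(address="auto")

# Only raylet-launched workers carry launch/launched timestamps; drivers report -1.
workers = list_workers(
    detail=True,
    filters=[("worker_type", "=", "WORKER")],
    raise_on_missing_output=False,
)
for w in workers:
    launch = w["worker_launch_time_ms"]      # raylet requested the process start
    launched = w["worker_launched_time_ms"]  # process finished launching
    start = w["start_time_ms"]               # worker registered to GCS, ready for tasks
    if launch > 0 and launched > 0:
        print(f"{w['worker_id']}: process launch took {launched - launch} ms")
    if launched > 0 and start > 0:
        print(f"{w['worker_id']}: initialization took {start - launched} ms")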