From 7eadf76aebaf449a7b5323312364267703793dc7 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:56:44 -0700 Subject: [PATCH] [core] Decouple create worker vs pop worker request. (#47694) Now, when you call PopWorker(), it finds an idle one or creates a worker. If a new worker is created, the worker is associated to the request and can only be used by it. This PR decouples the worker creation and the worker-to-task assignment, by adding an abstraction namely PopWorkerRequest. Now, if a req triggers a worker creation, the req is put into a queue. If there are workers ready, that is a PushWorker is called, either from a newly started worker or a released worker, Ray matches the first fitting request in the queue. This reduces latency. Later it can also be used to pre-start workers more meaningfully. Signed-off-by: Ruiyang Wang Signed-off-by: ujjawal-khare --- python/ray/tests/test_node_manager.py | 122 ++++++++++++++++ src/ray/raylet/worker_pool.cc | 203 +++++++++++++++----------- src/ray/raylet/worker_pool.h | 4 +- 3 files changed, 243 insertions(+), 86 deletions(-) diff --git a/python/ray/tests/test_node_manager.py b/python/ray/tests/test_node_manager.py index 5eae495af4e8..98e7dc27f608 100644 --- a/python/ray/tests/test_node_manager.py +++ b/python/ray/tests/test_node_manager.py @@ -403,6 +403,128 @@ def f(): assert used_worker_pids == worker_pids +MyPlugin = "HangOnSecondWorkerPlugin" +MY_PLUGIN_CLASS_PATH = "ray.tests.test_node_manager.HangOnSecondWorkerPlugin" +PLUGIN_TIMEOUT = 10 + + +class HangOnSecondWorkerPlugin(RuntimeEnvPlugin): + """ + The first worker will start up normally, but all subsequent workers will hang at + start up indefinitely. How it works: Ray RuntimeEnvAgent caches the modified context + so we can't do it in modify_context. Instead, we use a bash command to read a file + and hang forever. 
We don't have a good file lock mechanism in bash (flock is not + installed by default in macos), so we also serialize the worker startup. + """ + + name = MyPlugin + + def __init__(self): + # Each URI has a temp dir, a counter file, and a hang.sh script. + self.uris = collections.defaultdict(dict) + + def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]: # noqa: F821 + return [runtime_env[self.name]] + + async def create( + self, + uri: Optional[str], + runtime_env, + context: RuntimeEnvContext, + logger: logging.Logger, + ) -> float: + d = self.uris[uri] + d["temp_dir"] = tempfile.mkdtemp() + logger.info(f"caching temp dir {d['temp_dir']} for uri {uri}") + d["counter_file"] = os.path.join(d["temp_dir"], "script_run_count") + with open(d["counter_file"], "w+") as f: + f.write("0") + d["hang_sh"] = os.path.join(d["temp_dir"], "hang.sh") + with open(d["hang_sh"], "w+") as f: + f.write( + f"""#!/bin/bash + +counter_file="{d['counter_file']}" + +count=$(cat "$counter_file") + +if [ "$count" -eq "0" ]; then + echo "1" > "$counter_file" + echo "first time run" + exit 0 +elif [ "$count" -eq "1" ]; then + echo "2" > "$counter_file" + echo "second time run, sleeping..." 
+ sleep 1000 +fi +""" + ) + os.chmod(d["hang_sh"], 0o755) + return 0.1 + + def modify_context( + self, + uris: List[str], + runtime_env: "RuntimeEnv", # noqa: F821 + ctx: RuntimeEnvContext, + logger: logging.Logger, + ) -> None: + logger.info(f"Starting worker: {uris}, {runtime_env}") + if self.name not in runtime_env: + return + assert len(uris) == 1 + uri = uris[0] + hang_sh = self.uris[uri]["hang_sh"] + ctx.command_prefix += ["bash", hang_sh, "&&"] + + def delete_uri(self, uri: str, logger: logging.Logger) -> float: + temp_dir = self.uris[uri]["temp_dir"] + shutil.rmtree(temp_dir) + del self.uris[uri] + logger.info(f"temp_dir removed: {temp_dir}") + + +@pytest.fixture +def serialize_worker_startup(monkeypatch): + """Only one worker starts up each time, since our bash script is not process-safe""" + monkeypatch.setenv("RAY_worker_maximum_startup_concurrency", "1") + yield + + +@pytest.mark.parametrize( + "set_runtime_env_plugins", + [ + '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]', + ], + indirect=True, +) +def test_can_reuse_released_workers( + serialize_worker_startup, set_runtime_env_plugins, ray_start_cluster +): + """ + Uses a runtime env plugin to make sure only 1 worker can start and all subsequent + workers will hang in runtime startup forever. We issue 10 tasks and test that + all the following tasks can still be scheduled on the first worker released from the + first task, i.e. tasks are not bound to the workers that they requested to start. + """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=2) + ray.init(address=cluster.address) + + @ray.remote(runtime_env={"env_vars": {"HELLO": "WORLD"}, MyPlugin: "key"}) + def f(): + # Sleep for a while to make sure other tasks also request workers.
+ time.sleep(1) + print(f"pid={os.getpid()}, env HELLO={os.environ.get('HELLO')}") + return os.getpid() + + objs = [f.remote() for i in range(10)] + + pids = ray.get(objs) + for pid in pids: + assert pid == pids[0] + + if __name__ == "__main__": import sys diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 29b074868b51..8293be8e29c3 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1159,102 +1159,164 @@ void WorkerPool::KillIdleWorker(std::shared_ptr idle_worker, } WorkerUnfitForTaskReason WorkerPool::WorkerFitsForTask( - const WorkerInterface &worker, const TaskSpecification &task_spec) const { + const WorkerInterface &worker, const PopWorkerRequest &pop_worker_request) const { if (worker.IsDead()) { return WorkerUnfitForTaskReason::OTHERS; } - if (worker.GetLanguage() != task_spec.GetLanguage()) { + // These workers are exiting. So skip them. + if (pending_exit_idle_workers_.contains(worker.WorkerId())) { return WorkerUnfitForTaskReason::OTHERS; } - // Don't allow worker reuse across jobs or root detached actors. Reuse worker with - // unassigned job_id and root detached actor id is OK. 
- JobID job_id = worker.GetAssignedJobId(); - if (!job_id.IsNil() && job_id != task_spec.JobId()) { - return WorkerUnfitForTaskReason::ROOT_MISMATCH; + if (worker.GetLanguage() != pop_worker_request.language) { + return WorkerUnfitForTaskReason::OTHERS; + } + if (worker.GetWorkerType() != pop_worker_request.worker_type) { + return WorkerUnfitForTaskReason::OTHERS; } - ActorID root_detached_actor_id = worker.GetRootDetachedActorId(); - if (!root_detached_actor_id.IsNil() && - root_detached_actor_id != task_spec.RootDetachedActorId()) { + + if (!IdMatches(pop_worker_request.root_detached_actor_id, + worker.GetRootDetachedActorId())) { return WorkerUnfitForTaskReason::ROOT_MISMATCH; } - auto is_gpu = worker.GetIsGpu(); - if (is_gpu.has_value()) { - bool task_is_gpu = - task_spec.GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0; - if (is_gpu.value() != task_is_gpu) { - return WorkerUnfitForTaskReason::OTHERS; + // Only compare job id for actors not rooted to a detached actor. + if (pop_worker_request.root_detached_actor_id.IsNil()) { + if (!IdMatches(pop_worker_request.job_id, worker.GetAssignedJobId())) { + return WorkerUnfitForTaskReason::ROOT_MISMATCH; } } - auto is_actor_worker = worker.GetIsActorWorker(); - if (is_actor_worker.has_value() && - is_actor_worker.value() != task_spec.IsActorCreationTask()) { + // If the request asks for a is_gpu, and the worker is assigned a different is_gpu, + // then skip it. + if (!OptionalMatches(pop_worker_request.is_gpu, worker.GetIsGpu())) { + return WorkerUnfitForTaskReason::OTHERS; + } + // If the request asks for a is_actor_worker, and the worker is assigned a different + // is_actor_worker, then skip it. + if (!OptionalMatches(pop_worker_request.is_actor_worker, worker.GetIsActorWorker())) { return WorkerUnfitForTaskReason::OTHERS; } // TODO(clarng): consider re-using worker that has runtime envionrment // if the task doesn't require one. 
- if (worker.GetRuntimeEnvHash() != task_spec.GetRuntimeEnvHash()) { + if (worker.GetRuntimeEnvHash() != pop_worker_request.runtime_env_hash) { return WorkerUnfitForTaskReason::RUNTIME_ENV_MISMATCH; } // Skip if the dynamic_options doesn't match. if (LookupWorkerDynamicOptions(worker.GetStartupToken()) != - task_spec.DynamicWorkerOptionsOrEmpty()) { + pop_worker_request.dynamic_options) { return WorkerUnfitForTaskReason::DYNAMIC_OPTIONS_MISMATCH; } - // These workers are exiting. So skip them. - if (pending_exit_idle_workers_.contains(worker.WorkerId())) { - return WorkerUnfitForTaskReason::OTHERS; - } return WorkerUnfitForTaskReason::NONE; } -void WorkerPool::PopWorker(const TaskSpecification &task_spec, - const PopWorkerCallback &callback) { - RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name " - << task_spec.FunctionDescriptor()->ToString(); - auto &state = GetStateForLanguage(task_spec.GetLanguage()); +void WorkerPool::StartNewWorker( + const std::shared_ptr &pop_worker_request) { + auto start_worker_process_fn = [this]( + std::shared_ptr pop_worker_request, + const std::string &serialized_runtime_env_context) { + auto &state = GetStateForLanguage(pop_worker_request->language); + const std::string &serialized_runtime_env = + pop_worker_request->runtime_env_info.serialized_runtime_env(); - std::shared_ptr worker = nullptr; - auto start_worker_process_fn = [this](const TaskSpecification &task_spec, - State &state, - const std::string &serialized_runtime_env_context, - const PopWorkerCallback &callback) { PopWorkerStatus status = PopWorkerStatus::OK; - auto [proc, startup_token] = - StartWorkerProcess(task_spec.GetLanguage(), - rpc::WorkerType::WORKER, - task_spec.JobId(), - &status, - task_spec.DynamicWorkerOptionsOrEmpty(), - task_spec.GetRuntimeEnvHash(), - serialized_runtime_env_context, - task_spec.RuntimeEnvInfo()); + auto [proc, startup_token] = StartWorkerProcess(pop_worker_request->language, + pop_worker_request->worker_type, + 
pop_worker_request->job_id, + &status, + pop_worker_request->dynamic_options, + pop_worker_request->runtime_env_hash, + serialized_runtime_env_context, + pop_worker_request->runtime_env_info); if (status == PopWorkerStatus::OK) { RAY_CHECK(proc.IsValid()); WarnAboutSize(); - auto task_info = TaskWaitingForWorkerInfo{task_spec, callback}; - state.starting_workers_to_tasks[startup_token] = std::move(task_info); + state.pending_registration_requests.emplace_back(pop_worker_request); + MonitorPopWorkerRequestForRegistration(pop_worker_request); } else if (status == PopWorkerStatus::TooManyStartingWorkerProcesses) { // TODO(jjyao) As an optimization, we don't need to delete the runtime env // but reuse it the next time we retry the request. - DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv()); - state.pending_pop_worker_requests.emplace_back( - PopWorkerRequest{task_spec, callback}); + DeleteRuntimeEnvIfPossible(serialized_runtime_env); + state.pending_start_requests.emplace_back(std::move(pop_worker_request)); } else { - DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv()); - PopWorkerCallbackAsync(task_spec, callback, nullptr, status); + DeleteRuntimeEnvIfPossible(serialized_runtime_env); + PopWorkerCallbackAsync(std::move(pop_worker_request->callback), nullptr, status); } }; + const std::string &serialized_runtime_env = + pop_worker_request->runtime_env_info.serialized_runtime_env(); + + if (!IsRuntimeEnvEmpty(serialized_runtime_env)) { + // create runtime env. 
+ GetOrCreateRuntimeEnv( + serialized_runtime_env, + pop_worker_request->runtime_env_info.runtime_env_config(), + pop_worker_request->job_id, + [this, start_worker_process_fn, pop_worker_request]( + bool successful, + const std::string &serialized_runtime_env_context, + const std::string &setup_error_message) { + if (successful) { + start_worker_process_fn(pop_worker_request, serialized_runtime_env_context); + } else { + process_failed_runtime_env_setup_failed_++; + pop_worker_request->callback( + nullptr, + PopWorkerStatus::RuntimeEnvCreationFailed, + /*runtime_env_setup_error_message*/ setup_error_message); + } + }); + } else { + start_worker_process_fn(pop_worker_request, ""); + } +} + +void WorkerPool::PopWorker(const TaskSpecification &task_spec, + const PopWorkerCallback &callback) { + RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name " + << task_spec.FunctionDescriptor()->ToString(); // Code path of actor task. RAY_CHECK(!task_spec.IsActorTask()) << "Direct call shouldn't reach here."; + auto pop_worker_request = std::make_shared( + task_spec.GetLanguage(), + rpc::WorkerType::WORKER, + task_spec.JobId(), + task_spec.RootDetachedActorId(), + /*is_gpu=*/task_spec.GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0, + /*is_actor_worker=*/task_spec.IsActorCreationTask(), + task_spec.RuntimeEnvInfo(), + task_spec.GetRuntimeEnvHash(), + task_spec.DynamicWorkerOptionsOrEmpty(), + [this, task_spec, callback]( + const std::shared_ptr &worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) -> bool { + // We got a worker suitable for the task. Now let's check if the task is still + // executable. + if (worker && finished_jobs_.contains(task_spec.JobId()) && + task_spec.RootDetachedActorId().IsNil()) { + // When a job finishes, node manager will kill leased workers one time + // and worker pool will kill idle workers periodically. 
+ // The current worker is already removed from the idle workers + // but hasn't been added to the leased workers since the callback is not called + // yet. We shouldn't add this worker to the leased workers since killing leased + // workers for this finished job may already happen and won't happen again (this + // is one time) so it will cause a process leak. Instead we fail the PopWorker + // and add the worker back to the idle workers so it can be killed later. + RAY_CHECK(status == PopWorkerStatus::OK); + callback(nullptr, PopWorkerStatus::JobFinished, ""); + // Not used + return false; + } + return callback(worker, status, runtime_env_setup_error_message); + }); + absl::flat_hash_map skip_reason_count; auto worker_fits_for_task_fn = - [this, &task_spec, &skip_reason_count]( + [this, &pop_worker_request, &skip_reason_count]( const std::pair, int64_t> &pair) -> bool { const auto &worker = pair.first; - WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, task_spec); + WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, *pop_worker_request); if (reason == WorkerUnfitForTaskReason::NONE) { return true; } @@ -1268,7 +1330,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, } return false; }; - + auto &state = GetStateForLanguage(task_spec.GetLanguage()); + std::shared_ptr worker = nullptr; auto good_worker_it = std::find_if(idle_of_all_languages_.rbegin(), idle_of_all_languages_.rend(), worker_fits_for_task_fn); @@ -1286,36 +1349,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, if (worker == nullptr) { RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to " << debug_string(skip_reason_count); - if (task_spec.HasRuntimeEnv()) { - // create runtime env. 
- RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for task " << task_spec.TaskId(); - GetOrCreateRuntimeEnv( - task_spec.SerializedRuntimeEnv(), - task_spec.RuntimeEnvConfig(), - task_spec.JobId(), - [this, start_worker_process_fn, callback, &state, task_spec]( - bool successful, - const std::string &serialized_runtime_env_context, - const std::string &setup_error_message) { - if (successful) { - start_worker_process_fn( - task_spec, state, serialized_runtime_env_context, callback); - } else { - process_failed_runtime_env_setup_failed_++; - callback(nullptr, - PopWorkerStatus::RuntimeEnvCreationFailed, - /*runtime_env_setup_error_message*/ setup_error_message); - RAY_LOG(WARNING) << "Create runtime env failed for task " - << task_spec.TaskId() - << " and couldn't create the worker."; - } - }); - } else { - start_worker_process_fn(task_spec, state, "", callback); - } - } - - if (worker) { + StartNewWorker(pop_worker_request); + } else { RAY_CHECK(worker->GetAssignedJobId().IsNil() || worker->GetAssignedJobId() == task_spec.JobId()); RAY_LOG(DEBUG) << "Re-using worker " << worker->WorkerId() << " for task " diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 62a6e6e6352f..6d71290ca832 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -730,8 +730,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { void ExecuteOnPrestartWorkersStarted(std::function callback); // If this worker can serve the task. - WorkerUnfitForTaskReason WorkerFitsForTask(const WorkerInterface &worker, - const TaskSpecification &task_spec) const; + WorkerUnfitForTaskReason WorkerFitsForTask( + const WorkerInterface &worker, const PopWorkerRequest &pop_worker_request) const; /// For Process class for managing subprocesses (e.g. reaping zombies). instrumented_io_context *io_service_;