[core] Decouple create worker vs pop worker request. (ray-project#47694)
Today, when you call PopWorker(), it either finds an idle worker or creates a
new one. If a new worker is created, it is associated with the request that
triggered it and can only be used by that request.

This PR decouples worker creation from worker-to-task assignment by
introducing a new abstraction, PopWorkerRequest. Now, if a request triggers
worker creation, the request is put into a queue. Whenever a worker becomes
ready, that is, whenever PushWorker is called, either by a newly started
worker or by a released worker, Ray matches the worker to the first fitting
request in the queue. This reduces latency.

Later, the same mechanism can also be used to pre-start workers more meaningfully.
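
The following is a minimal, self-contained C++ sketch of that flow. The types and
the single-field Fits check are invented stand-ins (the real matching logic is
WorkerFitsForTask in src/ray/raylet/worker_pool.cc); it illustrates the queueing
idea, not Ray's actual implementation:

#include <algorithm>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Invented stand-in types; the real ones live in src/ray/raylet/.
struct Worker {
  int runtime_env_hash = 0;
};

struct PopWorkerRequest {
  int runtime_env_hash = 0;
  std::function<void(std::shared_ptr<Worker>)> callback;
};

class WorkerPoolSketch {
 public:
  // Pop: reuse a fitting idle worker if possible; otherwise queue the request
  // and (conceptually) kick off a worker start that is bound to no request.
  void PopWorker(std::shared_ptr<PopWorkerRequest> request) {
    auto it = std::find_if(idle_.begin(), idle_.end(), [&](const auto &w) {
      return Fits(*w, *request);
    });
    if (it != idle_.end()) {
      auto worker = *it;
      idle_.erase(it);
      request->callback(std::move(worker));
      return;
    }
    pending_.push_back(std::move(request));
  }

  // Push: called both when a newly started worker registers and when a leased
  // worker is released. The first fitting queued request wins, in FIFO order.
  void PushWorker(std::shared_ptr<Worker> worker) {
    auto it = std::find_if(pending_.begin(), pending_.end(), [&](const auto &r) {
      return Fits(*worker, *r);
    });
    if (it != pending_.end()) {
      auto request = *it;
      pending_.erase(it);
      request->callback(std::move(worker));
      return;
    }
    idle_.push_back(std::move(worker));
  }

 private:
  static bool Fits(const Worker &w, const PopWorkerRequest &r) {
    return w.runtime_env_hash == r.runtime_env_hash;
  }

  std::vector<std::shared_ptr<Worker>> idle_;
  std::deque<std::shared_ptr<PopWorkerRequest>> pending_;
};

The essential change is that the matching happens in PushWorker: any worker that
becomes ready, whether freshly registered or just released, serves the oldest
compatible request instead of serving only the request that created it.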

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
rynewang authored and ujjawal-khare committed Oct 15, 2024
1 parent baf0174 commit 56e3a64
Showing 3 changed files with 243 additions and 86 deletions.
122 changes: 122 additions & 0 deletions python/ray/tests/test_node_manager.py
@@ -403,6 +403,128 @@ def f():
    assert used_worker_pids == worker_pids


MyPlugin = "HangOnSecondWorkerPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_node_manager.HangOnSecondWorkerPlugin"
PLUGIN_TIMEOUT = 10


class HangOnSecondWorkerPlugin(RuntimeEnvPlugin):
    """
    The first worker starts up normally, but every subsequent worker hangs at
    startup indefinitely. How it works: the Ray RuntimeEnvAgent caches the
    modified context, so we can't implement the hang in modify_context. Instead,
    we prefix the worker command with a bash script that reads a counter file and
    hangs forever on its second run. Bash has no good file-lock mechanism (flock
    is not installed by default on macOS), so we also serialize worker startup.
    """

    name = MyPlugin

    def __init__(self):
        # Each URI has a temp dir, a counter file, and a hang.sh script.
        self.uris = collections.defaultdict(dict)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        return [runtime_env[self.name]]

    async def create(
        self,
        uri: Optional[str],
        runtime_env,
        context: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> float:
        d = self.uris[uri]
        d["temp_dir"] = tempfile.mkdtemp()
        logger.info(f"caching temp dir {d['temp_dir']} for uri {uri}")
        d["counter_file"] = os.path.join(d["temp_dir"], "script_run_count")
        with open(d["counter_file"], "w+") as f:
            f.write("0")
        d["hang_sh"] = os.path.join(d["temp_dir"], "hang.sh")
        with open(d["hang_sh"], "w+") as f:
            f.write(
                f"""#!/bin/bash
counter_file="{d['counter_file']}"
count=$(cat "$counter_file")
if [ "$count" -eq "0" ]; then
    echo "1" > "$counter_file"
    echo "first time run"
    exit 0
elif [ "$count" -eq "1" ]; then
    echo "2" > "$counter_file"
    echo "second time run, sleeping..."
    sleep 1000
fi
"""
            )
        os.chmod(d["hang_sh"], 0o755)
        return 0.1

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        ctx: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> None:
        logger.info(f"Starting worker: {uris}, {runtime_env}")
        if self.name not in runtime_env:
            return
        assert len(uris) == 1
        uri = uris[0]
        hang_sh = self.uris[uri]["hang_sh"]
        ctx.command_prefix += ["bash", hang_sh, "&&"]

    def delete_uri(self, uri: str, logger: logging.Logger) -> float:
        temp_dir = self.uris[uri]["temp_dir"]
        shutil.rmtree(temp_dir)
        del self.uris[uri]
        logger.info(f"temp_dir removed: {temp_dir}")


@pytest.fixture
def serialize_worker_startup(monkeypatch):
    """Only one worker starts up at a time, since our bash script is not process-safe."""
    monkeypatch.setenv("RAY_worker_maximum_startup_concurrency", "1")
    yield


@pytest.mark.parametrize(
    "set_runtime_env_plugins",
    [
        '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
    ],
    indirect=True,
)
def test_can_reuse_released_workers(
    serialize_worker_startup, set_runtime_env_plugins, ray_start_cluster
):
    """
    Uses a runtime env plugin to ensure that only the first worker can start;
    all subsequent workers hang in runtime env setup forever. We issue 10 tasks
    and verify that they can all be scheduled on the worker released by the
    first task, i.e. that tasks are not bound to the workers they requested to
    start.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=2)
    ray.init(address=cluster.address)

    @ray.remote(runtime_env={"env_vars": {"HELLO": "WORLD"}, MyPlugin: "key"})
    def f():
        # Sleep for a while to make sure other tasks also request workers.
        time.sleep(1)
        print(f"pid={os.getpid()}, env HELLO={os.environ.get('HELLO')}")
        return os.getpid()

    objs = [f.remote() for _ in range(10)]

    pids = ray.get(objs)
    for pid in pids:
        assert pid == pids[0]


if __name__ == "__main__":
    import sys
203 changes: 119 additions & 84 deletions src/ray/raylet/worker_pool.cc
@@ -1159,102 +1159,164 @@ void WorkerPool::KillIdleWorker(std::shared_ptr<WorkerInterface> idle_worker,
 }
 
 WorkerUnfitForTaskReason WorkerPool::WorkerFitsForTask(
-    const WorkerInterface &worker, const TaskSpecification &task_spec) const {
+    const WorkerInterface &worker, const PopWorkerRequest &pop_worker_request) const {
   if (worker.IsDead()) {
     return WorkerUnfitForTaskReason::OTHERS;
   }
-  if (worker.GetLanguage() != task_spec.GetLanguage()) {
+  // These workers are exiting. So skip them.
+  if (pending_exit_idle_workers_.contains(worker.WorkerId())) {
     return WorkerUnfitForTaskReason::OTHERS;
   }
-  // Don't allow worker reuse across jobs or root detached actors. Reuse worker with
-  // unassigned job_id and root detached actor id is OK.
-  JobID job_id = worker.GetAssignedJobId();
-  if (!job_id.IsNil() && job_id != task_spec.JobId()) {
-    return WorkerUnfitForTaskReason::ROOT_MISMATCH;
+  if (worker.GetLanguage() != pop_worker_request.language) {
+    return WorkerUnfitForTaskReason::OTHERS;
   }
+  if (worker.GetWorkerType() != pop_worker_request.worker_type) {
+    return WorkerUnfitForTaskReason::OTHERS;
+  }
-  ActorID root_detached_actor_id = worker.GetRootDetachedActorId();
-  if (!root_detached_actor_id.IsNil() &&
-      root_detached_actor_id != task_spec.RootDetachedActorId()) {
+
+  if (!IdMatches(pop_worker_request.root_detached_actor_id,
+                 worker.GetRootDetachedActorId())) {
     return WorkerUnfitForTaskReason::ROOT_MISMATCH;
   }
-  auto is_gpu = worker.GetIsGpu();
-  if (is_gpu.has_value()) {
-    bool task_is_gpu =
-        task_spec.GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0;
-    if (is_gpu.value() != task_is_gpu) {
-      return WorkerUnfitForTaskReason::OTHERS;
+  // Only compare job id for actors not rooted to a detached actor.
+  if (pop_worker_request.root_detached_actor_id.IsNil()) {
+    if (!IdMatches(pop_worker_request.job_id, worker.GetAssignedJobId())) {
+      return WorkerUnfitForTaskReason::ROOT_MISMATCH;
     }
   }
-  auto is_actor_worker = worker.GetIsActorWorker();
-  if (is_actor_worker.has_value() &&
-      is_actor_worker.value() != task_spec.IsActorCreationTask()) {
+  // If the request asks for a is_gpu, and the worker is assigned a different is_gpu,
+  // then skip it.
+  if (!OptionalMatches(pop_worker_request.is_gpu, worker.GetIsGpu())) {
     return WorkerUnfitForTaskReason::OTHERS;
   }
+  // If the request asks for a is_actor_worker, and the worker is assigned a different
+  // is_actor_worker, then skip it.
+  if (!OptionalMatches(pop_worker_request.is_actor_worker, worker.GetIsActorWorker())) {
+    return WorkerUnfitForTaskReason::OTHERS;
+  }
   // TODO(clarng): consider re-using worker that has runtime envionrment
   // if the task doesn't require one.
-  if (worker.GetRuntimeEnvHash() != task_spec.GetRuntimeEnvHash()) {
+  if (worker.GetRuntimeEnvHash() != pop_worker_request.runtime_env_hash) {
     return WorkerUnfitForTaskReason::RUNTIME_ENV_MISMATCH;
   }
   // Skip if the dynamic_options doesn't match.
   if (LookupWorkerDynamicOptions(worker.GetStartupToken()) !=
-      task_spec.DynamicWorkerOptionsOrEmpty()) {
+      pop_worker_request.dynamic_options) {
     return WorkerUnfitForTaskReason::DYNAMIC_OPTIONS_MISMATCH;
   }
-  // These workers are exiting. So skip them.
-  if (pending_exit_idle_workers_.contains(worker.WorkerId())) {
-    return WorkerUnfitForTaskReason::OTHERS;
-  }
   return WorkerUnfitForTaskReason::NONE;
 }

-void WorkerPool::PopWorker(const TaskSpecification &task_spec,
-                           const PopWorkerCallback &callback) {
-  RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name "
-                 << task_spec.FunctionDescriptor()->ToString();
-  auto &state = GetStateForLanguage(task_spec.GetLanguage());
+void WorkerPool::StartNewWorker(
+    const std::shared_ptr<PopWorkerRequest> &pop_worker_request) {
+  auto start_worker_process_fn = [this](
+                                     std::shared_ptr<PopWorkerRequest> pop_worker_request,
+                                     const std::string &serialized_runtime_env_context) {
+    auto &state = GetStateForLanguage(pop_worker_request->language);
+    const std::string &serialized_runtime_env =
+        pop_worker_request->runtime_env_info.serialized_runtime_env();
 
-  std::shared_ptr<WorkerInterface> worker = nullptr;
-  auto start_worker_process_fn = [this](const TaskSpecification &task_spec,
-                                        State &state,
-                                        const std::string &serialized_runtime_env_context,
-                                        const PopWorkerCallback &callback) {
     PopWorkerStatus status = PopWorkerStatus::OK;
-    auto [proc, startup_token] =
-        StartWorkerProcess(task_spec.GetLanguage(),
-                           rpc::WorkerType::WORKER,
-                           task_spec.JobId(),
-                           &status,
-                           task_spec.DynamicWorkerOptionsOrEmpty(),
-                           task_spec.GetRuntimeEnvHash(),
-                           serialized_runtime_env_context,
-                           task_spec.RuntimeEnvInfo());
+    auto [proc, startup_token] = StartWorkerProcess(pop_worker_request->language,
+                                                    pop_worker_request->worker_type,
+                                                    pop_worker_request->job_id,
+                                                    &status,
+                                                    pop_worker_request->dynamic_options,
+                                                    pop_worker_request->runtime_env_hash,
+                                                    serialized_runtime_env_context,
+                                                    pop_worker_request->runtime_env_info);
     if (status == PopWorkerStatus::OK) {
       RAY_CHECK(proc.IsValid());
       WarnAboutSize();
-      auto task_info = TaskWaitingForWorkerInfo{task_spec, callback};
-      state.starting_workers_to_tasks[startup_token] = std::move(task_info);
+      state.pending_registration_requests.emplace_back(pop_worker_request);
+      MonitorPopWorkerRequestForRegistration(pop_worker_request);
     } else if (status == PopWorkerStatus::TooManyStartingWorkerProcesses) {
       // TODO(jjyao) As an optimization, we don't need to delete the runtime env
       // but reuse it the next time we retry the request.
-      DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv());
-      state.pending_pop_worker_requests.emplace_back(
-          PopWorkerRequest{task_spec, callback});
+      DeleteRuntimeEnvIfPossible(serialized_runtime_env);
+      state.pending_start_requests.emplace_back(std::move(pop_worker_request));
     } else {
-      DeleteRuntimeEnvIfPossible(task_spec.SerializedRuntimeEnv());
-      PopWorkerCallbackAsync(task_spec, callback, nullptr, status);
+      DeleteRuntimeEnvIfPossible(serialized_runtime_env);
+      PopWorkerCallbackAsync(std::move(pop_worker_request->callback), nullptr, status);
     }
   };
 
+  const std::string &serialized_runtime_env =
+      pop_worker_request->runtime_env_info.serialized_runtime_env();
+
+  if (!IsRuntimeEnvEmpty(serialized_runtime_env)) {
+    // create runtime env.
+    GetOrCreateRuntimeEnv(
+        serialized_runtime_env,
+        pop_worker_request->runtime_env_info.runtime_env_config(),
+        pop_worker_request->job_id,
+        [this, start_worker_process_fn, pop_worker_request](
+            bool successful,
+            const std::string &serialized_runtime_env_context,
+            const std::string &setup_error_message) {
+          if (successful) {
+            start_worker_process_fn(pop_worker_request, serialized_runtime_env_context);
+          } else {
+            process_failed_runtime_env_setup_failed_++;
+            pop_worker_request->callback(
+                nullptr,
+                PopWorkerStatus::RuntimeEnvCreationFailed,
+                /*runtime_env_setup_error_message*/ setup_error_message);
+          }
+        });
+  } else {
+    start_worker_process_fn(pop_worker_request, "");
+  }
+}
+
+void WorkerPool::PopWorker(const TaskSpecification &task_spec,
+                           const PopWorkerCallback &callback) {
+  RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name "
+                 << task_spec.FunctionDescriptor()->ToString();
+  // Code path of actor task.
+  RAY_CHECK(!task_spec.IsActorTask()) << "Direct call shouldn't reach here.";
+
+  auto pop_worker_request = std::make_shared<PopWorkerRequest>(
+      task_spec.GetLanguage(),
+      rpc::WorkerType::WORKER,
+      task_spec.JobId(),
+      task_spec.RootDetachedActorId(),
+      /*is_gpu=*/task_spec.GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0,
+      /*is_actor_worker=*/task_spec.IsActorCreationTask(),
+      task_spec.RuntimeEnvInfo(),
+      task_spec.GetRuntimeEnvHash(),
+      task_spec.DynamicWorkerOptionsOrEmpty(),
+      [this, task_spec, callback](
+          const std::shared_ptr<WorkerInterface> &worker,
+          PopWorkerStatus status,
+          const std::string &runtime_env_setup_error_message) -> bool {
+        // We got a worker suitable for the task. Now let's check if the task is still
+        // executable.
+        if (worker && finished_jobs_.contains(task_spec.JobId()) &&
+            task_spec.RootDetachedActorId().IsNil()) {
+          // When a job finishes, node manager will kill leased workers one time
+          // and worker pool will kill idle workers periodically.
+          // The current worker is already removed from the idle workers
+          // but hasn't been added to the leased workers since the callback is not called
+          // yet. We shouldn't add this worker to the leased workers since killing leased
+          // workers for this finished job may already happen and won't happen again (this
+          // is one time) so it will cause a process leak. Instead we fail the PopWorker
+          // and add the worker back to the idle workers so it can be killed later.
+          RAY_CHECK(status == PopWorkerStatus::OK);
+          callback(nullptr, PopWorkerStatus::JobFinished, "");
+          // Not used
+          return false;
+        }
+        return callback(worker, status, runtime_env_setup_error_message);
+      });
+
 
   absl::flat_hash_map<WorkerUnfitForTaskReason, size_t> skip_reason_count;
 
   auto worker_fits_for_task_fn =
-      [this, &task_spec, &skip_reason_count](
+      [this, &pop_worker_request, &skip_reason_count](
          const std::pair<std::shared_ptr<WorkerInterface>, int64_t> &pair) -> bool {
     const auto &worker = pair.first;
-    WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, task_spec);
+    WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, *pop_worker_request);
     if (reason == WorkerUnfitForTaskReason::NONE) {
       return true;
     }
@@ -1268,7 +1330,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
     }
     return false;
   };
-
+  auto &state = GetStateForLanguage(task_spec.GetLanguage());
+  std::shared_ptr<WorkerInterface> worker = nullptr;
   auto good_worker_it = std::find_if(idle_of_all_languages_.rbegin(),
                                      idle_of_all_languages_.rend(),
                                      worker_fits_for_task_fn);
@@ -1286,36 +1349,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
   if (worker == nullptr) {
     RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to "
                    << debug_string(skip_reason_count);
-    if (task_spec.HasRuntimeEnv()) {
-      // create runtime env.
-      RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for task " << task_spec.TaskId();
-      GetOrCreateRuntimeEnv(
-          task_spec.SerializedRuntimeEnv(),
-          task_spec.RuntimeEnvConfig(),
-          task_spec.JobId(),
-          [this, start_worker_process_fn, callback, &state, task_spec](
-              bool successful,
-              const std::string &serialized_runtime_env_context,
-              const std::string &setup_error_message) {
-            if (successful) {
-              start_worker_process_fn(
-                  task_spec, state, serialized_runtime_env_context, callback);
-            } else {
-              process_failed_runtime_env_setup_failed_++;
-              callback(nullptr,
-                       PopWorkerStatus::RuntimeEnvCreationFailed,
-                       /*runtime_env_setup_error_message*/ setup_error_message);
-              RAY_LOG(WARNING) << "Create runtime env failed for task "
-                               << task_spec.TaskId()
-                               << " and couldn't create the worker.";
-            }
-          });
-    } else {
-      start_worker_process_fn(task_spec, state, "", callback);
-    }
-  }
-
-  if (worker) {
+    StartNewWorker(pop_worker_request);
+  } else {
     RAY_CHECK(worker->GetAssignedJobId().IsNil() ||
               worker->GetAssignedJobId() == task_spec.JobId());
     RAY_LOG(DEBUG) << "Re-using worker " << worker->WorkerId() << " for task "
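
A reading note on the WorkerFitsForTask hunk above: IdMatches and OptionalMatches
are small helpers defined elsewhere in worker_pool.cc and are not part of the
hunks loaded here. Judging from the checks they replace and the surrounding
comments, their semantics are roughly the following sketch (an approximation,
not the verbatim Ray source):

#include <optional>

// Sketch: a nil id on the worker side means "not yet assigned", which is
// compatible with any request; an assigned id must match the request exactly.
template <typename ID>
bool IdMatches(const ID &request_id, const ID &worker_id) {
  return worker_id.IsNil() || worker_id == request_id;
}

// Sketch: an unset optional imposes no constraint; when both sides are set,
// the values must agree.
template <typename T>
bool OptionalMatches(const std::optional<T> &request_value,
                     const std::optional<T> &worker_value) {
  return !request_value.has_value() || !worker_value.has_value() ||
         *request_value == *worker_value;
}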
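
The diff also introduces two queues with distinct roles: a request lands in
state.pending_start_requests when StartWorkerProcess fails with
TooManyStartingWorkerProcesses, to be retried once a starting slot frees up, and
in state.pending_registration_requests once a process has been started, to be
matched when a worker registers (with MonitorPopWorkerRequestForRegistration
presumably enforcing a startup timeout). The PushWorker side that drains these
queues is not in the loaded hunks; based on the queue names and the commit
message, its registration path plausibly looks something like this hypothetical
sketch (not the actual Ray code; OnWorkerRegistered is an invented name):

// Hypothetical sketch of the consuming side: when a started worker registers,
// serve the first queued request that it fits.
void WorkerPool::OnWorkerRegistered(const std::shared_ptr<WorkerInterface> &worker) {
  auto &state = GetStateForLanguage(worker->GetLanguage());
  auto it = std::find_if(
      state.pending_registration_requests.begin(),
      state.pending_registration_requests.end(),
      [&](const std::shared_ptr<PopWorkerRequest> &request) {
        return WorkerFitsForTask(*worker, *request) ==
               WorkerUnfitForTaskReason::NONE;
      });
  if (it != state.pending_registration_requests.end()) {
    auto request = *it;
    state.pending_registration_requests.erase(it);
    request->callback(worker, PopWorkerStatus::OK, "");
    return;
  }
  // Otherwise the worker goes to the idle pool, where a later PopWorker (or a
  // retried request from pending_start_requests) can pick it up.
}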
