[core] Decouple create worker vs pop worker request. #47694

Merged · 14 commits · Oct 10, 2024
129 changes: 129 additions & 0 deletions python/ray/tests/test_node_manager.py
@@ -14,6 +14,13 @@
from ray._private.utils import get_num_cpus
import time
import sys
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from typing import List, Optional
import logging
import tempfile
import collections
import shutil


# This tests the queue transitions for infeasible tasks. This has been an issue
@@ -396,6 +403,128 @@ def f():
    assert used_worker_pids == worker_pids


MyPlugin = "HangOnSecondWorkerPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_node_manager.HangOnSecondWorkerPlugin"
PLUGIN_TIMEOUT = 10


class HangOnSecondWorkerPlugin(RuntimeEnvPlugin):
    """
    The first worker starts up normally, but all subsequent workers hang at
    startup indefinitely. How it works: the Ray RuntimeEnvAgent caches the
    modified context, so the hang can't be injected in modify_context. Instead,
    we prefix the worker command with a bash script that reads a counter file
    and hangs forever on its second run. Bash has no good file-lock mechanism
    (flock is not installed by default on macOS), so we also serialize worker
    startup, one worker at a time.
    """

    name = MyPlugin

    def __init__(self):
        # Each URI has a temp dir, a counter file, and a hang.sh script.
        self.uris = collections.defaultdict(dict)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        return [runtime_env[self.name]]

    async def create(
        self,
        uri: Optional[str],
        runtime_env,
        context: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> float:
        d = self.uris[uri]
        d["temp_dir"] = tempfile.mkdtemp()
        logger.info(f"caching temp dir {d['temp_dir']} for uri {uri}")
        d["counter_file"] = os.path.join(d["temp_dir"], "script_run_count")
        with open(d["counter_file"], "w+") as f:
            f.write("0")
        d["hang_sh"] = os.path.join(d["temp_dir"], "hang.sh")
        with open(d["hang_sh"], "w+") as f:
            f.write(
                f"""#!/bin/bash

counter_file="{d['counter_file']}"

count=$(cat "$counter_file")

if [ "$count" -eq "0" ]; then
    echo "1" > "$counter_file"
    echo "first time run"
    exit 0
elif [ "$count" -eq "1" ]; then
    echo "2" > "$counter_file"
    echo "second time run, sleeping..."
    sleep 1000
fi
"""
            )
        os.chmod(d["hang_sh"], 0o755)
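        # Per the RuntimeEnvPlugin API, create() returns the disk space the
        # created URI takes up; the exact value is irrelevant for this test.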
        return 0.1

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        ctx: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> None:
        logger.info(f"Starting worker: {uris}, {runtime_env}")
        if self.name not in runtime_env:
            return
        assert len(uris) == 1
        uri = uris[0]
        hang_sh = self.uris[uri]["hang_sh"]
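        # Prefix the worker command so it becomes `bash hang.sh && <worker
        # command>`; on its second run hang.sh never exits, so the second and
        # later workers block before startup.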
        ctx.command_prefix += ["bash", hang_sh, "&&"]

    def delete_uri(self, uri: str, logger: logging.Logger) -> float:
        temp_dir = self.uris[uri]["temp_dir"]
        shutil.rmtree(temp_dir)
        del self.uris[uri]
        logger.info(f"temp_dir removed: {temp_dir}")


@pytest.fixture
def serialize_worker_startup(monkeypatch):
    """Start only one worker at a time, since the bash script is not process-safe."""
    monkeypatch.setenv("RAY_worker_maximum_startup_concurrency", "1")
    yield


@pytest.mark.parametrize(
    "set_runtime_env_plugins",
    [
        '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
    ],
    indirect=True,
)
def test_can_reuse_released_workers(
    serialize_worker_startup, set_runtime_env_plugins, ray_start_cluster
):
    """
    Uses a runtime env plugin to ensure that only one worker can start; all
    subsequent workers hang in runtime env setup forever. We issue 10 tasks and
    verify that they can all be scheduled on the first worker once it is
    released by the first task, i.e. tasks are not bound to the workers they
    originally requested to start.
    """
    cluster = ray_start_cluster
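    # With 2 CPUs the raylet requests a second worker, which hangs in env setup.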
    cluster.add_node(num_cpus=2)
    ray.init(address=cluster.address)

    @ray.remote(runtime_env={"env_vars": {"HELLO": "WORLD"}, MyPlugin: "key"})
    def f():
        # Sleep for a while to make sure other tasks also request workers.
        time.sleep(1)
        print(f"pid={os.getpid()}, env HELLO={os.environ.get('HELLO')}")
        return os.getpid()

    objs = [f.remote() for i in range(10)]

    pids = ray.get(objs)
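    # Every task must have run in the single worker that started successfully.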
    for pid in pids:
        assert pid == pids[0]


if __name__ == "__main__":
    import sys

1 change: 1 addition & 0 deletions src/ray/protobuf/common.proto
@@ -710,6 +710,7 @@ message ActorCreationTaskSpec {
  // The dynamic options used in the worker command when starting a worker process for
  // an actor creation task. If the list isn't empty, the options will be used to replace
  // the placeholder string `RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER` in the worker command.
  // Used by Java workers for JVM options.
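  // Illustrative example (not part of this diff; "MainClass" is a placeholder):
  // a worker command template
  //   ["java", "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", "MainClass"]
  // with dynamic_worker_options = ["-Xmx4g"] would expand to
  //   ["java", "-Xmx4g", "MainClass"].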
  repeated string dynamic_worker_options = 5;
  // The max number of concurrent calls for default concurrency group of this actor.
  int32 max_concurrency = 6;
10 changes: 8 additions & 2 deletions src/ray/raylet/worker.cc
@@ -15,6 +15,7 @@
#include "ray/raylet/worker.h"

#include <boost/bind/bind.hpp>
#include <utility>

#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/raylet.h"
@@ -42,7 +43,7 @@ Worker::Worker(const JobID &job_id,
      ip_address_(ip_address),
      assigned_port_(-1),
      port_(-1),
-     connection_(connection),
+     connection_(std::move(connection)),
      assigned_job_id_(job_id),
      runtime_env_hash_(runtime_env_hash),
      bundle_id_(std::make_pair(PlacementGroupID::Nil(), -1)),
@@ -129,7 +130,12 @@ void Worker::Connect(std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client)
  }
}

-void Worker::AssignTaskId(const TaskID &task_id) { assigned_task_id_ = task_id; }
+void Worker::AssignTaskId(const TaskID &task_id) {
+  assigned_task_id_ = task_id;
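+  // Record the assignment time only for a real task; the id is reset to Nil
+  // when the assignment is cleared.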
+  if (!task_id.IsNil()) {
+    task_assign_time_ = absl::Now();
+  }
+}

const TaskID &Worker::GetAssignedTaskId() const { return assigned_task_id_; }

1 change: 0 additions & 1 deletion src/ray/raylet/worker.h
@@ -233,7 +233,6 @@ class Worker : public WorkerInterface {
    RAY_CHECK(!task_spec.IsActorTask());
    SetIsActorWorker(task_spec.IsActorCreationTask());
    assigned_task_ = assigned_task;
-   task_assign_time_ = absl::Now();
    root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId();
  }
