[core] Make is_gpu, is_actor, root_detached_id fields late bind to workers. (ray-project#47212)

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
2 people authored and ujjawal-khare committed Oct 15, 2024
1 parent 0bfc36a commit 8928dba
Showing 16 changed files with 300 additions and 265 deletions.
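In rough terms, this commit stops baking is_actor, is_gpu, and the root-detached-actor id into the worker cache key (the runtime-env hash now depends only on the serialized runtime env) and instead leaves those fields unset on a prestarted worker until it is first leased, so prestarted idle workers can be reused for actors and GPU tasks. The Python sketch below illustrates the idea only; the names IdleWorker, WorkerPool, lease, and runtime_env_hash are illustrative and are not Ray's actual raylet API.

def runtime_env_hash(serialized_runtime_env: str) -> int:
    # Unspecified ("") and empty ("{}") runtime envs share one predetermined value.
    if serialized_runtime_env in ("", "{}"):
        return 0
    return hash(serialized_runtime_env)

class IdleWorker:
    def __init__(self, env_hash: int):
        self.env_hash = env_hash
        # Late-bound fields: unknown (None) until the worker is first leased.
        self.is_gpu = None
        self.is_actor = None
        self.root_detached_actor_id = None

class WorkerPool:
    def __init__(self, prestarted_env_hashes):
        self.idle = [IdleWorker(h) for h in prestarted_env_hashes]

    def lease(self, env_hash, is_gpu, is_actor, root_detached_actor_id):
        for worker in self.idle:
            # A worker whose late-bound fields are still None can serve any task;
            # once bound, it can only serve tasks with matching fields.
            if (worker.env_hash == env_hash
                    and worker.is_gpu in (None, is_gpu)
                    and worker.is_actor in (None, is_actor)
                    and worker.root_detached_actor_id in (None, root_detached_actor_id)):
                self.idle.remove(worker)
                worker.is_gpu = is_gpu
                worker.is_actor = is_actor
                worker.root_detached_actor_id = root_detached_actor_id
                return worker
        return None  # no reusable idle worker; the raylet would start a new one

pool = WorkerPool([runtime_env_hash("")] * 4)
assert pool.lease(runtime_env_hash("{}"), is_gpu=True, is_actor=True,
                  root_detached_actor_id=None) is not None

Under this scheme a prestarted CPU worker can be handed to a GPU task or an actor the first time it is leased, which is what the new test_can_use_prestart_idle_workers test below checks end to end.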
83 changes: 50 additions & 33 deletions python/ray/serve/tests/test_metrics.py
@@ -602,26 +602,34 @@ def g():
assert "hello" == requests.get(url_f).text
assert "world" == requests.get(url_g).text

def verify_metrics(metric, expected_output):
for key in expected_output:
assert metric[key] == expected_output[key]

wait_for_condition(
lambda: len(get_metric_dictionaries("serve_deployment_request_counter")) == 2,
lambda: len(get_metric_dictionaries("serve_deployment_request_counter_total"))
== 2,
timeout=40,
)

num_requests = get_metric_dictionaries("serve_deployment_request_counter")
assert len(num_requests) == 2
expected_output = {"route": "/f", "deployment": "f", "application": "app1"}
verify_metrics(num_requests[0], expected_output)
metrics = get_metric_dictionaries("serve_deployment_request_counter_total")
assert len(metrics) == 2
expected_output = {
("/f", "f", "app1"),
("/g", "g", "app2"),
}
assert {
(
metric["route"],
metric["deployment"],
metric["application"],
)
for metric in metrics
} == expected_output

start_metrics = get_metric_dictionaries("serve_deployment_replica_starts")
start_metrics = get_metric_dictionaries("serve_deployment_replica_starts_total")
assert len(start_metrics) == 2
expected_output = {"deployment": "f", "application": "app1"}
verify_metrics(start_metrics[0], expected_output)
expected_output = {"deployment": "g", "application": "app2"}
verify_metrics(start_metrics[1], expected_output)
expected_output = {("f", "app1"), ("g", "app2")}
assert {
(start_metric["deployment"], start_metric["application"])
for start_metric in start_metrics
} == expected_output

# Latency metrics
wait_for_condition(
@@ -638,19 +646,21 @@ def verify_metrics(metric, expected_output):
latency_metrics = get_metric_dictionaries(metric_name)
print(f"checking metric {metric_name}, {latency_metrics}")
assert len(latency_metrics) == 2
expected_output1 = {"deployment": "f", "application": "app1"}
expected_output2 = {"deployment": "g", "application": "app2"}
verify_metrics(latency_metrics[0], expected_output1)
verify_metrics(latency_metrics[1], expected_output2)
expected_output = {("f", "app1"), ("g", "app2")}
assert {
(latency_metric["deployment"], latency_metric["application"])
for latency_metric in latency_metrics
} == expected_output

wait_for_condition(
lambda: len(get_metric_dictionaries("serve_replica_processing_queries")) == 2
)
processing_queries = get_metric_dictionaries("serve_replica_processing_queries")
expected_output1 = {"deployment": "f", "application": "app1"}
expected_output2 = {"deployment": "g", "application": "app2"}
verify_metrics(processing_queries[0], expected_output1)
verify_metrics(processing_queries[1], expected_output2)
expected_output = {("f", "app1"), ("g", "app2")}
assert {
(processing_query["deployment"], processing_query["application"])
for processing_query in processing_queries
} == expected_output

@serve.deployment
def h():
@@ -659,23 +669,30 @@ def h():
serve.run(h.bind(), name="app3", route_prefix="/h")
assert 500 == requests.get("http://127.0.0.1:8000/h").status_code
wait_for_condition(
lambda: len(get_metric_dictionaries("serve_deployment_error_counter")) == 1,
lambda: len(get_metric_dictionaries("serve_deployment_error_counter_total"))
== 1,
timeout=40,
)
err_requests = get_metric_dictionaries("serve_deployment_error_counter")
err_requests = get_metric_dictionaries("serve_deployment_error_counter_total")
assert len(err_requests) == 1
expected_output = {"route": "/h", "deployment": "h", "application": "app3"}
verify_metrics(err_requests[0], expected_output)
expected_output = ("/h", "h", "app3")
assert (
err_requests[0]["route"],
err_requests[0]["deployment"],
err_requests[0]["application"],
) == expected_output

health_metrics = get_metric_dictionaries("serve_deployment_replica_healthy")
assert len(health_metrics) == 3, health_metrics
expected_outputs = [
{"deployment": "f", "application": "app1"},
{"deployment": "g", "application": "app2"},
{"deployment": "h", "application": "app3"},
]
for i in range(len(health_metrics)):
verify_metrics(health_metrics[i], expected_outputs[i])
expected_output = {
("f", "app1"),
("g", "app2"),
("h", "app3"),
}
assert {
(health_metric["deployment"], health_metric["application"])
for health_metric in health_metrics
} == expected_output


class TestRequestContextMetrics:
7 changes: 7 additions & 0 deletions python/ray/tests/test_metrics_agent.py
@@ -680,6 +680,13 @@ def __init__(self):
ray.get(a.__ray_ready__.remote())
ray.get(b.__ray_ready__.remote())

# Run a short lived task to make sure there's a ray::IDLE component.
@ray.remote
def do_nothing():
pass

ray.get(do_nothing.remote())

def verify_components():
metrics = raw_metrics(addr)
metric_names = set(metrics.keys())
39 changes: 39 additions & 0 deletions python/ray/tests/test_node_manager.py
@@ -357,6 +357,45 @@ def test_jobs_prestart_worker_once(call_ray_start, shutdown_only):
time.sleep(1)


def test_can_use_prestart_idle_workers(ray_start_cluster):
"""Test that actors and GPU tasks can use prestarted workers."""
cluster = ray_start_cluster
NUM_CPUS = 4
NUM_GPUS = 4
cluster.add_node(num_cpus=NUM_CPUS, num_gpus=NUM_GPUS)
ray.init(address=cluster.address)

wait_for_condition(
lambda: len(list_workers(filters=[("worker_type", "=", "WORKER")])) == NUM_CPUS
)

# These workers don't have job_id or is_actor_worker.
workers = list_workers(filters=[("worker_type", "=", "WORKER")], detail=True)
worker_pids = {worker.pid for worker in workers}
assert len(worker_pids) == NUM_CPUS

@ray.remote
class A:
def getpid(self):
return os.getpid()

@ray.remote
def f():
return os.getpid()

used_worker_pids = set()
cpu_actor = A.options(num_cpus=1).remote()
used_worker_pids.add(ray.get(cpu_actor.getpid.remote()))

gpu_actor = A.options(num_gpus=1).remote()
used_worker_pids.add(ray.get(gpu_actor.getpid.remote()))

used_worker_pids.add(ray.get(f.options(num_cpus=1).remote()))
used_worker_pids.add(ray.get(f.options(num_gpus=1).remote()))

assert used_worker_pids == worker_pids


if __name__ == "__main__":
import sys

6 changes: 1 addition & 5 deletions python/ray/tests/test_state_api.py
@@ -1697,11 +1697,7 @@ def ready(self):
"""
result = await client.get_all_worker_info(limit=2)
assert len(result.worker_table_data) == 2
# Driver + 3 workers for actors + 2 prestarted task-only workers
# TODO(clarng): prestart worker on worker lease request doesn't
# work, otherwise it should have created the 2 prestarted task-only
# workers prior to https://github.com/ray-project/ray/pull/33623
assert result.total == 6
assert result.total == 4


def test_humanify():
2 changes: 2 additions & 0 deletions src/mock/ray/raylet/worker.h
@@ -46,6 +46,8 @@ class MockWorkerInterface : public WorkerInterface {
(),
(const, override));
MOCK_METHOD(const JobID &, GetAssignedJobId, (), (const, override));
MOCK_METHOD(std::optional<bool>, GetIsGpu, (), (const, override));
MOCK_METHOD(std::optional<bool>, GetIsActorWorker, (), (const, override));
MOCK_METHOD(int, GetRuntimeEnvHash, (), (const, override));
MOCK_METHOD(void, AssignActorId, (const ActorID &actor_id), (override));
MOCK_METHOD(const ActorID &, GetActorId, (), (const, override));
60 changes: 16 additions & 44 deletions src/ray/common/task/task_spec.cc
@@ -126,6 +126,8 @@ void TaskSpecification::ComputeResources() {
// Map the scheduling class descriptor to an integer for performance.
sched_cls_id_ = GetSchedulingClass(sched_cls_desc);
}

runtime_env_hash_ = CalculateRuntimeEnvHash(SerializedRuntimeEnv());
}

// Task specification getter methods.
@@ -201,13 +203,7 @@ bool TaskSpecification::IsRetry() const { return AttemptNumber() > 0; }

int32_t TaskSpecification::MaxRetries() const { return message_->max_retries(); }

int TaskSpecification::GetRuntimeEnvHash() const {
WorkerCacheKey env = {SerializedRuntimeEnv(),
IsActorCreationTask(),
GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0,
!(RootDetachedActorId().IsNil())};
return env.IntHash();
}
int TaskSpecification::GetRuntimeEnvHash() const { return runtime_env_hash_; }

const SchedulingClass TaskSpecification::GetSchedulingClass() const {
if (!IsActorTask()) {
@@ -405,6 +401,14 @@ int64_t TaskSpecification::MaxActorRestarts() const {
return message_->actor_creation_task_spec().max_actor_restarts();
}

std::vector<std::string> TaskSpecification::DynamicWorkerOptionsOrEmpty() const {
if (!IsActorCreationTask()) {
return {};
}
return VectorFromProtobuf(
message_->actor_creation_task_spec().dynamic_worker_options());
}

std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
RAY_CHECK(IsActorCreationTask());
return VectorFromProtobuf(
@@ -592,48 +596,16 @@ std::string TaskSpecification::CallSiteString() const {
return stream.str();
}

WorkerCacheKey::WorkerCacheKey(std::string serialized_runtime_env,
bool is_actor,
bool is_gpu,
bool is_root_detached_actor)
: serialized_runtime_env(std::move(serialized_runtime_env)),
is_actor(is_actor && RayConfig::instance().isolate_workers_across_task_types()),
is_gpu(is_gpu && RayConfig::instance().isolate_workers_across_resource_types()),
is_root_detached_actor(is_root_detached_actor),
hash_(CalculateHash()) {}

std::size_t WorkerCacheKey::CalculateHash() const {
size_t hash = 0;
if (EnvIsEmpty()) {
int CalculateRuntimeEnvHash(const std::string &serialized_runtime_env) {
if (IsRuntimeEnvEmpty(serialized_runtime_env)) {
// It's useful to have the same predetermined value for both unspecified and empty
// runtime envs.
if (is_actor) {
hash = 1;
} else {
hash = 0;
}
} else {
boost::hash_combine(hash, serialized_runtime_env);
boost::hash_combine(hash, is_actor);
boost::hash_combine(hash, is_gpu);
boost::hash_combine(hash, is_root_detached_actor);
return 0;
}
return hash;
}

bool WorkerCacheKey::operator==(const WorkerCacheKey &k) const {
// FIXME we should compare fields
return Hash() == k.Hash();
size_t hash = std::hash<std::string>()(serialized_runtime_env);
return static_cast<int>(hash);
}

bool WorkerCacheKey::EnvIsEmpty() const {
return IsRuntimeEnvEmpty(serialized_runtime_env) && !is_gpu && !is_root_detached_actor;
}

std::size_t WorkerCacheKey::Hash() const { return hash_; }

int WorkerCacheKey::IntHash() const { return static_cast<int>(Hash()); }

std::vector<ConcurrencyGroup> TaskSpecification::ConcurrencyGroups() const {
RAY_CHECK(IsActorCreationTask());
std::vector<ConcurrencyGroup> concurrency_groups;
68 changes: 10 additions & 58 deletions src/ray/common/task/task_spec.h
@@ -19,6 +19,7 @@
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/synchronization/mutex.h"
@@ -266,7 +267,7 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
///
/// \param message The protobuf message.
explicit TaskSpecification(std::shared_ptr<rpc::TaskSpec> message)
: MessageWrapper(message) {
: MessageWrapper(std::move(message)) {
ComputeResources();
}

@@ -432,6 +433,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

std::vector<std::string> DynamicWorkerOptions() const;

std::vector<std::string> DynamicWorkerOptionsOrEmpty() const;

// Methods specific to actor tasks.

ActorID ActorId() const;
@@ -503,6 +506,7 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
std::shared_ptr<ResourceSet> required_placement_resources_;
/// Cached scheduling class of this task.
SchedulingClass sched_cls_id_ = 0;
int runtime_env_hash_ = 0;

/// Below static fields could be mutated in `ComputeResources` concurrently due to
/// multi-threading, we need a mutex to protect it.
@@ -515,62 +519,10 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
static int next_sched_id_ ABSL_GUARDED_BY(mutex_);
};

/// \class WorkerCacheKey
///
/// Class used to cache workers, keyed by runtime_env.
class WorkerCacheKey {
public:
/// Create a cache key with the given environment variable overrides and serialized
/// runtime_env.
///
/// worker. \param serialized_runtime_env The JSON-serialized runtime env for this
/// worker. \param is_actor Whether the worker will be an actor. This is set when
/// task type isolation between workers is enabled.
/// worker. \param is_gpu Whether the worker will be using GPUs. This is set when
/// resource type isolation between workers is enabled.
/// worker. \param is_root_detached_actor Whether the worker will be running
/// tasks or actors whose root ancestor is a detached actor. This is set
/// to prevent worker reuse between tasks whose root is the driver process
/// and tasks whose root is a detached actor.
WorkerCacheKey(std::string serialized_runtime_env,
bool is_actor,
bool is_gpu,
bool is_root_detached_actor);

bool operator==(const WorkerCacheKey &k) const;

/// Check if this worker's environment is empty (the default).
///
/// \return true if there are no environment variables set and the runtime env is the
/// empty string (protobuf default) or a JSON-serialized empty dict.
bool EnvIsEmpty() const;

/// Get the hash for this worker's environment.
///
/// \return The hash of the serialized runtime_env.
std::size_t Hash() const;

/// Get the int-valued hash for this worker's environment, useful for portability in
/// flatbuffers.
///
/// \return The hash truncated to an int.
int IntHash() const;

private:
std::size_t CalculateHash() const;

/// The JSON-serialized runtime env for this worker.
const std::string serialized_runtime_env;
/// Whether the worker is for an actor.
const bool is_actor;
/// Whether the worker is to use a GPU.
const bool is_gpu;
/// Whether the worker is to run tasks or actors
/// whose root is a detached actor.
const bool is_root_detached_actor;
/// The hash of the worker's environment. This is set to 0
/// for unspecified or empty environments.
const std::size_t hash_ = 0;
};
// Get a Hash for the runtime environment string.
// "" and "{}" have the same hash.
// Other than that, only compare literal strings. i.e. '{"a": 1, "b": 2}' and '{"b": 2,
// "a": 1}' have different hashes.
int CalculateRuntimeEnvHash(const std::string &serialized_runtime_env);

} // namespace ray
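
For reference, the behavior documented in the comment above CalculateRuntimeEnvHash can be approximated in Python as follows. This is a sketch only; the real implementation is the C++ function in task_spec.cc above, which uses std::hash, so the numeric values differ.

def calculate_runtime_env_hash(serialized_runtime_env: str) -> int:
    # Unspecified ("") and empty ("{}") runtime envs share one predetermined hash.
    if serialized_runtime_env in ("", "{}"):
        return 0
    # Everything else is hashed as a literal string, so JSON key order matters.
    return hash(serialized_runtime_env)

assert calculate_runtime_env_hash("") == calculate_runtime_env_hash("{}")
assert calculate_runtime_env_hash('{"a": 1, "b": 2}') != calculate_runtime_env_hash(
    '{"b": 2, "a": 1}'
)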
