Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hjiang/debug env perofrmance #48735

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ ray_cc_library(
"@boost//:asio",
"@boost//:beast",
"@boost//:system",
"@boost//:stacktrace",
"@com_github_jupp0r_prometheus_cpp//pull",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:flat_hash_set",
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskOptions options{};
options.name = call_options.name;
options.resources = call_options.resources;
options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
// options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
options.generator_backpressure_num_objects = -1;
std::vector<rpc::ObjectReference> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
Expand Down
43 changes: 27 additions & 16 deletions release/benchmarks/distributed/test_many_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
import tqdm

from ray.util.state import summarize_tasks
from many_nodes_tests.dashboard_test import DashboardTestAtScale

# from many_nodes_tests.dashboard_test import DashboardTestAtScale
from ray._private.state_api_test_utils import (
StateAPICallSpec,
periodic_invoke_state_apis_with_actor,
summarize_worker_startup_time,
)

sleep_time = 300
sleep_time = 60


def test_max_running_tasks(num_tasks):
cpus_per_task = 0.25

@ray.remote(num_cpus=cpus_per_task)
@ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}})
# @ray.remote(num_cpus=cpus_per_task)
def task():
time.sleep(sleep_time)

Expand Down Expand Up @@ -67,11 +69,10 @@ def no_resource_leaks():
@click.command()
@click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
def test(num_tasks):
addr = ray.init(address="auto")
ray.init()

test_utils.wait_for_condition(no_resource_leaks)
monitor_actor = test_utils.monitor_memory_usage()
dashboard_test = DashboardTestAtScale(addr)

def not_none(res):
return res is not None
Expand All @@ -83,23 +84,22 @@ def not_none(res):
)

start_time = time.time()
used_cpus = test_max_running_tasks(num_tasks)
used_cpus = 0
try:
used_cpus = test_max_running_tasks(num_tasks)
except Exception as e:
print(str(e))
end_time = time.time()
ray.get(monitor_actor.stop_run.remote())
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print(f"Peak memory usage: {round(used_gb, 2)}GB")
print(f"Peak memory usage per processes:\n {usage}")
ray.get(api_caller.stop.remote())

del api_caller
del monitor_actor
test_utils.wait_for_condition(no_resource_leaks)

try:
summarize_worker_startup_time()
except Exception as e:
print("Failed to summarize worker startup time.")
print(e)
print(
f"end time = {end_time}, start time = {start_time},"
"sleep time = {sleep_time}, num tasks = {num_tasks}"
)

rate = num_tasks / (end_time - start_time - sleep_time)
print(
Expand Down Expand Up @@ -128,10 +128,21 @@ def not_none(res):
},
],
}
print("=======")
print(results)

dashboard_test.update_release_test_result(results)
test_utils.safe_write_to_results_json(results)

del api_caller
del monitor_actor
test_utils.wait_for_condition(no_resource_leaks)

try:
summarize_worker_startup_time()
except Exception as e:
print("Failed to summarize worker startup time.")
print(e)


if __name__ == "__main__":
test()
2 changes: 2 additions & 0 deletions src/ray/common/task/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id)

const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; }

TaskSpecification &RayTask::MutableTaskSpec() { return task_spec_; }

const std::vector<rpc::ObjectReference> &RayTask::GetDependencies() const {
return dependencies_;
}
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class RayTask {
/// \return The immutable specification for the task.
const TaskSpecification &GetTaskSpecification() const;

TaskSpecification &MutableTaskSpec();

/// Get the task's object dependencies. This comprises the immutable task
/// arguments and the mutable execution dependencies.
///
Expand Down
10 changes: 7 additions & 3 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,19 @@ ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const {
return ray::FunctionDescriptorBuilder::FromProto(message_->function_descriptor());
}

rpc::RuntimeEnvInfo TaskSpecification::RuntimeEnvInfo() const {
const rpc::RuntimeEnvInfo &TaskSpecification::RuntimeEnvInfo() const {
return message_->runtime_env_info();
}

std::string TaskSpecification::SerializedRuntimeEnv() const {
const std::string &TaskSpecification::SerializedRuntimeEnv() const {
return message_->runtime_env_info().serialized_runtime_env();
}

rpc::RuntimeEnvConfig TaskSpecification::RuntimeEnvConfig() const {
void TaskSpecification::ClearRuntimeEnv() {
message_->mutable_runtime_env_info()->clear_serialized_runtime_env();
}

const rpc::RuntimeEnvConfig &TaskSpecification::RuntimeEnvConfig() const {
return message_->runtime_env_info().runtime_env_config();
}

Expand Down
8 changes: 5 additions & 3 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,13 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

ray::FunctionDescriptor FunctionDescriptor() const;

[[nodiscard]] rpc::RuntimeEnvInfo RuntimeEnvInfo() const;
[[nodiscard]] const rpc::RuntimeEnvInfo &RuntimeEnvInfo() const;

std::string SerializedRuntimeEnv() const;
const std::string &SerializedRuntimeEnv() const;

rpc::RuntimeEnvConfig RuntimeEnvConfig() const;
void ClearRuntimeEnv();

const rpc::RuntimeEnvConfig &RuntimeEnvConfig() const;

bool HasRuntimeEnv() const;

Expand Down
4 changes: 3 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,8 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
const TaskID current_task_id) {
RAY_LOG(INFO) << "CoreWorker::SubmitTask " << task_options.serialized_runtime_env_info;

RAY_CHECK(scheduling_strategy.scheduling_strategy_case() !=
rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET);

Expand Down Expand Up @@ -2206,7 +2208,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
constrained_resources,
debugger_breakpoint,
depth,
task_options.serialized_runtime_env_info,
"", // task_options.serialized_runtime_env_info,
worker_context_.GetMainThreadOrActorCreationTaskID(),
/*concurrency_group_name*/ "",
/*include_job_config*/ true,
Expand Down
17 changes: 14 additions & 3 deletions src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "ray/core_worker/transport/normal_task_submitter.h"

#include <chrono>

#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/gcs/pb_util.h"

Expand All @@ -39,10 +41,14 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();

bool keep_executing = true;

// auto start = std::chrono::steady_clock::now();

{
absl::MutexLock lock(&mu_);
if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) {
cancelled_tasks_.erase(task_spec.TaskId());
auto task_iter = cancelled_tasks_.find(task_spec.TaskId());
if (task_iter != cancelled_tasks_.end()) {
cancelled_tasks_.erase(task_iter);
keep_executing = false;
}
if (keep_executing) {
Expand Down Expand Up @@ -82,6 +88,11 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RequestNewWorkerIfNeeded(scheduling_key);
}
}

// auto end = std::chrono::steady_clock::now();
// auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end -
// start).count(); RAY_LOG(INFO) << "elapse for resolution " << elapsed;

if (!keep_executing) {
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
Expand Down Expand Up @@ -333,7 +344,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli
// same TaskID to request a worker
auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage();
resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary());
const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
const TaskSpecification resource_spec = TaskSpecification(std::move(resource_spec_msg));
rpc::Address best_node_address;
const bool is_spillback = (raylet_address != nullptr);
bool is_selected_based_on_locality = false;
Expand Down
21 changes: 16 additions & 5 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,18 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr<internal::Work> work
// guarantee that the local node is not selected for scheduling.
ASSERT_FALSE(
cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining());
WaitForTaskArgsRequests(work);
WaitForTaskArgsRequests(std::move(work));
ScheduleAndDispatchTasks();
}

bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work) {
{
const auto &spec = work->task.GetTaskSpecification();
RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? "
<< spec.SerializedRuntimeEnv().empty();
work->task.MutableTaskSpec().ClearRuntimeEnv();
}

const auto &task = work->task;
const auto &task_id = task.GetTaskSpecification().TaskId();
const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
Expand All @@ -93,18 +100,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> w
{task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()});
if (args_ready) {
RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id;
tasks_to_dispatch_[scheduling_key].push_back(work);
tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work));
} else {
RAY_LOG(DEBUG) << "Waiting for args for task: "
<< task.GetTaskSpecification().TaskId();
can_dispatch = false;
auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), work);
auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work));
RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second);
}
} else {
RAY_LOG(DEBUG) << "No args, task can be dispatched "
<< task.GetTaskSpecification().TaskId();
tasks_to_dispatch_[scheduling_key].push_back(work);
tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work));
}
return can_dispatch;
}
Expand Down Expand Up @@ -249,7 +256,11 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) {
auto &work = *work_it;
const auto &task = work->task;
const auto spec = task.GetTaskSpecification();
const auto &spec = task.GetTaskSpecification();

RAY_LOG(INFO) << "first task spec empty ? "
<< spec.RuntimeEnvInfo().serialized_runtime_env().empty();

TaskID task_id = spec.TaskId();
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
work_it++;
Expand Down
6 changes: 6 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,12 @@ void NodeManager::HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest requ
void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
////// debug???? problematic???
request.mutable_resource_spec()
->mutable_runtime_env_info()
->mutable_serialized_runtime_env()
->clear();

rpc::Task task_message;
task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
RayTask task(task_message);
Expand Down
41 changes: 41 additions & 0 deletions src/ray/raylet/resolution_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <mutex>

#include "absl/container/flat_hash_map.h"

namespace ray {

template <typename Key, typename Value>
class ResolutionCache {
public:
std::optional<Value> Get(const Key &key) {
std::lock_guard lck(mtx_);
auto iter = map_.find(key);
if (iter == map_.end()) {
return std::nullopt;
}
return iter->second;
}

std::optional<Value> GetAndPop(const Key &key) {
std::lock_guard lck(mtx_);
auto iter = map_.find(key);
if (iter == map_.end()) {
return std::nullopt;
}
auto value = std::move(iter->second);
map_.erase(iter);
return value;
}

void Put(Key key, Value val) {
std::lock_guard lck(mtx_);
map_.emplace(std::move(key), std::move(val));
}

std::mutex mtx_;
absl::flat_hash_map<Key, Value> map_;
};

} // namespace ray
Loading