From 58525bbbde83bc5f5a1cff68635512b1b33dc49a Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 12 Nov 2024 23:02:13 +0000 Subject: [PATCH 01/10] debug runtime env performance --- .../benchmarks/distributed/test_many_tasks.py | 36 +++++---- src/ray/raylet/resolution_cache.h | 40 ++++++++++ src/ray/raylet/runtime_env_agent_client.cc | 74 +++++++++++++++---- 3 files changed, 119 insertions(+), 31 deletions(-) create mode 100644 src/ray/raylet/resolution_cache.h diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 6f524cf4e3ea..6a6c06e44e6a 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -12,13 +12,13 @@ summarize_worker_startup_time, ) -sleep_time = 300 +sleep_time = 120 def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task) + @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) def task(): time.sleep(sleep_time) @@ -67,11 +67,10 @@ def no_resource_leaks(): @click.command() @click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.") def test(num_tasks): - addr = ray.init(address="auto") + addr = ray.init() test_utils.wait_for_condition(no_resource_leaks) monitor_actor = test_utils.monitor_memory_usage() - dashboard_test = DashboardTestAtScale(addr) def not_none(res): return res is not None @@ -83,7 +82,11 @@ def not_none(res): ) start_time = time.time() - used_cpus = test_max_running_tasks(num_tasks) + used_cpus = 0 + try: + used_cpus = test_max_running_tasks(num_tasks) + except Exception as e: + print(str(e)) end_time = time.time() ray.get(monitor_actor.stop_run.remote()) used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) @@ -91,15 +94,7 @@ def not_none(res): print(f"Peak memory usage per processes:\n {usage}") ray.get(api_caller.stop.remote()) - del api_caller - del monitor_actor - test_utils.wait_for_condition(no_resource_leaks) - - try: - summarize_worker_startup_time() - except Exception as e: - print("Failed to summarize worker startup time.") - print(e) + print(f"end time = {end_time}, start time = {start_time}, sleep time = {sleep_time}, num tasks = {num_tasks}") rate = num_tasks / (end_time - start_time - sleep_time) print( @@ -128,10 +123,21 @@ def not_none(res): }, ], } + print("=======") + print(results) - dashboard_test.update_release_test_result(results) test_utils.safe_write_to_results_json(results) + del api_caller + del monitor_actor + test_utils.wait_for_condition(no_resource_leaks) + + try: + summarize_worker_startup_time() + except Exception as e: + print("Failed to summarize worker startup time.") + print(e) + if __name__ == "__main__": test() diff --git a/src/ray/raylet/resolution_cache.h b/src/ray/raylet/resolution_cache.h new file mode 100644 index 000000000000..f7c82c4fd076 --- /dev/null +++ b/src/ray/raylet/resolution_cache.h @@ -0,0 +1,40 @@ +#pragma once + +#include "absl/container/flat_hash_map.h" +#include + +namespace ray { + +template +class ResolutionCache { + public: + std::optional Get(const Key& key) { + std::lock_guard lck(mtx_); + auto iter = map_.find(key); + if (iter == map_.end()) { + return std::nullopt; + } + return iter->second; + } + + std::optional GetAndPop(const Key& key) { + std::lock_guard lck(mtx_); + auto iter = map_.find(key); + if (iter == map_.end()) { + return std::nullopt; + } + auto value = std::move(iter->second); + map_.erase(iter); + return value; + } + + void Put(Key key, Value val) { + std::lock_guard lck(mtx_); + map_.emplace(std::move(key), std::move(val)); + } + + std::mutex mtx_; + absl::flat_hash_map map_; +}; + +} // namespace ray diff --git a/src/ray/raylet/runtime_env_agent_client.cc b/src/ray/raylet/runtime_env_agent_client.cc index 82a2d5a4c6a6..9d06ad09801c 100644 --- a/src/ray/raylet/runtime_env_agent_client.cc +++ b/src/ray/raylet/runtime_env_agent_client.cc @@ -23,7 +23,10 @@ #include #include +#include + #include "absl/container/flat_hash_set.h" +#include "ray/raylet/resolution_cache.h" #include "absl/strings/str_format.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/status.h" @@ -40,6 +43,22 @@ namespace raylet { namespace { +struct ResolutionKey { + ResolutionKey(std::string_view h, std::string_view p) : host(h), port(p) {} + + std::string host; + std::string port; + + bool operator==(const ResolutionKey& rhs) const { + return host == rhs.host && port == rhs.port; + } + + template + friend H AbslHashValue(H h, const ResolutionKey& k) { + return H::combine(std::move(h), k.host, k.port); + } +}; + //------------------------------------------------------------------------------ // Simple class to make a async POST call. // Will call callback exactly once with pair{non-ok, any} or pair{ok, reply body}. @@ -91,12 +110,25 @@ class Session : public std::enable_shared_from_this { // // This method should only be called once. void run(FinishedCallback finished_callback) { - finished_callback_ = finished_callback; - // Starts the state machine by looking up the domain name. - resolver_.async_resolve( - host_, - port_, - beast::bind_front_handler(&Session::on_resolve, shared_from_this())); + start_ = std::chrono::steady_clock::now(); + finished_callback_ = std::move(finished_callback); + + ResolutionKey key{host_, port_}; + auto opt_tcp_stream = tcp_stream_cache_.GetAndPop(key); + if (opt_tcp_stream.has_value()) { + stream_ = std::move(opt_tcp_stream.value()); + stream_->expires_never(); + + // Send the HTTP request to the remote host + http::async_write( + *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); + } else { + // Starts the state machine by looking up the domain name. + resolver_.async_resolve( + host_, + port_, + beast::bind_front_handler(&Session::on_resolve, shared_from_this())); + } } private: @@ -109,13 +141,13 @@ class Session : public std::enable_shared_from_this { std::function succ_callback, std::function fail_callback) : resolver_(ioc), - stream_(ioc), + stream_(std::make_shared(ioc)), host_(std::string(host)), port_(std::string(port)), method_(method), succ_callback_(std::move(succ_callback)), fail_callback_(std::move(fail_callback)) { - stream_.expires_never(); + stream_->expires_never(); req_.method(method_); req_.target(target); req_.body() = std::move(body); @@ -143,9 +175,9 @@ class Session : public std::enable_shared_from_this { return; } - stream_.expires_never(); + stream_->expires_never(); // Make the connection on the IP address we get from a lookup - stream_.async_connect( + stream_->async_connect( results, beast::bind_front_handler(&Session::on_connect, shared_from_this())); } @@ -155,10 +187,10 @@ class Session : public std::enable_shared_from_this { return; } - stream_.expires_never(); + stream_->expires_never(); // Send the HTTP request to the remote host http::async_write( - stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); + *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); } void on_write(beast::error_code ec, std::size_t bytes_transferred) { @@ -168,9 +200,9 @@ class Session : public std::enable_shared_from_this { std::to_string(bytes_transferred))); return; } - stream_.expires_never(); + stream_->expires_never(); // Receive the HTTP response - http::async_read(stream_, + http::async_read(*stream_, buffer_, res_, beast::bind_front_handler(&Session::on_read, shared_from_this())); @@ -194,15 +226,22 @@ class Session : public std::enable_shared_from_this { } // Gracefully close the socket - stream_.socket().shutdown(tcp::socket::shutdown_both, ec); + // stream_->socket().shutdown(tcp::socket::shutdown_both, ec); + + tcp_stream_cache_.Put(ResolutionKey{std::move(host_), std::move(port_)}, std::move(stream_)); + // not_connected happens sometimes so don't bother reporting it. if (ec && ec != beast::errc::not_connected) { RAY_LOG(INFO) << "on_read error after response body received: " << ec.message(); } + + auto end = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(end - start_).count(); + RAY_LOG(INFO) << "Session takes " << elapsed; } tcp::resolver resolver_; - beast::tcp_stream stream_; + std::shared_ptr stream_; std::string host_; std::string port_; http::verb method_; @@ -212,6 +251,9 @@ class Session : public std::enable_shared_from_this { http::request req_; http::response res_; FinishedCallback finished_callback_; + + inline static ResolutionCache> tcp_stream_cache_; + std::chrono::time_point start_; }; // A pool of sessions with a fixed max concurrency. Each session can handle 1 concurrent From 97585f6f0c8b8a8d3fbccb9fcb6385c6b85cd2fc Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 08:52:59 +0000 Subject: [PATCH 02/10] more debug info --- BUILD.bazel | 1 + .../benchmarks/distributed/test_many_tasks.py | 3 +- src/ray/raylet/local_task_manager.cc | 4 ++ src/ray/raylet/runtime_env_agent_client.cc | 44 ++++++++++++++--- src/ray/raylet/worker_pool.cc | 48 ++++++++++++++++++- src/ray/raylet/worker_pool.h | 4 +- 6 files changed, 93 insertions(+), 11 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index bcace50105a6..745e19295f94 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -716,6 +716,7 @@ ray_cc_library( "@boost//:asio", "@boost//:beast", "@boost//:system", + "@boost//:stacktrace", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/container:flat_hash_set", diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 6a6c06e44e6a..fddfc493bc22 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -12,13 +12,14 @@ summarize_worker_startup_time, ) -sleep_time = 120 +sleep_time = 60 def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) + # @ray.remote(num_cpus=cpus_per_task) def task(): time.sleep(sleep_time) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index ed1560fc65b9..59e94b9eb95a 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -250,6 +250,10 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { auto &work = *work_it; const auto &task = work->task; const auto spec = task.GetTaskSpecification(); + + + RAY_LOG(INFO) << "first task spec empty ? " << spec.RuntimeEnvInfo().serialized_runtime_env().empty(); + TaskID task_id = spec.TaskId(); if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) { work_it++; diff --git a/src/ray/raylet/runtime_env_agent_client.cc b/src/ray/raylet/runtime_env_agent_client.cc index 9d06ad09801c..fb12760ffb8b 100644 --- a/src/ray/raylet/runtime_env_agent_client.cc +++ b/src/ray/raylet/runtime_env_agent_client.cc @@ -114,14 +114,33 @@ class Session : public std::enable_shared_from_this { finished_callback_ = std::move(finished_callback); ResolutionKey key{host_, port_}; - auto opt_tcp_stream = tcp_stream_cache_.GetAndPop(key); - if (opt_tcp_stream.has_value()) { - stream_ = std::move(opt_tcp_stream.value()); - stream_->expires_never(); + bool found = false; + + int cache_hit_count = 0; + int cache_miss_count = 0; + { + std::lock_guard lck(cache_mutex_); + if (!tcp_stream_cache_.empty()) { + stream_ = std::move(tcp_stream_cache_.front()); + tcp_stream_cache_.pop_front(); + found = true; + cache_hit++; + } else { + ++cache_miss; + } + + cache_hit_count = cache_hit; + cache_miss_count = cache_miss; + } + + RAY_LOG_EVERY_N(INFO, 10) << "Cache hit " << cache_hit_count << ", cache miss " << cache_miss_count; + + if (found) { + stream_->expires_never(); // Send the HTTP request to the remote host http::async_write( - *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); + *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); } else { // Starts the state machine by looking up the domain name. resolver_.async_resolve( @@ -228,7 +247,11 @@ class Session : public std::enable_shared_from_this { // Gracefully close the socket // stream_->socket().shutdown(tcp::socket::shutdown_both, ec); - tcp_stream_cache_.Put(ResolutionKey{std::move(host_), std::move(port_)}, std::move(stream_)); + { + std::lock_guard lck(cache_mutex_); + tcp_stream_cache_.emplace_back(std::move(stream_)); + } + // not_connected happens sometimes so don't bother reporting it. if (ec && ec != beast::errc::not_connected) { @@ -252,7 +275,11 @@ class Session : public std::enable_shared_from_this { http::response res_; FinishedCallback finished_callback_; - inline static ResolutionCache> tcp_stream_cache_; + inline static std::deque> tcp_stream_cache_; + inline static std::mutex cache_mutex_; + inline static int cache_hit = 0; + inline static int cache_miss = 0; + std::chrono::time_point start_; }; @@ -402,6 +429,9 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { const std::string &serialized_runtime_env, const rpc::RuntimeEnvConfig &runtime_env_config, GetOrCreateRuntimeEnvCallback callback) override { + callback(true, "{}", ""); + return; + RetryInvokeOnNotFoundWithDeadline( [=](SuccCallback succ_callback, FailCallback fail_callback) { diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 943ea89b24b5..aa76f29e13be 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include "absl/strings/str_split.h" #include "ray/common/constants.h" @@ -230,6 +231,9 @@ void WorkerPool::AddWorkerProcess( const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, const std::vector &dynamic_options) { + + RAY_LOG(INFO) << "Add worker process, runtime env empty " << runtime_env_info.serialized_runtime_env().empty(); + state.worker_processes.emplace(worker_startup_token_counter_, WorkerProcessInfo{/*is_pending_registration=*/true, worker_type, @@ -444,6 +448,8 @@ std::tuple WorkerPool::StartWorkerProcess( const int runtime_env_hash, const std::string &serialized_runtime_env_context, const rpc::RuntimeEnvInfo &runtime_env_info) { + RAY_LOG(INFO) << "at start worker process, empty " << runtime_env_info.serialized_runtime_env().empty(); + rpc::JobConfig *job_config = nullptr; if (!job_id.IsNil()) { auto it = all_jobs_.find(job_id); @@ -1016,6 +1022,9 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } if (pop_worker_request) { + + RAY_LOG(INFO) << "DEBUG STR empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + bool used = pop_worker_request->callback(worker, PopWorkerStatus::OK, ""); if (!used) { // Retry PushWorker. Maybe it can be used by other tasks. @@ -1205,6 +1214,10 @@ WorkerUnfitForTaskReason WorkerPool::WorkerFitsForTask( void WorkerPool::StartNewWorker( const std::shared_ptr &pop_worker_request) { + + RAY_LOG(INFO) << "when start worker, runtime env empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + + // <---- hjiang auto start_worker_process_fn = [this]( std::shared_ptr pop_worker_request, const std::string &serialized_runtime_env_context) { @@ -1240,7 +1253,8 @@ void WorkerPool::StartNewWorker( const std::string &serialized_runtime_env = pop_worker_request->runtime_env_info.serialized_runtime_env(); - if (!IsRuntimeEnvEmpty(serialized_runtime_env)) { + // if (!IsRuntimeEnvEmpty(serialized_runtime_env)) { + if (false) { // create runtime env. GetOrCreateRuntimeEnv( serialized_runtime_env, @@ -1267,6 +1281,9 @@ void WorkerPool::StartNewWorker( void WorkerPool::PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback &callback) { + + RAY_LOG(INFO) << "task spec empty ? " << task_spec.RuntimeEnvInfo().serialized_runtime_env().empty(); + RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name " << task_spec.FunctionDescriptor()->ToString(); // Code path of actor task. @@ -1308,6 +1325,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, absl::flat_hash_map skip_reason_count; + RAY_LOG(INFO) << "before work fits for work, empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + auto worker_fits_for_task_fn = [this, &pop_worker_request, &skip_reason_count]( const std::pair, int64_t> &pair) -> bool { @@ -1649,7 +1668,24 @@ WorkerPool::IOWorkerState &WorkerPool::GetIOWorkerStateFromWorkerType( void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env, const rpc::RuntimeEnvConfig &runtime_env_config, const JobID &job_id, - const GetOrCreateRuntimeEnvCallback &callback) { + const GetOrCreateRuntimeEnvCallback &callback) { + RAY_LOG(INFO) << "serialized env length = " << serialized_runtime_env.length() << "\n" + << boost::stacktrace::stacktrace(); + + /* + message RuntimeEnvConfig { + /// The timeout of runtime env creation. + int32 setup_timeout_seconds = 1; + /// Indicates whether to install runtime env eagerly before the workers are leased. + bool eager_install = 2; + /// A list of files to stream the runtime env setup logs to. + repeated string log_files = 3; + } + */ + for (const auto& cur_log : runtime_env_config.log_files()) { + RAY_LOG(INFO) << "log file length = " << cur_log.length(); + } + RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for job " << job_id << " with runtime_env " << serialized_runtime_env; runtime_env_agent_client_->GetOrCreateRuntimeEnv( @@ -1661,6 +1697,12 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env const std::string &serialized_runtime_env_context, const std::string &setup_error_message) { if (successful) { + + // RAY_LOG(INFO) << "runtime env context len = " << serialized_runtime_env_context.length() + // << "|" << serialized_runtime_env_context << "|" + // << serialized_runtime_env_context[0] << "|" + // << serialized_runtime_env_context[1] << "|"; + callback(true, serialized_runtime_env_context, ""); } else { RAY_LOG(WARNING) << "Couldn't create a runtime environment for job " << job_id @@ -1676,6 +1718,8 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env void WorkerPool::DeleteRuntimeEnvIfPossible(const std::string &serialized_runtime_env) { RAY_LOG(DEBUG) << "DeleteRuntimeEnvIfPossible " << serialized_runtime_env; + return; + if (!IsRuntimeEnvEmpty(serialized_runtime_env)) { runtime_env_agent_client_->DeleteRuntimeEnvIfPossible( serialized_runtime_env, [serialized_runtime_env](bool successful) { diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index ef2e1e048635..a2d4b06bc1b1 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -539,7 +539,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { runtime_env_info(std::move(runtime_env_info)), runtime_env_hash(runtime_hash), dynamic_options(std::move(options)), - callback(std::move(callback)) {} + callback(std::move(callback)) { + // RAY_LOG(INFO) << "DEBUG STR empty ? " << runtime_env_info.serialized_runtime_env().empty(); + } }; // Starts a new worker that fulfills `pop_worker_request`. From 6a68f238e5a70fe5aa62870f7902737c37cfe41d Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 22:41:15 +0000 Subject: [PATCH 03/10] optimizae --- src/ray/common/task/task_spec.cc | 6 +++--- src/ray/common/task/task_spec.h | 6 +++--- src/ray/raylet/local_task_manager.cc | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index a1b3c04f80b3..88d804941155 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -181,15 +181,15 @@ ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const { return ray::FunctionDescriptorBuilder::FromProto(message_->function_descriptor()); } -rpc::RuntimeEnvInfo TaskSpecification::RuntimeEnvInfo() const { +const rpc::RuntimeEnvInfo& TaskSpecification::RuntimeEnvInfo() const { return message_->runtime_env_info(); } -std::string TaskSpecification::SerializedRuntimeEnv() const { +const std::string& TaskSpecification::SerializedRuntimeEnv() const { return message_->runtime_env_info().serialized_runtime_env(); } -rpc::RuntimeEnvConfig TaskSpecification::RuntimeEnvConfig() const { +const rpc::RuntimeEnvConfig& TaskSpecification::RuntimeEnvConfig() const { return message_->runtime_env_info().runtime_env_config(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 019835062d31..f9a3d230ad97 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -306,11 +306,11 @@ class TaskSpecification : public MessageWrapper { ray::FunctionDescriptor FunctionDescriptor() const; - [[nodiscard]] rpc::RuntimeEnvInfo RuntimeEnvInfo() const; + [[nodiscard]] const rpc::RuntimeEnvInfo& RuntimeEnvInfo() const; - std::string SerializedRuntimeEnv() const; + const std::string& SerializedRuntimeEnv() const; - rpc::RuntimeEnvConfig RuntimeEnvConfig() const; + const rpc::RuntimeEnvConfig& RuntimeEnvConfig() const; bool HasRuntimeEnv() const; diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 59e94b9eb95a..f89b24d8e175 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -249,7 +249,7 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) { auto &work = *work_it; const auto &task = work->task; - const auto spec = task.GetTaskSpecification(); + const auto &spec = task.GetTaskSpecification(); RAY_LOG(INFO) << "first task spec empty ? " << spec.RuntimeEnvInfo().serialized_runtime_env().empty(); From 051e0971862eaed37ade9332f19c13a2a803b1c5 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:32:00 +0000 Subject: [PATCH 04/10] debug --- .../benchmarks/distributed/test_many_tasks.py | 10 +++-- src/ray/common/task/task_spec.cc | 6 +-- src/ray/common/task/task_spec.h | 6 +-- src/ray/raylet/local_task_manager.cc | 12 +++--- src/ray/raylet/resolution_cache.h | 13 ++++--- src/ray/raylet/runtime_env_agent_client.cc | 29 +++++++------- src/ray/raylet/worker_pool.cc | 39 ++++++++++--------- src/ray/raylet/worker_pool.h | 5 ++- 8 files changed, 67 insertions(+), 53 deletions(-) diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index fddfc493bc22..999f31db0dfa 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -5,7 +5,8 @@ import tqdm from ray.util.state import summarize_tasks -from many_nodes_tests.dashboard_test import DashboardTestAtScale + +# from many_nodes_tests.dashboard_test import DashboardTestAtScale from ray._private.state_api_test_utils import ( StateAPICallSpec, periodic_invoke_state_apis_with_actor, @@ -68,7 +69,7 @@ def no_resource_leaks(): @click.command() @click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.") def test(num_tasks): - addr = ray.init() + ray.init() test_utils.wait_for_condition(no_resource_leaks) monitor_actor = test_utils.monitor_memory_usage() @@ -95,7 +96,10 @@ def not_none(res): print(f"Peak memory usage per processes:\n {usage}") ray.get(api_caller.stop.remote()) - print(f"end time = {end_time}, start time = {start_time}, sleep time = {sleep_time}, num tasks = {num_tasks}") + print( + f"end time = {end_time}, start time = {start_time}," + "sleep time = {sleep_time}, num tasks = {num_tasks}" + ) rate = num_tasks / (end_time - start_time - sleep_time) print( diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 88d804941155..5633316ad2ed 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -181,15 +181,15 @@ ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const { return ray::FunctionDescriptorBuilder::FromProto(message_->function_descriptor()); } -const rpc::RuntimeEnvInfo& TaskSpecification::RuntimeEnvInfo() const { +const rpc::RuntimeEnvInfo &TaskSpecification::RuntimeEnvInfo() const { return message_->runtime_env_info(); } -const std::string& TaskSpecification::SerializedRuntimeEnv() const { +const std::string &TaskSpecification::SerializedRuntimeEnv() const { return message_->runtime_env_info().serialized_runtime_env(); } -const rpc::RuntimeEnvConfig& TaskSpecification::RuntimeEnvConfig() const { +const rpc::RuntimeEnvConfig &TaskSpecification::RuntimeEnvConfig() const { return message_->runtime_env_info().runtime_env_config(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index f9a3d230ad97..ac4a38c92cee 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -306,11 +306,11 @@ class TaskSpecification : public MessageWrapper { ray::FunctionDescriptor FunctionDescriptor() const; - [[nodiscard]] const rpc::RuntimeEnvInfo& RuntimeEnvInfo() const; + [[nodiscard]] const rpc::RuntimeEnvInfo &RuntimeEnvInfo() const; - const std::string& SerializedRuntimeEnv() const; + const std::string &SerializedRuntimeEnv() const; - const rpc::RuntimeEnvConfig& RuntimeEnvConfig() const; + const rpc::RuntimeEnvConfig &RuntimeEnvConfig() const; bool HasRuntimeEnv() const; diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index f89b24d8e175..d8e07d9edf2a 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -81,6 +81,8 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr work } bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { + { const auto &spec = work->task.GetTaskSpecification(); } + const auto &task = work->task; const auto &task_id = task.GetTaskSpecification().TaskId(); const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); @@ -93,18 +95,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w {task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()}); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); can_dispatch = false; - auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), work); + auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(task)); RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second); } } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); } return can_dispatch; } @@ -251,8 +253,8 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { const auto &task = work->task; const auto &spec = task.GetTaskSpecification(); - - RAY_LOG(INFO) << "first task spec empty ? " << spec.RuntimeEnvInfo().serialized_runtime_env().empty(); + RAY_LOG(INFO) << "first task spec empty ? " + << spec.RuntimeEnvInfo().serialized_runtime_env().empty(); TaskID task_id = spec.TaskId(); if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) { diff --git a/src/ray/raylet/resolution_cache.h b/src/ray/raylet/resolution_cache.h index f7c82c4fd076..9e5a23cf2d77 100644 --- a/src/ray/raylet/resolution_cache.h +++ b/src/ray/raylet/resolution_cache.h @@ -1,27 +1,28 @@ #pragma once -#include "absl/container/flat_hash_map.h" #include +#include "absl/container/flat_hash_map.h" + namespace ray { template class ResolutionCache { public: - std::optional Get(const Key& key) { + std::optional Get(const Key &key) { std::lock_guard lck(mtx_); auto iter = map_.find(key); if (iter == map_.end()) { - return std::nullopt; + return std::nullopt; } return iter->second; } - std::optional GetAndPop(const Key& key) { + std::optional GetAndPop(const Key &key) { std::lock_guard lck(mtx_); auto iter = map_.find(key); if (iter == map_.end()) { - return std::nullopt; + return std::nullopt; } auto value = std::move(iter->second); map_.erase(iter); @@ -34,7 +35,7 @@ class ResolutionCache { } std::mutex mtx_; - absl::flat_hash_map map_; + absl::flat_hash_map map_; }; } // namespace ray diff --git a/src/ray/raylet/runtime_env_agent_client.cc b/src/ray/raylet/runtime_env_agent_client.cc index fb12760ffb8b..c4d3358bd63e 100644 --- a/src/ray/raylet/runtime_env_agent_client.cc +++ b/src/ray/raylet/runtime_env_agent_client.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -23,13 +24,11 @@ #include #include -#include - #include "absl/container/flat_hash_set.h" -#include "ray/raylet/resolution_cache.h" #include "absl/strings/str_format.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/status.h" +#include "ray/raylet/resolution_cache.h" #include "ray/util/logging.h" #include "src/ray/protobuf/runtime_env_agent.pb.h" @@ -49,12 +48,12 @@ struct ResolutionKey { std::string host; std::string port; - bool operator==(const ResolutionKey& rhs) const { + bool operator==(const ResolutionKey &rhs) const { return host == rhs.host && port == rhs.port; } template - friend H AbslHashValue(H h, const ResolutionKey& k) { + friend H AbslHashValue(H h, const ResolutionKey &k) { return H::combine(std::move(h), k.host, k.port); } }; @@ -117,7 +116,7 @@ class Session : public std::enable_shared_from_this { bool found = false; - int cache_hit_count = 0; + int cache_hit_count = 0; int cache_miss_count = 0; { std::lock_guard lck(cache_mutex_); @@ -134,13 +133,16 @@ class Session : public std::enable_shared_from_this { cache_miss_count = cache_miss; } - RAY_LOG_EVERY_N(INFO, 10) << "Cache hit " << cache_hit_count << ", cache miss " << cache_miss_count; + RAY_LOG_EVERY_N(INFO, 10) << "Cache hit " << cache_hit_count << ", cache miss " + << cache_miss_count; if (found) { stream_->expires_never(); // Send the HTTP request to the remote host http::async_write( - *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); + *stream_, + req_, + beast::bind_front_handler(&Session::on_write, shared_from_this())); } else { // Starts the state machine by looking up the domain name. resolver_.async_resolve( @@ -208,8 +210,9 @@ class Session : public std::enable_shared_from_this { stream_->expires_never(); // Send the HTTP request to the remote host - http::async_write( - *stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); + http::async_write(*stream_, + req_, + beast::bind_front_handler(&Session::on_write, shared_from_this())); } void on_write(beast::error_code ec, std::size_t bytes_transferred) { @@ -251,7 +254,6 @@ class Session : public std::enable_shared_from_this { std::lock_guard lck(cache_mutex_); tcp_stream_cache_.emplace_back(std::move(stream_)); } - // not_connected happens sometimes so don't bother reporting it. if (ec && ec != beast::errc::not_connected) { @@ -259,7 +261,8 @@ class Session : public std::enable_shared_from_this { } auto end = std::chrono::steady_clock::now(); - auto elapsed = std::chrono::duration_cast(end - start_).count(); + auto elapsed = + std::chrono::duration_cast(end - start_).count(); RAY_LOG(INFO) << "Session takes " << elapsed; } @@ -431,7 +434,7 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { GetOrCreateRuntimeEnvCallback callback) override { callback(true, "{}", ""); return; - + RetryInvokeOnNotFoundWithDeadline( [=](SuccCallback succ_callback, FailCallback fail_callback) { diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index aa76f29e13be..2fa716d8e3d5 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -16,8 +16,8 @@ #include #include -#include #include +#include #include "absl/strings/str_split.h" #include "ray/common/constants.h" @@ -231,8 +231,8 @@ void WorkerPool::AddWorkerProcess( const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, const std::vector &dynamic_options) { - - RAY_LOG(INFO) << "Add worker process, runtime env empty " << runtime_env_info.serialized_runtime_env().empty(); + RAY_LOG(INFO) << "Add worker process, runtime env empty " + << runtime_env_info.serialized_runtime_env().empty(); state.worker_processes.emplace(worker_startup_token_counter_, WorkerProcessInfo{/*is_pending_registration=*/true, @@ -448,8 +448,9 @@ std::tuple WorkerPool::StartWorkerProcess( const int runtime_env_hash, const std::string &serialized_runtime_env_context, const rpc::RuntimeEnvInfo &runtime_env_info) { - RAY_LOG(INFO) << "at start worker process, empty " << runtime_env_info.serialized_runtime_env().empty(); - + RAY_LOG(INFO) << "at start worker process, empty " + << runtime_env_info.serialized_runtime_env().empty(); + rpc::JobConfig *job_config = nullptr; if (!job_id.IsNil()) { auto it = all_jobs_.find(job_id); @@ -1022,8 +1023,9 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } if (pop_worker_request) { - - RAY_LOG(INFO) << "DEBUG STR empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + RAY_LOG(INFO) + << "DEBUG STR empty ? " + << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); bool used = pop_worker_request->callback(worker, PopWorkerStatus::OK, ""); if (!used) { @@ -1214,8 +1216,8 @@ WorkerUnfitForTaskReason WorkerPool::WorkerFitsForTask( void WorkerPool::StartNewWorker( const std::shared_ptr &pop_worker_request) { - - RAY_LOG(INFO) << "when start worker, runtime env empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + RAY_LOG(INFO) << "when start worker, runtime env empty ? " + << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); // <---- hjiang auto start_worker_process_fn = [this]( @@ -1281,9 +1283,9 @@ void WorkerPool::StartNewWorker( void WorkerPool::PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback &callback) { - - RAY_LOG(INFO) << "task spec empty ? " << task_spec.RuntimeEnvInfo().serialized_runtime_env().empty(); - + RAY_LOG(INFO) << "task spec empty ? " + << task_spec.RuntimeEnvInfo().serialized_runtime_env().empty(); + RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name " << task_spec.FunctionDescriptor()->ToString(); // Code path of actor task. @@ -1325,7 +1327,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, absl::flat_hash_map skip_reason_count; - RAY_LOG(INFO) << "before work fits for work, empty ? " << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); + RAY_LOG(INFO) << "before work fits for work, empty ? " + << pop_worker_request->runtime_env_info.serialized_runtime_env().empty(); auto worker_fits_for_task_fn = [this, &pop_worker_request, &skip_reason_count]( @@ -1668,7 +1671,7 @@ WorkerPool::IOWorkerState &WorkerPool::GetIOWorkerStateFromWorkerType( void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env, const rpc::RuntimeEnvConfig &runtime_env_config, const JobID &job_id, - const GetOrCreateRuntimeEnvCallback &callback) { + const GetOrCreateRuntimeEnvCallback &callback) { RAY_LOG(INFO) << "serialized env length = " << serialized_runtime_env.length() << "\n" << boost::stacktrace::stacktrace(); @@ -1682,10 +1685,10 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env repeated string log_files = 3; } */ - for (const auto& cur_log : runtime_env_config.log_files()) { + for (const auto &cur_log : runtime_env_config.log_files()) { RAY_LOG(INFO) << "log file length = " << cur_log.length(); } - + RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for job " << job_id << " with runtime_env " << serialized_runtime_env; runtime_env_agent_client_->GetOrCreateRuntimeEnv( @@ -1697,8 +1700,8 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env const std::string &serialized_runtime_env_context, const std::string &setup_error_message) { if (successful) { - - // RAY_LOG(INFO) << "runtime env context len = " << serialized_runtime_env_context.length() + // RAY_LOG(INFO) << "runtime env context len = " << + // serialized_runtime_env_context.length() // << "|" << serialized_runtime_env_context << "|" // << serialized_runtime_env_context[0] << "|" // << serialized_runtime_env_context[1] << "|"; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index a2d4b06bc1b1..d9ed04800077 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -540,8 +540,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { runtime_env_hash(runtime_hash), dynamic_options(std::move(options)), callback(std::move(callback)) { - // RAY_LOG(INFO) << "DEBUG STR empty ? " << runtime_env_info.serialized_runtime_env().empty(); - } + // RAY_LOG(INFO) << "DEBUG STR empty ? " << + // runtime_env_info.serialized_runtime_env().empty(); + } }; // Starts a new worker that fulfills `pop_worker_request`. From 2a313c18748ce6e18a15431196aa002111a22416 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:49:15 +0000 Subject: [PATCH 05/10] debug --- src/ray/raylet/local_task_manager.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index d8e07d9edf2a..bb02aae2bce2 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -81,7 +81,10 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr work } bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { - { const auto &spec = work->task.GetTaskSpecification(); } + { + const auto &spec = work->task.GetTaskSpecification(); + RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? " << spec.SerializedRuntimeEnv().empty(); + } const auto &task = work->task; const auto &task_id = task.GetTaskSpecification().TaskId(); @@ -95,18 +98,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w {task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()}); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); + tasks_to_dispatch_[scheduling_key].emplace_back(task); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); can_dispatch = false; - auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(task)); + auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), task); RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second); } } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); + tasks_to_dispatch_[scheduling_key].emplace_back(task); } return can_dispatch; } From 32187bf19e0ca7d860d0d8b133f58dc27edca8d9 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 00:29:15 +0000 Subject: [PATCH 06/10] debug --- src/ray/common/task/task.cc | 2 ++ src/ray/common/task/task.h | 2 ++ src/ray/common/task/task_spec.cc | 4 ++++ src/ray/common/task/task_spec.h | 2 ++ src/ray/raylet/local_task_manager.cc | 7 ++++--- 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index e2ac8571c4e5..20c93c5ca363 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -33,6 +33,8 @@ RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id) const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; } +TaskSpecification& RayTask::MutableTaskSpec() { return task_spec_; } + const std::vector &RayTask::GetDependencies() const { return dependencies_; } diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 5a4a9e323de5..ca6dc875b231 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -50,6 +50,8 @@ class RayTask { /// \return The immutable specification for the task. const TaskSpecification &GetTaskSpecification() const; + TaskSpecification& MutableTaskSpec(); + /// Get the task's object dependencies. This comprises the immutable task /// arguments and the mutable execution dependencies. /// diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 5633316ad2ed..7e1e73c408ab 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -189,6 +189,10 @@ const std::string &TaskSpecification::SerializedRuntimeEnv() const { return message_->runtime_env_info().serialized_runtime_env(); } +void TaskSpecification::ClearRuntimeEnv() { + message_->mutable_runtime_env_info()->clear_serialized_runtime_env(); +} + const rpc::RuntimeEnvConfig &TaskSpecification::RuntimeEnvConfig() const { return message_->runtime_env_info().runtime_env_config(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index ac4a38c92cee..7530f5ffd9fa 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -310,6 +310,8 @@ class TaskSpecification : public MessageWrapper { const std::string &SerializedRuntimeEnv() const; + void ClearRuntimeEnv(); + const rpc::RuntimeEnvConfig &RuntimeEnvConfig() const; bool HasRuntimeEnv() const; diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index bb02aae2bce2..a2b4f3c6372a 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -84,6 +84,7 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w { const auto &spec = work->task.GetTaskSpecification(); RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? " << spec.SerializedRuntimeEnv().empty(); + work->task.MutableTaskSpec().ClearRuntimeEnv(); } const auto &task = work->task; @@ -98,18 +99,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w {task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()}); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].emplace_back(task); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); can_dispatch = false; - auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), task); + auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work)); RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second); } } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].emplace_back(task); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } return can_dispatch; } From 63b4f1bf305b8c46bb8ae493b7c6edb11fc16c69 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 00:34:15 +0000 Subject: [PATCH 07/10] debug --- src/ray/raylet/local_task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index a2b4f3c6372a..3d2f6bcb5094 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -76,7 +76,7 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr work // guarantee that the local node is not selected for scheduling. ASSERT_FALSE( cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining()); - WaitForTaskArgsRequests(work); + WaitForTaskArgsRequests(std::move(work)); ScheduleAndDispatchTasks(); } From 98f8283591d9fbfd5d670557276c5f5443b8af4d Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 02:16:10 +0000 Subject: [PATCH 08/10] debug --- src/ray/core_worker/transport/normal_task_submitter.cc | 2 +- src/ray/raylet/node_manager.cc | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 9e8ac970db1b..043223fbd422 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -333,7 +333,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli // same TaskID to request a worker auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage(); resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary()); - const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg); + const TaskSpecification resource_spec = TaskSpecification(std::move(resource_spec_msg)); rpc::Address best_node_address; const bool is_spillback = (raylet_address != nullptr); bool is_selected_based_on_locality = false; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 879edff0bb6c..974cbcc58d03 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1797,6 +1797,10 @@ void NodeManager::HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest requ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { + + ////// debug???? problematic??? + request.mutable_resource_spec()->mutable_runtime_env_info()->mutable_serialized_runtime_env()->clear(); + rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); RayTask task(task_message); From 2d1ad2a30050a118f0b67b56d1cb5855228aa97b Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 03:16:43 +0000 Subject: [PATCH 09/10] debug --- src/ray/core_worker/core_worker.cc | 6 +++++- .../transport/normal_task_submitter.cc | 15 +++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a09d1e4afc0c..782aa0846728 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2173,6 +2173,10 @@ std::vector CoreWorker::SubmitTask( const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, const TaskID current_task_id) { + + RAY_LOG(INFO) << "CoreWorker::SubmitTask " << task_options.serialized_runtime_env_info; + + RAY_CHECK(scheduling_strategy.scheduling_strategy_case() != rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET); @@ -2206,7 +2210,7 @@ std::vector CoreWorker::SubmitTask( constrained_resources, debugger_breakpoint, depth, - task_options.serialized_runtime_env_info, + "",// task_options.serialized_runtime_env_info, worker_context_.GetMainThreadOrActorCreationTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true, diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 043223fbd422..03f28c918648 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -17,6 +17,8 @@ #include "ray/core_worker/transport/dependency_resolver.h" #include "ray/gcs/pb_util.h" +#include + namespace ray { namespace core { @@ -39,10 +41,14 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); bool keep_executing = true; + + // auto start = std::chrono::steady_clock::now(); + { absl::MutexLock lock(&mu_); - if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { - cancelled_tasks_.erase(task_spec.TaskId()); + auto task_iter = cancelled_tasks_.find(task_spec.TaskId()); + if (task_iter != cancelled_tasks_.end()) { + cancelled_tasks_.erase(task_iter); keep_executing = false; } if (keep_executing) { @@ -82,6 +88,11 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RequestNewWorkerIfNeeded(scheduling_key); } } + + // auto end = std::chrono::steady_clock::now(); + // auto elapsed = std::chrono::duration_cast(end - start).count(); + // RAY_LOG(INFO) << "elapse for resolution " << elapsed; + if (!keep_executing) { RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); From cf9e5dd84024219f51fd379fc680919b8fb2e7bf Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 09:47:19 +0000 Subject: [PATCH 10/10] debug --- cpp/src/ray/runtime/task/native_task_submitter.cc | 2 +- src/ray/common/task/task.cc | 2 +- src/ray/common/task/task.h | 2 +- src/ray/core_worker/core_worker.cc | 6 ++---- src/ray/core_worker/transport/normal_task_submitter.cc | 8 ++++---- src/ray/raylet/local_task_manager.cc | 3 ++- src/ray/raylet/node_manager.cc | 8 +++++--- 7 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index bd1a544ca23a..87a8a17eab2f 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -65,7 +65,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, TaskOptions options{}; options.name = call_options.name; options.resources = call_options.resources; - options.serialized_runtime_env_info = call_options.serialized_runtime_env_info; + // options.serialized_runtime_env_info = call_options.serialized_runtime_env_info; options.generator_backpressure_num_objects = -1; std::vector return_refs; if (invocation.task_type == TaskType::ACTOR_TASK) { diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index 20c93c5ca363..b9924f996709 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -33,7 +33,7 @@ RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id) const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; } -TaskSpecification& RayTask::MutableTaskSpec() { return task_spec_; } +TaskSpecification &RayTask::MutableTaskSpec() { return task_spec_; } const std::vector &RayTask::GetDependencies() const { return dependencies_; diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index ca6dc875b231..949549677504 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -50,7 +50,7 @@ class RayTask { /// \return The immutable specification for the task. const TaskSpecification &GetTaskSpecification() const; - TaskSpecification& MutableTaskSpec(); + TaskSpecification &MutableTaskSpec(); /// Get the task's object dependencies. This comprises the immutable task /// arguments and the mutable execution dependencies. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 782aa0846728..83a9332b2d63 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2173,10 +2173,8 @@ std::vector CoreWorker::SubmitTask( const std::string &debugger_breakpoint, const std::string &serialized_retry_exception_allowlist, const TaskID current_task_id) { - RAY_LOG(INFO) << "CoreWorker::SubmitTask " << task_options.serialized_runtime_env_info; - - + RAY_CHECK(scheduling_strategy.scheduling_strategy_case() != rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET); @@ -2210,7 +2208,7 @@ std::vector CoreWorker::SubmitTask( constrained_resources, debugger_breakpoint, depth, - "",// task_options.serialized_runtime_env_info, + "", // task_options.serialized_runtime_env_info, worker_context_.GetMainThreadOrActorCreationTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true, diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 03f28c918648..5eec57e657a0 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -14,11 +14,11 @@ #include "ray/core_worker/transport/normal_task_submitter.h" +#include + #include "ray/core_worker/transport/dependency_resolver.h" #include "ray/gcs/pb_util.h" -#include - namespace ray { namespace core { @@ -90,8 +90,8 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } // auto end = std::chrono::steady_clock::now(); - // auto elapsed = std::chrono::duration_cast(end - start).count(); - // RAY_LOG(INFO) << "elapse for resolution " << elapsed; + // auto elapsed = std::chrono::duration_cast(end - + // start).count(); RAY_LOG(INFO) << "elapse for resolution " << elapsed; if (!keep_executing) { RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 3d2f6bcb5094..07e49785b339 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -83,7 +83,8 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr work bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { { const auto &spec = work->task.GetTaskSpecification(); - RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? " << spec.SerializedRuntimeEnv().empty(); + RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? " + << spec.SerializedRuntimeEnv().empty(); work->task.MutableTaskSpec().ClearRuntimeEnv(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 974cbcc58d03..2170291d8000 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1797,10 +1797,12 @@ void NodeManager::HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest requ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { - ////// debug???? problematic??? - request.mutable_resource_spec()->mutable_runtime_env_info()->mutable_serialized_runtime_env()->clear(); - + request.mutable_resource_spec() + ->mutable_runtime_env_info() + ->mutable_serialized_runtime_env() + ->clear(); + rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); RayTask task(task_message);