diff --git a/BUILD.bazel b/BUILD.bazel
index bcace50105a6..745e19295f94 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -716,6 +716,7 @@ ray_cc_library(
         "@boost//:asio",
         "@boost//:beast",
         "@boost//:system",
+        "@boost//:stacktrace",
         "@com_github_jupp0r_prometheus_cpp//pull",
         "@com_google_absl//absl/base:core_headers",
         "@com_google_absl//absl/container:flat_hash_set",
diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc
index bd1a544ca23a..87a8a17eab2f 100644
--- a/cpp/src/ray/runtime/task/native_task_submitter.cc
+++ b/cpp/src/ray/runtime/task/native_task_submitter.cc
@@ -65,7 +65,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
   TaskOptions options{};
   options.name = call_options.name;
   options.resources = call_options.resources;
-  options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
+  // options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
   options.generator_backpressure_num_objects = -1;
   std::vector<rpc::ObjectReference> return_refs;
   if (invocation.task_type == TaskType::ACTOR_TASK) {
diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py
index 6f524cf4e3ea..999f31db0dfa 100644
--- a/release/benchmarks/distributed/test_many_tasks.py
+++ b/release/benchmarks/distributed/test_many_tasks.py
@@ -5,20 +5,22 @@ import tqdm
 
 from ray.util.state import summarize_tasks
-from many_nodes_tests.dashboard_test import DashboardTestAtScale
+
+# from many_nodes_tests.dashboard_test import DashboardTestAtScale
 from ray._private.state_api_test_utils import (
     StateAPICallSpec,
     periodic_invoke_state_apis_with_actor,
     summarize_worker_startup_time,
 )
 
-sleep_time = 300
+sleep_time = 60
 
 
 def test_max_running_tasks(num_tasks):
     cpus_per_task = 0.25
 
-    @ray.remote(num_cpus=cpus_per_task)
+    @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}})
+    # @ray.remote(num_cpus=cpus_per_task)
     def task():
         time.sleep(sleep_time)
@@ -67,11 +69,10 @@ def no_resource_leaks():
 @click.command()
 @click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
 def test(num_tasks):
-    addr = ray.init(address="auto")
+    ray.init()
     test_utils.wait_for_condition(no_resource_leaks)
     monitor_actor = test_utils.monitor_memory_usage()
-    dashboard_test = DashboardTestAtScale(addr)
 
     def not_none(res):
         return res is not None
@@ -83,7 +84,11 @@ def not_none(res):
     )
 
     start_time = time.time()
-    used_cpus = test_max_running_tasks(num_tasks)
+    used_cpus = 0
+    try:
+        used_cpus = test_max_running_tasks(num_tasks)
+    except Exception as e:
+        print(str(e))
     end_time = time.time()
     ray.get(monitor_actor.stop_run.remote())
     used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
@@ -91,15 +96,10 @@ def not_none(res):
     print(f"Peak memory usage per processes:\n {usage}")
     ray.get(api_caller.stop.remote())
 
-    del api_caller
-    del monitor_actor
-    test_utils.wait_for_condition(no_resource_leaks)
-
-    try:
-        summarize_worker_startup_time()
-    except Exception as e:
-        print("Failed to summarize worker startup time.")
-        print(e)
+    print(
+        f"end time = {end_time}, start time = {start_time},"
+        f"sleep time = {sleep_time}, num tasks = {num_tasks}"
+    )
 
     rate = num_tasks / (end_time - start_time - sleep_time)
     print(
@@ -128,10 +128,21 @@ def not_none(res):
             },
         ],
     }
+    print("=======")
+    print(results)
 
-    dashboard_test.update_release_test_result(results)
     test_utils.safe_write_to_results_json(results)
 
+    del api_caller
+    del monitor_actor
+
+    test_utils.wait_for_condition(no_resource_leaks)
+
+    try:
+        summarize_worker_startup_time()
+    except Exception as e:
+        print("Failed to summarize worker startup time.")
+        print(e)
+
 
 if __name__ == "__main__":
     test()
diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc
index e2ac8571c4e5..b9924f996709 100644
--- a/src/ray/common/task/task.cc
+++ b/src/ray/common/task/task.cc
@@ -33,6 +33,8 @@ RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id)
 
 const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; }
 
+TaskSpecification &RayTask::MutableTaskSpec() { return task_spec_; }
+
 const std::vector<rpc::ObjectReference> &RayTask::GetDependencies() const {
   return dependencies_;
 }
diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h
index 5a4a9e323de5..949549677504 100644
--- a/src/ray/common/task/task.h
+++ b/src/ray/common/task/task.h
@@ -50,6 +50,8 @@ class RayTask {
   /// \return The immutable specification for the task.
   const TaskSpecification &GetTaskSpecification() const;
 
+  TaskSpecification &MutableTaskSpec();
+
   /// Get the task's object dependencies. This comprises the immutable task
   /// arguments and the mutable execution dependencies.
   ///
diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc
index a1b3c04f80b3..7e1e73c408ab 100644
--- a/src/ray/common/task/task_spec.cc
+++ b/src/ray/common/task/task_spec.cc
@@ -181,15 +181,19 @@ ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const {
   return ray::FunctionDescriptorBuilder::FromProto(message_->function_descriptor());
 }
 
-rpc::RuntimeEnvInfo TaskSpecification::RuntimeEnvInfo() const {
+const rpc::RuntimeEnvInfo &TaskSpecification::RuntimeEnvInfo() const {
   return message_->runtime_env_info();
 }
 
-std::string TaskSpecification::SerializedRuntimeEnv() const {
+const std::string &TaskSpecification::SerializedRuntimeEnv() const {
   return message_->runtime_env_info().serialized_runtime_env();
 }
 
-rpc::RuntimeEnvConfig TaskSpecification::RuntimeEnvConfig() const {
+void TaskSpecification::ClearRuntimeEnv() {
+  message_->mutable_runtime_env_info()->clear_serialized_runtime_env();
+}
+
+const rpc::RuntimeEnvConfig &TaskSpecification::RuntimeEnvConfig() const {
   return message_->runtime_env_info().runtime_env_config();
 }
diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h
index 019835062d31..7530f5ffd9fa 100644
--- a/src/ray/common/task/task_spec.h
+++ b/src/ray/common/task/task_spec.h
@@ -306,11 +306,13 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
 
   ray::FunctionDescriptor FunctionDescriptor() const;
 
-  [[nodiscard]] rpc::RuntimeEnvInfo RuntimeEnvInfo() const;
+  [[nodiscard]] const rpc::RuntimeEnvInfo &RuntimeEnvInfo() const;
 
-  std::string SerializedRuntimeEnv() const;
+  const std::string &SerializedRuntimeEnv() const;
 
-  rpc::RuntimeEnvConfig RuntimeEnvConfig() const;
+  void ClearRuntimeEnv();
+
+  const rpc::RuntimeEnvConfig &RuntimeEnvConfig() const;
 
   bool HasRuntimeEnv() const;
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index a09d1e4afc0c..83a9332b2d63 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2173,6 +2173,8 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
     const std::string &debugger_breakpoint,
     const std::string &serialized_retry_exception_allowlist,
     const TaskID current_task_id) {
+  RAY_LOG(INFO) << "CoreWorker::SubmitTask " << task_options.serialized_runtime_env_info;
+
   RAY_CHECK(scheduling_strategy.scheduling_strategy_case() !=
             rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET);
@@ -2206,7 +2208,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
       constrained_resources,
       debugger_breakpoint,
       depth,
-      task_options.serialized_runtime_env_info,
+      "",  // task_options.serialized_runtime_env_info,
       worker_context_.GetMainThreadOrActorCreationTaskID(),
       /*concurrency_group_name*/ "",
       /*include_job_config*/ true,
diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc
index 9e8ac970db1b..5eec57e657a0 100644
--- a/src/ray/core_worker/transport/normal_task_submitter.cc
+++ b/src/ray/core_worker/transport/normal_task_submitter.cc
@@ -14,6 +14,8 @@
 
 #include "ray/core_worker/transport/normal_task_submitter.h"
 
+#include <chrono>
+
 #include "ray/core_worker/transport/dependency_resolver.h"
 #include "ray/gcs/pb_util.h"
@@ -39,10 +41,14 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
     RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
     bool keep_executing = true;
+
+    // auto start = std::chrono::steady_clock::now();
+
     {
       absl::MutexLock lock(&mu_);
-      if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) {
-        cancelled_tasks_.erase(task_spec.TaskId());
+      auto task_iter = cancelled_tasks_.find(task_spec.TaskId());
+      if (task_iter != cancelled_tasks_.end()) {
+        cancelled_tasks_.erase(task_iter);
         keep_executing = false;
       }
       if (keep_executing) {
@@ -82,6 +88,11 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
         RequestNewWorkerIfNeeded(scheduling_key);
       }
     }
+
+    // auto end = std::chrono::steady_clock::now();
+    // auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end -
+    // start).count(); RAY_LOG(INFO) << "elapse for resolution " << elapsed;
+
     if (!keep_executing) {
       RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
           task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
@@ -333,7 +344,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli
   // same TaskID to request a worker
   auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage();
   resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary());
-  const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
+  const TaskSpecification resource_spec = TaskSpecification(std::move(resource_spec_msg));
   rpc::Address best_node_address;
   const bool is_spillback = (raylet_address != nullptr);
   bool is_selected_based_on_locality = false;
diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc
index ed1560fc65b9..07e49785b339 100644
--- a/src/ray/raylet/local_task_manager.cc
+++ b/src/ray/raylet/local_task_manager.cc
@@ -76,11 +76,18 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr<internal::Work> work
   // guarantee that the local node is not selected for scheduling.
   ASSERT_FALSE(
       cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining());
-  WaitForTaskArgsRequests(work);
+  WaitForTaskArgsRequests(std::move(work));
   ScheduleAndDispatchTasks();
 }
 
 bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work) {
+  {
+    const auto &spec = work->task.GetTaskSpecification();
+    RAY_LOG(INFO) << "at WaitForTaskArgsRequests, serialized runtime env empty ? "
" + << spec.SerializedRuntimeEnv().empty(); + work->task.MutableTaskSpec().ClearRuntimeEnv(); + } + const auto &task = work->task; const auto &task_id = task.GetTaskSpecification().TaskId(); const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); @@ -93,18 +100,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w {task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()}); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); can_dispatch = false; - auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), work); + auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work)); RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second); } } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } return can_dispatch; } @@ -249,7 +256,11 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) { auto &work = *work_it; const auto &task = work->task; - const auto spec = task.GetTaskSpecification(); + const auto &spec = task.GetTaskSpecification(); + + RAY_LOG(INFO) << "first task spec empty ? " + << spec.RuntimeEnvInfo().serialized_runtime_env().empty(); + TaskID task_id = spec.TaskId(); if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) { work_it++; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 879edff0bb6c..2170291d8000 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1797,6 +1797,12 @@ void NodeManager::HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest requ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { + ////// debug???? problematic??? 
+  request.mutable_resource_spec()
+      ->mutable_runtime_env_info()
+      ->mutable_serialized_runtime_env()
+      ->clear();
+
   rpc::Task task_message;
   task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
   RayTask task(task_message);
diff --git a/src/ray/raylet/resolution_cache.h b/src/ray/raylet/resolution_cache.h
new file mode 100644
index 000000000000..9e5a23cf2d77
--- /dev/null
+++ b/src/ray/raylet/resolution_cache.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <mutex>
+#include <optional>
+
+#include "absl/container/flat_hash_map.h"
+
+namespace ray {
+
+template <typename Key, typename Value>
+class ResolutionCache {
+ public:
+  std::optional<Value> Get(const Key &key) {
+    std::lock_guard lck(mtx_);
+    auto iter = map_.find(key);
+    if (iter == map_.end()) {
+      return std::nullopt;
+    }
+    return iter->second;
+  }
+
+  std::optional<Value> GetAndPop(const Key &key) {
+    std::lock_guard lck(mtx_);
+    auto iter = map_.find(key);
+    if (iter == map_.end()) {
+      return std::nullopt;
+    }
+    auto value = std::move(iter->second);
+    map_.erase(iter);
+    return value;
+  }
+
+  void Put(Key key, Value val) {
+    std::lock_guard lck(mtx_);
+    map_.emplace(std::move(key), std::move(val));
+  }
+
+  std::mutex mtx_;
+  absl::flat_hash_map<Key, Value> map_;
+};
+
+}  // namespace ray
diff --git a/src/ray/raylet/runtime_env_agent_client.cc b/src/ray/raylet/runtime_env_agent_client.cc
index 82a2d5a4c6a6..c4d3358bd63e 100644
--- a/src/ray/raylet/runtime_env_agent_client.cc
+++ b/src/ray/raylet/runtime_env_agent_client.cc
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +28,7 @@
 #include "absl/strings/str_format.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/status.h"
+#include "ray/raylet/resolution_cache.h"
 #include "ray/util/logging.h"
 #include "src/ray/protobuf/runtime_env_agent.pb.h"
@@ -40,6 +42,22 @@ namespace raylet {
 
 namespace {
 
+struct ResolutionKey {
+  ResolutionKey(std::string_view h, std::string_view p) : host(h), port(p) {}
+
+  std::string host;
+  std::string port;
+
+  bool operator==(const ResolutionKey &rhs) const {
+    return host == rhs.host && port == rhs.port;
+  }
+
+  template <typename H>
+  friend H AbslHashValue(H h, const ResolutionKey &k) {
+    return H::combine(std::move(h), k.host, k.port);
+  }
+};
+
 //------------------------------------------------------------------------------
 // Simple class to make a async POST call.
 // Will call callback exactly once with pair{non-ok, any} or pair{ok, reply body}.
@@ -91,12 +109,47 @@ class Session : public std::enable_shared_from_this<Session> {
   //
   // This method should only be called once.
   void run(FinishedCallback finished_callback) {
-    finished_callback_ = finished_callback;
-    // Starts the state machine by looking up the domain name.
-    resolver_.async_resolve(
-        host_,
-        port_,
-        beast::bind_front_handler(&Session::on_resolve, shared_from_this()));
+    start_ = std::chrono::steady_clock::now();
+    finished_callback_ = std::move(finished_callback);
+
+    ResolutionKey key{host_, port_};
+
+    bool found = false;
+
+    int cache_hit_count = 0;
+    int cache_miss_count = 0;
+    {
+      std::lock_guard lck(cache_mutex_);
+      if (!tcp_stream_cache_.empty()) {
+        stream_ = std::move(tcp_stream_cache_.front());
+        tcp_stream_cache_.pop_front();
+        found = true;
+        cache_hit++;
+      } else {
+        ++cache_miss;
+      }
+
+      cache_hit_count = cache_hit;
+      cache_miss_count = cache_miss;
+    }
+
+    RAY_LOG_EVERY_N(INFO, 10) << "Cache hit " << cache_hit_count << ", cache miss "
+                              << cache_miss_count;
+
+    if (found) {
+      stream_->expires_never();
+      // Send the HTTP request to the remote host
+      http::async_write(
+          *stream_,
+          req_,
+          beast::bind_front_handler(&Session::on_write, shared_from_this()));
+    } else {
+      // Starts the state machine by looking up the domain name.
+      resolver_.async_resolve(
+          host_,
+          port_,
+          beast::bind_front_handler(&Session::on_resolve, shared_from_this()));
+    }
   }
 
  private:
@@ -109,13 +162,13 @@ class Session : public std::enable_shared_from_this<Session> {
           std::function succ_callback,
           std::function fail_callback)
       : resolver_(ioc),
-        stream_(ioc),
+        stream_(std::make_shared<beast::tcp_stream>(ioc)),
         host_(std::string(host)),
         port_(std::string(port)),
         method_(method),
         succ_callback_(std::move(succ_callback)),
         fail_callback_(std::move(fail_callback)) {
-    stream_.expires_never();
+    stream_->expires_never();
     req_.method(method_);
     req_.target(target);
     req_.body() = std::move(body);
@@ -143,9 +196,9 @@ class Session : public std::enable_shared_from_this<Session> {
       return;
     }
 
-    stream_.expires_never();
+    stream_->expires_never();
     // Make the connection on the IP address we get from a lookup
-    stream_.async_connect(
+    stream_->async_connect(
        results, beast::bind_front_handler(&Session::on_connect, shared_from_this()));
   }
@@ -155,10 +208,11 @@
       return;
     }
 
-    stream_.expires_never();
+    stream_->expires_never();
     // Send the HTTP request to the remote host
-    http::async_write(
-        stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this()));
+    http::async_write(*stream_,
+                      req_,
+                      beast::bind_front_handler(&Session::on_write, shared_from_this()));
   }
 
   void on_write(beast::error_code ec, std::size_t bytes_transferred) {
@@ -168,9 +222,9 @@
                                           std::to_string(bytes_transferred)));
       return;
     }
-    stream_.expires_never();
+    stream_->expires_never();
     // Receive the HTTP response
-    http::async_read(stream_,
+    http::async_read(*stream_,
                      buffer_,
                      res_,
                      beast::bind_front_handler(&Session::on_read, shared_from_this()));
@@ -194,15 +248,26 @@
     }
 
     // Gracefully close the socket
-    stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
+    // stream_->socket().shutdown(tcp::socket::shutdown_both, ec);
+
+    {
+      std::lock_guard lck(cache_mutex_);
+      tcp_stream_cache_.emplace_back(std::move(stream_));
+    }
+
     // not_connected happens sometimes so don't bother reporting it.
     if (ec && ec != beast::errc::not_connected) {
       RAY_LOG(INFO) << "on_read error after response body received: " << ec.message();
     }
+
+    auto end = std::chrono::steady_clock::now();
+    auto elapsed =
+        std::chrono::duration_cast<std::chrono::milliseconds>(end - start_).count();
+    RAY_LOG(INFO) << "Session takes " << elapsed;
   }
 
   tcp::resolver resolver_;
-  beast::tcp_stream stream_;
+  std::shared_ptr<beast::tcp_stream> stream_;
   std::string host_;
   std::string port_;
   http::verb method_;
@@ -212,6 +277,13 @@
   http::request<http::string_body> req_;
   http::response<http::string_body> res_;
   FinishedCallback finished_callback_;
+
+  inline static std::deque<std::shared_ptr<beast::tcp_stream>> tcp_stream_cache_;
+  inline static std::mutex cache_mutex_;
+  inline static int cache_hit = 0;
+  inline static int cache_miss = 0;
+
+  std::chrono::time_point<std::chrono::steady_clock> start_;
 };
 
 // A pool of sessions with a fixed max concurrency. Each session can handle 1 concurrent
@@ -360,6 +432,9 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient {
       const std::string &serialized_runtime_env,
       const rpc::RuntimeEnvConfig &runtime_env_config,
       GetOrCreateRuntimeEnvCallback callback) override {
+    callback(true, "{}", "");
+    return;
+
     RetryInvokeOnNotFoundWithDeadline(
         [=](SuccCallback succ_callback, FailCallback fail_callback) {
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index 943ea89b24b5..2fa716d8e3d5 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -16,6 +16,7 @@
 #include
 #include
+#include <boost/stacktrace.hpp>
 #include
 #include "absl/strings/str_split.h"
@@ -230,6 +231,9 @@ void WorkerPool::AddWorkerProcess(
     const std::chrono::high_resolution_clock::time_point &start,
     const rpc::RuntimeEnvInfo &runtime_env_info,
     const std::vector<std::string> &dynamic_options) {
+  RAY_LOG(INFO) << "Add worker process, runtime env empty "
+                << runtime_env_info.serialized_runtime_env().empty();
+
   state.worker_processes.emplace(worker_startup_token_counter_,
                                  WorkerProcessInfo{/*is_pending_registration=*/true,
                                                    worker_type,
@@ -444,6 +448,9 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
     const int runtime_env_hash,
     const std::string &serialized_runtime_env_context,
     const rpc::RuntimeEnvInfo &runtime_env_info) {
+  RAY_LOG(INFO) << "at start worker process, empty "
+                << runtime_env_info.serialized_runtime_env().empty();
+
   rpc::JobConfig *job_config = nullptr;
   if (!job_id.IsNil()) {
     auto it = all_jobs_.find(job_id);
@@ -1016,6 +1023,10 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
   }
 
   if (pop_worker_request) {
+    RAY_LOG(INFO)
+        << "DEBUG STR empty ? "
+        << pop_worker_request->runtime_env_info.serialized_runtime_env().empty();
+
     bool used = pop_worker_request->callback(worker, PopWorkerStatus::OK, "");
     if (!used) {
       // Retry PushWorker. Maybe it can be used by other tasks.
@@ -1205,6 +1216,10 @@ WorkerUnfitForTaskReason WorkerPool::WorkerFitsForTask(
 
 void WorkerPool::StartNewWorker(
     const std::shared_ptr<PopWorkerRequest> &pop_worker_request) {
+  RAY_LOG(INFO) << "when start worker, runtime env empty ? "
+                << pop_worker_request->runtime_env_info.serialized_runtime_env().empty();
+
+  // <---- hjiang
   auto start_worker_process_fn = [this](
                                      std::shared_ptr<PopWorkerRequest> pop_worker_request,
                                      const std::string &serialized_runtime_env_context) {
@@ -1240,7 +1255,8 @@ void WorkerPool::StartNewWorker(
   const std::string &serialized_runtime_env =
       pop_worker_request->runtime_env_info.serialized_runtime_env();
 
-  if (!IsRuntimeEnvEmpty(serialized_runtime_env)) {
+  // if (!IsRuntimeEnvEmpty(serialized_runtime_env)) {
+  if (false) {
     // create runtime env.
     GetOrCreateRuntimeEnv(
         serialized_runtime_env,
@@ -1267,6 +1283,9 @@ void WorkerPool::StartNewWorker(
 
 void WorkerPool::PopWorker(const TaskSpecification &task_spec,
                            const PopWorkerCallback &callback) {
+  RAY_LOG(INFO) << "task spec empty ? "
+                << task_spec.RuntimeEnvInfo().serialized_runtime_env().empty();
+
   RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId() << " task name "
                  << task_spec.FunctionDescriptor()->ToString();
   // Code path of actor task.
@@ -1308,6 +1327,9 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
 
   absl::flat_hash_map<WorkerUnfitForTaskReason, size_t> skip_reason_count;
 
+  RAY_LOG(INFO) << "before work fits for work, empty ? "
+                << pop_worker_request->runtime_env_info.serialized_runtime_env().empty();
+
   auto worker_fits_for_task_fn = [this, &pop_worker_request, &skip_reason_count](
                                      const std::pair<std::shared_ptr<WorkerInterface>, int64_t> &pair) -> bool {
@@ -1650,6 +1672,23 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env
     const rpc::RuntimeEnvConfig &runtime_env_config,
     const JobID &job_id,
     const GetOrCreateRuntimeEnvCallback &callback) {
+  RAY_LOG(INFO) << "serialized env length = " << serialized_runtime_env.length() << "\n"
+                << boost::stacktrace::stacktrace();
+
+  /*
+  message RuntimeEnvConfig {
+    /// The timeout of runtime env creation.
+    int32 setup_timeout_seconds = 1;
+    /// Indicates whether to install runtime env eagerly before the workers are leased.
+    bool eager_install = 2;
+    /// A list of files to stream the runtime env setup logs to.
+    repeated string log_files = 3;
+  }
+  */
+  for (const auto &cur_log : runtime_env_config.log_files()) {
+    RAY_LOG(INFO) << "log file length = " << cur_log.length();
+  }
+
   RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for job " << job_id << " with runtime_env "
                  << serialized_runtime_env;
   runtime_env_agent_client_->GetOrCreateRuntimeEnv(
@@ -1661,6 +1700,12 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env
           const std::string &serialized_runtime_env_context,
           const std::string &setup_error_message) {
         if (successful) {
+          // RAY_LOG(INFO) << "runtime env context len = " <<
+          // serialized_runtime_env_context.length()
+          //               << "|" << serialized_runtime_env_context << "|"
+          //               << serialized_runtime_env_context[0] << "|"
+          //               << serialized_runtime_env_context[1] << "|";
+
           callback(true, serialized_runtime_env_context, "");
         } else {
           RAY_LOG(WARNING) << "Couldn't create a runtime environment for job " << job_id
@@ -1676,6 +1721,8 @@ void WorkerPool::GetOrCreateRuntimeEnv(const std::string &serialized_runtime_env
 
 void WorkerPool::DeleteRuntimeEnvIfPossible(const std::string &serialized_runtime_env) {
   RAY_LOG(DEBUG) << "DeleteRuntimeEnvIfPossible " << serialized_runtime_env;
+  return;
+
   if (!IsRuntimeEnvEmpty(serialized_runtime_env)) {
     runtime_env_agent_client_->DeleteRuntimeEnvIfPossible(
         serialized_runtime_env, [serialized_runtime_env](bool successful) {
diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h
index ef2e1e048635..d9ed04800077 100644
--- a/src/ray/raylet/worker_pool.h
+++ b/src/ray/raylet/worker_pool.h
@@ -539,7 +539,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
           runtime_env_info(std::move(runtime_env_info)),
           runtime_env_hash(runtime_hash),
           dynamic_options(std::move(options)),
-          callback(std::move(callback)) {}
+          callback(std::move(callback)) {
+      // RAY_LOG(INFO) << "DEBUG STR empty ? " <<
+      // runtime_env_info.serialized_runtime_env().empty();
+    }
   };
 
   // Starts a new worker that fulfills `pop_worker_request`.
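
Aside (not part of the patch): the new src/ray/raylet/resolution_cache.h template is added and included but never instantiated in this diff; the stream reuse in Session is done through a static std::deque instead. The sketch below is only an illustration, under assumptions, of how ResolutionCache could memoize resolver results keyed by "host:port". The CachedResolve helper and the example endpoint value are hypothetical and do not exist in Ray.

// Illustrative sketch only: CachedResolve is a hypothetical helper, not Ray API.
#include <string>

#include "ray/raylet/resolution_cache.h"

namespace ray {

// Check the cache first; on a miss, call the caller-supplied resolve function and
// remember the result so later sessions can skip the lookup.
template <typename ResolveFn>
std::string CachedResolve(ResolutionCache<std::string, std::string> &cache,
                          const std::string &host_port,
                          ResolveFn resolve) {
  if (auto cached = cache.Get(host_port)) {
    return *cached;  // Cache hit: reuse the previously resolved endpoint.
  }
  std::string endpoint = resolve(host_port);  // Cache miss: do the real lookup.
  cache.Put(host_port, endpoint);
  return endpoint;
}

}  // namespace ray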