From fcef11c938e96ce893d646b1423bd5d45b2858d9 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Fri, 6 Sep 2024 07:39:12 -0700 Subject: [PATCH 01/11] Revert "[observability][export-api] Write task events" (#47536) Reverts ray-project/ray#47193 --- src/ray/common/ray_config_def.h | 10 - src/ray/core_worker/core_worker_process.cc | 3 +- src/ray/core_worker/task_event_buffer.cc | 187 ++---------------- src/ray/core_worker/task_event_buffer.h | 52 +---- .../test/task_event_buffer_test.cc | 138 ------------- src/ray/gcs/pb_util.h | 82 -------- src/ray/util/event.cc | 12 +- 7 files changed, 33 insertions(+), 451 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7f48c7926c8a..849b0b147cff 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -498,20 +498,10 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60) /// workers. Events will be evicted based on a FIFO order. RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000) -/// Max number of task status events that will be stored to export -/// for the export API. Events will be evicted based on a FIFO order. -RAY_CONFIG(uint64_t, - task_events_max_num_export_status_events_buffer_on_worker, - 1000 * 1000) - /// Max number of task events to be send in a single message to GCS. This caps both /// the message size, and also the processing work on GCS. RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000) -/// Max number of task events to be written in a single flush iteration. This -/// caps the number of file writes per iteration. -RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000) - /// Max number of profile events allowed to be tracked for a single task. /// Setting the value to -1 allows unlimited profile events to be tracked. RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 191788e7e045..d587fcee8b7e 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -137,8 +137,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) { const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER, - ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; + ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER}; RayEventInit(source_types, absl::flat_hash_map(), options_.log_dir, diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 1971b5efd563..115811d3a999 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -15,7 +15,6 @@ #include "ray/core_worker/task_event_buffer.h" #include "ray/gcs/pb_util.h" -#include "ray/util/event.h" namespace ray { namespace core { @@ -109,62 +108,6 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { } } -void TaskStatusEvent::ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) { - // Base fields - rpc_task_export_event_data->set_task_id(task_id_.Binary()); - rpc_task_export_event_data->set_job_id(job_id_.Binary()); - rpc_task_export_event_data->set_attempt_number(attempt_number_); - - // Task info. - if (task_spec_) { - gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_); - } - - // Task status update. - auto dst_state_update = rpc_task_export_event_data->mutable_state_updates(); - gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update); - - if (!state_update_.has_value()) { - return; - } - - if (state_update_->node_id_.has_value()) { - RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) - << "Node ID should be included when task status changes to " - "SUBMITTED_TO_WORKER."; - dst_state_update->set_node_id(state_update_->node_id_->Binary()); - } - - if (state_update_->worker_id_.has_value()) { - RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) - << "Worker ID should be included when task status changes to " - "SUBMITTED_TO_WORKER."; - dst_state_update->set_worker_id(state_update_->worker_id_->Binary()); - } - - if (state_update_->error_info_.has_value()) { - auto error_info = dst_state_update->mutable_error_info(); - error_info->set_error_message((*state_update_->error_info_).error_message()); - error_info->set_error_type((*state_update_->error_info_).error_type()); - } - - if (state_update_->task_log_info_.has_value()) { - rpc::ExportTaskEventData::TaskLogInfo export_task_log_info; - gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(), - &export_task_log_info); - dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info); - } - - if (state_update_->pid_.has_value()) { - dst_state_update->set_worker_pid(state_update_->pid_.value()); - } - - if (state_update_->is_debugger_paused_.has_value()) { - dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value()); - } -} - void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { // Rate limit on the number of profiling events from the task. This is especially the // case if a driver has many profiling events when submitting tasks @@ -184,24 +127,6 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { event_entry->set_extra_data(std::move(extra_data_)); } -void TaskProfileEvent::ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) { - auto profile_events = rpc_task_export_event_data->mutable_profile_events(); - - // Base fields - rpc_task_export_event_data->set_task_id(task_id_.Binary()); - rpc_task_export_event_data->set_job_id(job_id_.Binary()); - rpc_task_export_event_data->set_attempt_number(attempt_number_); - profile_events->set_component_type(std::move(component_type_)); - profile_events->set_component_id(std::move(component_id_)); - profile_events->set_node_ip_address(std::move(node_ip_address_)); - auto event_entry = profile_events->add_events(); - event_entry->set_event_name(std::move(event_name_)); - event_entry->set_start_time(start_time_); - event_entry->set_end_time(end_time_); - event_entry->set_extra_data(std::move(extra_data_)); -} - TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr gcs_client) : work_guard_(boost::asio::make_work_guard(io_service_)), periodical_runner_(io_service_), @@ -212,15 +137,12 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); } Status TaskEventBufferImpl::Start(bool auto_flush) { absl::MutexLock lock(&mutex_); - export_event_write_enabled_ = RayConfig::instance().enable_export_api_write(); auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms(); RAY_CHECK(report_interval_ms > 0) << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer."; status_events_.set_capacity( RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()); - status_events_for_export_.set_capacity( - RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker()); io_thread_ = std::thread([this]() { #ifndef _WIN32 @@ -288,27 +210,10 @@ void TaskEventBufferImpl::Stop() { bool TaskEventBufferImpl::Enabled() const { return enabled_; } void TaskEventBufferImpl::GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, - std::vector> *status_events_to_write_for_export, + std::vector> *status_events_to_send, absl::flat_hash_set *dropped_task_attempts_to_send) { absl::MutexLock lock(&mutex_); - // Get the export events data to write. - if (export_event_write_enabled_) { - size_t num_to_write = std::min( - static_cast(RayConfig::instance().export_task_events_write_batch_size()), - static_cast(status_events_for_export_.size())); - status_events_to_write_for_export->insert( - status_events_to_write_for_export->end(), - std::make_move_iterator(status_events_for_export_.begin()), - std::make_move_iterator(status_events_for_export_.begin() + num_to_write)); - status_events_for_export_.erase(status_events_for_export_.begin(), - status_events_for_export_.begin() + num_to_write); - stats_counter_.Decrement( - TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored, - status_events_to_write_for_export->size()); - } - // No data to send. if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) { return; @@ -347,7 +252,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend( } void TaskEventBufferImpl::GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) { + std::vector> *profile_events_to_send) { absl::MutexLock lock(&profile_mutex_); size_t batch_size = @@ -373,13 +278,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend( } std::unique_ptr TaskEventBufferImpl::CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send) { // Aggregate the task events by TaskAttempt. absl::flat_hash_map agg_task_events; auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send]( - std::shared_ptr &event) { + std::unique_ptr &event) { if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) { // We are marking this as data loss due to some missing task status updates. // We will not send this event to GCS. @@ -426,45 +331,6 @@ std::unique_ptr TaskEventBufferImpl::CreateDataToSend( return data; } -void TaskEventBufferImpl::WriteExportData( - std::vector> &&status_events_to_write_for_export, - std::vector> &&profile_events_to_send) { - absl::flat_hash_map> - agg_task_events; - // Maintain insertion order to agg_task_events so events are written - // in the same order as the buffer. - std::vector agg_task_event_insertion_order; - auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order]( - std::shared_ptr &event) { - // Aggregate events by task attempt before converting to proto - auto itr = agg_task_events.find(event->GetTaskAttempt()); - if (itr == agg_task_events.end()) { - // Insert event into agg_task_events if the task attempt of that - // event wasn't already added. - auto event_for_attempt = std::make_shared(); - auto inserted = - agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt}); - RAY_CHECK(inserted.second); - agg_task_event_insertion_order.push_back(event->GetTaskAttempt()); - event->ToRpcTaskExportEvents(event_for_attempt); - } else { - event->ToRpcTaskExportEvents(itr->second); - } - }; - - std::for_each(status_events_to_write_for_export.begin(), - status_events_to_write_for_export.end(), - to_rpc_event_fn); - std::for_each( - profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn); - - for (const auto &task_attempt : agg_task_event_insertion_order) { - auto it = agg_task_events.find(task_attempt); - RAY_CHECK(it != agg_task_events.end()); - RayExportEvent(it->second).SendEvent(); - } -} - void TaskEventBufferImpl::FlushEvents(bool forced) { if (!enabled_) { return; @@ -484,16 +350,13 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { } // Take out status events from the buffer. - std::vector> status_events_to_send; - std::vector> status_events_to_write_for_export; + std::vector> status_events_to_send; absl::flat_hash_set dropped_task_attempts_to_send; status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); - GetTaskStatusEventsToSend(&status_events_to_send, - &status_events_to_write_for_export, - &dropped_task_attempts_to_send); + GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send); // Take profile events from the status events. - std::vector> profile_events_to_send; + std::vector> profile_events_to_send; profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); GetTaskProfileEventsToSend(&profile_events_to_send); @@ -502,10 +365,6 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { CreateDataToSend(std::move(status_events_to_send), std::move(profile_events_to_send), std::move(dropped_task_attempts_to_send)); - if (export_event_write_enabled_) { - WriteExportData(std::move(status_events_to_write_for_export), - std::move(profile_events_to_send)); - } gcs::TaskInfoAccessor *task_accessor; { @@ -579,19 +438,8 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e if (!enabled_) { return; } - std::shared_ptr status_event_shared_ptr = std::move(status_event); - if (export_event_write_enabled_) { - // If status_events_for_export_ is full, the oldest event will be - // dropped in the circular buffer and replaced with the current event. - if (!status_events_for_export_.full()) { - stats_counter_.Increment( - TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored); - } - status_events_for_export_.push_back(status_event_shared_ptr); - } - if (dropped_task_attempts_unreported_.count( - status_event_shared_ptr->GetTaskAttempt())) { + if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) { // This task attempt has been dropped before, so we drop this event. stats_counter_.Increment( TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush); @@ -606,7 +454,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e RAY_LOG_EVERY_N(WARNING, 100000) << "Dropping task status events for task: " - << status_event_shared_ptr->GetTaskAttempt().first + << status_event->GetTaskAttempt().first << ", set a higher value for " "RAY_task_events_max_num_status_events_buffer_on_worker(" << RayConfig::instance().task_events_max_num_status_events_buffer_on_worker() @@ -618,7 +466,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e } else { stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored); } - status_events_.push_back(status_event_shared_ptr); + status_events_.push_back(std::move(status_event)); } void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile_event) { @@ -626,12 +474,10 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile if (!enabled_) { return; } - std::shared_ptr profile_event_shared_ptr = std::move(profile_event); - auto profile_events_itr = - profile_events_.find(profile_event_shared_ptr->GetTaskAttempt()); + auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt()); if (profile_events_itr == profile_events_.end()) { - auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(), - std::vector>()}); + auto inserted = profile_events_.insert( + {profile_event->GetTaskAttempt(), std::vector>()}); RAY_CHECK(inserted.second); profile_events_itr = inserted.first; } @@ -655,8 +501,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile // driver task id and it could generate large number of profile events when submitting // many tasks. RAY_LOG_EVERY_N(WARNING, 100000) - << "Dropping profiling events for task: " - << profile_event_shared_ptr->GetTaskAttempt().first + << "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first << ", set a higher value for RAY_task_events_max_num_profile_events_per_task(" << max_num_profile_event_per_task << "), or RAY_task_events_max_num_profile_events_buffer_on_worker (" @@ -665,7 +510,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile } stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored); - profile_events_itr->second.push_back(profile_event_shared_ptr); + profile_events_itr->second.push_back(std::move(profile_event)); } const std::string TaskEventBufferImpl::DebugString() { diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h index a2783521a612..30dbb780cab0 100644 --- a/src/ray/core_worker/task_event_buffer.h +++ b/src/ray/core_worker/task_event_buffer.h @@ -27,7 +27,6 @@ #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/util/counter_map.h" -#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -58,12 +57,6 @@ class TaskEvent { /// \param[out] rpc_task_events The rpc task event to be filled. virtual void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) = 0; - /// Convert itself a rpc::ExportTaskEventData - /// - /// \param[out] rpc_task_export_event_data The rpc export task event data to be filled. - virtual void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) = 0; - /// If it is a profile event. virtual bool IsProfileEvent() const = 0; @@ -133,9 +126,6 @@ class TaskStatusEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; - void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) override; - bool IsProfileEvent() const override { return false; } private: @@ -163,9 +153,6 @@ class TaskProfileEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; - void ToRpcTaskExportEvents( - std::shared_ptr rpc_task_export_event_data) override; - bool IsProfileEvent() const override { return true; } void SetEndTime(int64_t end_time) { end_time_ = end_time; } @@ -190,7 +177,6 @@ enum TaskEventBufferCounter { kNumTaskProfileEventsStored, kNumTaskStatusEventsStored, kNumDroppedTaskAttemptsStored, - kNumTaskStatusEventsForExportAPIStored, kTotalNumTaskProfileEventDropped, kTotalNumTaskStatusEventDropped, kTotalNumTaskAttemptsReported, @@ -309,15 +295,10 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// Get data related to task status events to be send to GCS. /// /// \param[out] status_events_to_send Task status events to be sent. - /// \param[out] status_events_to_write_for_export Task status events that will - /// be written to the Export API. This includes both status events - /// that are sent to GCS, and as many dropped status events that - /// fit in the buffer. /// \param[out] dropped_task_attempts_to_send Task attempts that were dropped due to /// status events being dropped. void GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, - std::vector> *status_events_to_write_for_export, + std::vector> *status_events_to_send, absl::flat_hash_set *dropped_task_attempts_to_send) ABSL_LOCKS_EXCLUDED(mutex_); @@ -325,7 +306,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// /// \param[out] profile_events_to_send Task profile events to be sent. void GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) + std::vector> *profile_events_to_send) ABSL_LOCKS_EXCLUDED(profile_mutex_); /// Get the task events to GCS. @@ -336,21 +317,10 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// status events being dropped. /// \return A unique_ptr to rpc::TaskEvents to be sent to GCS. std::unique_ptr CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send); - /// Write task events for the Export API. - /// - /// \param status_events_to_write_for_export Task status events that will - /// be written to the Export API. This includes both status events - /// that are sent to GCS, and as many dropped status events that - /// fit in the buffer. - /// \param profile_events_to_send Task profile events to be written. - void WriteExportData( - std::vector> &&status_events_to_write_for_export, - std::vector> &&profile_events_to_send); - /// Reset the counters during flushing data to GCS. void ResetCountersForFlush(); @@ -417,13 +387,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { std::atomic enabled_ = false; /// Circular buffered task status events. - boost::circular_buffer> status_events_ - ABSL_GUARDED_BY(mutex_); - - /// Status events that will be written for the export API. This could - /// contain events that were dropped from being sent to GCS. A circular - /// buffer is used to limit memory. - boost::circular_buffer> status_events_for_export_ + boost::circular_buffer> status_events_ ABSL_GUARDED_BY(mutex_); /// Buffered task attempts that were dropped due to status events being dropped. @@ -432,7 +396,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { ABSL_GUARDED_BY(mutex_); /// Buffered task profile events. A FIFO queue to be sent to GCS. - absl::flat_hash_map>> + absl::flat_hash_map>> profile_events_ ABSL_GUARDED_BY(profile_mutex_); /// Stats counter map. @@ -443,9 +407,6 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// process them quick enough. std::atomic grpc_in_progress_ = false; - /// If true, task events are exported for Export API - bool export_event_write_enabled_ = false; - FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail); FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend); FRIEND_TEST(TaskEventBufferTest, TestAddEvent); @@ -456,7 +417,6 @@ class TaskEventBufferImpl : public TaskEventBuffer { FRIEND_TEST(TaskEventBufferTestLimitBuffer, TestBufferSizeLimitStatusEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestBufferSizeLimitProfileEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestLimitProfileEventsPerTask); - FRIEND_TEST(TaskEventTestWriteExport, TestWriteTaskExportEvents); }; } // namespace worker diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index 02dd448fa921..63e24485ec09 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -16,9 +16,6 @@ #include -#include -#include - #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" #include "absl/types/optional.h" @@ -27,7 +24,6 @@ #include "mock/ray/gcs/gcs_client/gcs_client.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" -#include "ray/util/event.h" using ::testing::_; using ::testing::Return; @@ -58,7 +54,6 @@ class TaskEventBufferTest : public ::testing::Test { virtual void TearDown() { if (task_event_buffer_) task_event_buffer_->Stop(); - std::filesystem::remove_all(log_dir_.c_str()); }; std::vector GenTaskIDs(size_t num_tasks) { @@ -129,7 +124,6 @@ class TaskEventBufferTest : public ::testing::Test { } std::unique_ptr task_event_buffer_ = nullptr; - std::string log_dir_ = "event_123"; }; class TaskEventBufferTestManualStart : public TaskEventBufferTest { @@ -180,38 +174,6 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest { } }; -class TaskEventTestWriteExport : public TaskEventBufferTest { - public: - TaskEventTestWriteExport() : TaskEventBufferTest() { - RayConfig::instance().initialize( - R"( -{ - "task_events_report_interval_ms": 1000, - "task_events_max_num_status_events_buffer_on_worker": 10, - "task_events_max_num_profile_events_buffer_on_worker": 5, - "task_events_send_batch_size": 100, - "export_task_events_write_batch_size": 1, - "task_events_max_num_export_status_events_buffer_on_worker": 15, - "enable_export_api_write": true -} - )"); - } -}; - -void ReadContentFromFile(std::vector &vc, - std::string log_file, - std::string filter = "") { - std::string line; - std::ifstream read_file; - read_file.open(log_file, std::ios::binary); - while (std::getline(read_file, line)) { - if (filter.empty() || line.find(filter) != std::string::npos) { - vc.push_back(line); - } - } - read_file.close(); -} - TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) { ASSERT_NE(task_event_buffer_, nullptr); @@ -287,106 +249,6 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); } -TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { - /* - Test writing task events to event_EXPORT_TASK_123.log as part of the export API. - This test verifies the following cases: - 1. Task events that are dropped from being sent to GCS (because more events - added than task_events_max_num_status_events_buffer_on_worker) will still - be written for the export API, if the number of events is less than - task_events_max_num_export_status_events_buffer_on_worker. - 2. Task events over task_events_max_num_status_events_buffer_on_worker - will be dropped and not written for the export API. - 3. Each Flush() call only writes a max of export_task_events_write_batch_size - export events. - In this test, 20 events are added which is greater than the max of 10 status events - that can be stored in buffer. The max export status events in buffer is 15, - so 15 events will be written to event_EXPORT_TASK_123.log. Each Flush() - call will write 1 new event because the batch size is 1. - */ - - // {"task_events_max_num_status_events_buffer_on_worker": 10} and - // {"task_events_max_num_export_status_events_buffer_on_worker": 15} - // in TaskEventTestWriteExport so set num_events > 15. - size_t num_events = 20; - // Value of export_task_events_write_batch_size - size_t batch_size = 1; - // Value of task_events_max_num_export_status_events_buffer_on_worker - size_t max_export_events_on_buffer = 15; - auto task_ids = GenTaskIDs(num_events); - google::protobuf::util::JsonPrintOptions options; - options.preserve_proto_field_names = true; - - std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - - std::vector> task_events; - for (const auto &task_id : task_ids) { - task_events.push_back(GenStatusTaskEvent(task_id, 0)); - } - - // Convert all task_events that were added with AddTaskEvent - // to the ExportTaskEventData proto message - std::vector> task_event_data_protos; - for (const auto &task_event : task_events) { - std::shared_ptr event = - std::make_shared(); - task_event->ToRpcTaskExportEvents(event); - task_event_data_protos.push_back(event); - } - - // Add all num_events tasks - for (auto &task_event : task_events) { - task_event_buffer_->AddTaskEvent(std::move(task_event)); - } - - // Verify that batch_size events are being written for each flush - std::vector vc; - for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) { - task_event_buffer_->FlushEvents(true); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), (i + 1) * batch_size); - vc.clear(); - } - - // Verify that all max_export_events_on_buffer events are written to file even though - // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker - vc.clear(); - ReadContentFromFile( - vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); - EXPECT_EQ((int)vc.size(), max_export_events_on_buffer); - for (size_t i = 0; i < max_export_events_on_buffer; i++) { - json export_event_as_json = json::parse(vc[i]); - EXPECT_EQ(export_event_as_json["source_type"].get(), "EXPORT_TASK"); - EXPECT_EQ(export_event_as_json.contains("event_id"), true); - EXPECT_EQ(export_event_as_json.contains("timestamp"), true); - EXPECT_EQ(export_event_as_json.contains("event_data"), true); - - json event_data = export_event_as_json["event_data"].get(); - - // The events written are the last max_export_events_on_buffer added - // in AddTaskEvent because (num_events-max_export_events_on_buffer) - // were dropped. - std::string expected_event_data_str; - RAY_CHECK(google::protobuf::util::MessageToJsonString( - *task_event_data_protos[i + (num_events - max_export_events_on_buffer)], - &expected_event_data_str, - options) - .ok()); - json expected_event_data = json::parse(expected_event_data_str); - EXPECT_EQ(event_data, expected_event_data); - } - - // Expect no more events. - ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); -} - TEST_F(TaskEventBufferTest, TestFailedFlush) { size_t num_status_events = 20; size_t num_profile_events = 20; diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 92de8dec6915..3df62d66116c 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -21,7 +21,6 @@ #include "ray/common/ray_config.h" #include "ray/common/task/task_spec.h" #include "src/ray/protobuf/autoscaler.pb.h" -#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -248,59 +247,6 @@ inline void FillTaskInfo(rpc::TaskInfoEntry *task_info, } } -// Fill task_info for the export API with task specification from task_spec -inline void FillExportTaskInfo(rpc::ExportTaskEventData::TaskInfoEntry *task_info, - const TaskSpecification &task_spec) { - rpc::TaskType type; - if (task_spec.IsNormalTask()) { - type = rpc::TaskType::NORMAL_TASK; - } else if (task_spec.IsDriverTask()) { - type = rpc::TaskType::DRIVER_TASK; - } else if (task_spec.IsActorCreationTask()) { - type = rpc::TaskType::ACTOR_CREATION_TASK; - task_info->set_actor_id(task_spec.ActorCreationId().Binary()); - } else { - RAY_CHECK(task_spec.IsActorTask()); - type = rpc::TaskType::ACTOR_TASK; - task_info->set_actor_id(task_spec.ActorId().Binary()); - } - task_info->set_type(type); - task_info->set_language(task_spec.GetLanguage()); - task_info->set_func_or_class_name(task_spec.FunctionDescriptor()->CallString()); - - task_info->set_task_id(task_spec.TaskId().Binary()); - // NOTE: we set the parent task id of a task to be submitter's task id, where - // the submitter depends on the owner coreworker's: - // - if the owner coreworker runs a normal task, the submitter's task id is the task id. - // - if the owner coreworker runs an actor, the submitter's task id will be the actor's - // creation task id. - task_info->set_parent_task_id(task_spec.SubmitterTaskId().Binary()); - const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); - task_info->mutable_required_resources()->insert(resources_map.begin(), - resources_map.end()); - - auto export_runtime_env_info = task_info->mutable_runtime_env_info(); - export_runtime_env_info->set_serialized_runtime_env( - task_spec.RuntimeEnvInfo().serialized_runtime_env()); - auto export_runtime_env_uris = export_runtime_env_info->mutable_uris(); - export_runtime_env_uris->set_working_dir_uri( - task_spec.RuntimeEnvInfo().uris().working_dir_uri()); - export_runtime_env_uris->mutable_py_modules_uris()->CopyFrom( - task_spec.RuntimeEnvInfo().uris().py_modules_uris()); - auto export_runtime_env_config = export_runtime_env_info->mutable_runtime_env_config(); - export_runtime_env_config->set_setup_timeout_seconds( - task_spec.RuntimeEnvInfo().runtime_env_config().setup_timeout_seconds()); - export_runtime_env_config->set_eager_install( - task_spec.RuntimeEnvInfo().runtime_env_config().eager_install()); - export_runtime_env_config->mutable_log_files()->CopyFrom( - task_spec.RuntimeEnvInfo().runtime_env_config().log_files()); - - const auto &pg_id = task_spec.PlacementGroupBundleId().first; - if (!pg_id.IsNil()) { - task_info->set_placement_group_id(pg_id.Binary()); - } -} - /// Generate a RayErrorInfo from ErrorType inline rpc::RayErrorInfo GetRayErrorInfo(const rpc::ErrorType &error_type, const std::string &error_msg = "") { @@ -381,34 +327,6 @@ inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status, (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; } -/// Fill the rpc::ExportTaskEventData::TaskStateUpdate with the timestamps -/// according to the status change. -/// -/// \param task_status The task status. -/// \param timestamp The timestamp. -/// \param[out] state_updates The state updates with timestamp to be updated. -inline void FillExportTaskStatusUpdateTime( - const ray::rpc::TaskStatus &task_status, - int64_t timestamp, - rpc::ExportTaskEventData::TaskStateUpdate *state_updates) { - if (task_status == rpc::TaskStatus::NIL) { - // Not status change. - return; - } - (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; -} - -/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo -inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src, - rpc::ExportTaskEventData::TaskLogInfo *dest) { - dest->set_stdout_file(src.stdout_file()); - dest->set_stderr_file(src.stderr_file()); - dest->set_stdout_start(src.stdout_start()); - dest->set_stdout_end(src.stdout_end()); - dest->set_stderr_start(src.stderr_start()); - dest->set_stderr_end(src.stderr_end()); -} - inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) { return kPlacementGroupConstraintKeyPrefix + pg_id; } diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 6d95c3333d97..c49a42d30a20 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -198,8 +198,14 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) { auto element = export_log_reporter_map_.find(export_event.source_type()); - RAY_CHECK(element != export_log_reporter_map_.end()); - (element->second)->ReportExportEvent(export_event); + if (element != export_log_reporter_map_.end()) { + (element->second)->ReportExportEvent(export_event); + } else { + RAY_LOG(FATAL) + << "RayEventInit wasn't called with the necessary source type " + << ExportEvent_SourceType_Name(export_event.source_type()) + << ". This indicates a bug in the code, and the event will be dropped."; + } } void EventManager::AddReporter(std::shared_ptr reporter) { @@ -416,6 +422,7 @@ void RayExportEvent::SendEvent() { rpc::ExportEvent export_event; export_event.set_event_id(event_id); export_event.set_timestamp(current_sys_time_s()); + if (auto ptr_to_task_event_data_ptr = std::get_if>(&event_data_ptr_)) { export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr)); @@ -436,6 +443,7 @@ void RayExportEvent::SendEvent() { RAY_LOG(FATAL) << "Invalid event_data type."; return; } + EventManager::Instance().PublishExportEvent(export_event); } From c618a9d49a553fbf4bcb96d63a9fa2a3dff5493c Mon Sep 17 00:00:00 2001 From: Saihajpreet Singh Date: Fri, 6 Sep 2024 11:21:34 -0400 Subject: [PATCH 02/11] fix quickstart image path (#47535) | Before | After | |--------|------| |![CleanShot 2024-09-06 at 10 33 56@2x](https://github.com/user-attachments/assets/0b8dff77-3a7f-4bc7-b117-39fcd4edd69f) | ![CleanShot 2024-09-06 at 10 33 18@2x](https://github.com/user-attachments/assets/ef4c67ba-df95-48c9-8c70-273b75ed5296) | Signed-off-by: Saihajpreet Singh --- doc/source/ray-overview/installation.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-overview/installation.rst b/doc/source/ray-overview/installation.rst index 428243ae018c..75c1d8942162 100644 --- a/doc/source/ray-overview/installation.rst +++ b/doc/source/ray-overview/installation.rst @@ -6,7 +6,7 @@ Installing Ray .. raw:: html - Run Quickstart on Anyscale + Run Quickstart on Anyscale

From 5c70d96212c5872bce64e2cb51f2a8fad4efb285 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 6 Sep 2024 18:11:21 +0200 Subject: [PATCH 03/11] [RLlib; Off-policy] Add episode sampling to `EpisodeReplayBuffer`. (#47500) --- rllib/algorithms/dqn/dqn.py | 1 + .../replay_buffers/episode_replay_buffer.py | 257 +++++++++++++++++- .../prioritized_episode_buffer.py | 1 + .../tests/test_episode_replay_buffer.py | 49 ++++ 4 files changed, 307 insertions(+), 1 deletion(-) diff --git a/rllib/algorithms/dqn/dqn.py b/rllib/algorithms/dqn/dqn.py index 013508ab7c2e..6235ccc3d335 100644 --- a/rllib/algorithms/dqn/dqn.py +++ b/rllib/algorithms/dqn/dqn.py @@ -719,6 +719,7 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: n_step=self.config.n_step, gamma=self.config.gamma, beta=self.config.replay_buffer_config.get("beta"), + sample_episodes=True, ) # Perform an update on the buffer-sampled train batch. diff --git a/rllib/utils/replay_buffers/episode_replay_buffer.py b/rllib/utils/replay_buffers/episode_replay_buffer.py index c61b76596e4f..90ae1fa35664 100644 --- a/rllib/utils/replay_buffers/episode_replay_buffer.py +++ b/rllib/utils/replay_buffers/episode_replay_buffer.py @@ -1,8 +1,9 @@ from collections import deque import copy -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np +import scipy from ray.rllib.env.single_agent_episode import SingleAgentEpisode from ray.rllib.utils.annotations import override @@ -211,6 +212,87 @@ def sample( *, batch_size_B: Optional[int] = None, batch_length_T: Optional[int] = None, + n_step: Optional[Union[int, Tuple]] = None, + beta: float = 0.0, + gamma: float = 0.99, + include_infos: bool = False, + include_extra_model_outputs: bool = False, + sample_episodes: Optional[bool] = False, + **kwargs, + ) -> Union[SampleBatchType, SingleAgentEpisode]: + """Samples from a buffer in a randomized way. + + Each sampled item defines a transition of the form: + + `(o_t, a_t, sum(r_(t+1:t+n+1)), o_(t+n), terminated_(t+n), truncated_(t+n))` + + where `o_t` is drawn by randomized sampling.`n` is defined by the `n_step` + applied. + + If requested, `info`s of a transitions last timestep `t+n` and respective + extra model outputs (e.g. action log-probabilities) are added to + the batch. + + Args: + num_items: Number of items (transitions) to sample from this + buffer. + batch_size_B: The number of rows (transitions) to return in the + batch + batch_length_T: THe sequence length to sample. At this point in time + only sequences of length 1 are possible. + n_step: The n-step to apply. For the default the batch contains in + `"new_obs"` the observation and in `"obs"` the observation `n` + time steps before. The reward will be the sum of rewards + collected in between these two observations and the action will + be the one executed n steps before such that we always have the + state-action pair that triggered the rewards. + If `n_step` is a tuple, it is considered as a range to sample + from. If `None`, we use `n_step=1`. + gamma: The discount factor to be used when applying n-step calculations. + The default of `0.99` should be replaced by the `Algorithm`s + discount factor. + include_infos: A boolean indicating, if `info`s should be included in + the batch. This could be of advantage, if the `info` contains + values from the environment important for loss computation. If + `True`, the info at the `"new_obs"` in the batch is included. + include_extra_model_outputs: A boolean indicating, if + `extra_model_outputs` should be included in the batch. This could be + of advantage, if the `extra_mdoel_outputs` contain outputs from the + model important for loss computation and only able to compute with the + actual state of model e.g. action log-probabilities, etc.). If `True`, + the extra model outputs at the `"obs"` in the batch is included (the + timestep at which the action is computed). + + Returns: + Either a batch with transitions in each row or (if `return_episodes=True`) + a list of 1-step long episodes containing all basic episode data and if + requested infos and extra model outputs. + """ + + if sample_episodes: + return self._sample_episodes( + num_items=num_items, + batch_size_B=batch_size_B, + batch_length_T=batch_length_T, + n_step=n_step, + beta=beta, + gamma=gamma, + include_infos=include_infos, + include_extra_model_outputs=include_extra_model_outputs, + ) + else: + return self._sample_batch( + num_items=num_items, + batch_size_B=batch_size_B, + batch_length_T=batch_length_T, + ) + + def _sample_batch( + self, + num_items: Optional[int] = None, + *, + batch_size_B: Optional[int] = None, + batch_length_T: Optional[int] = None, ) -> SampleBatchType: """Returns a batch of size B (number of "rows"), where each row has length T. @@ -332,6 +414,179 @@ def sample( return ret + def _sample_episodes( + self, + num_items: Optional[int] = None, + *, + batch_size_B: Optional[int] = None, + batch_length_T: Optional[int] = None, + n_step: Optional[Union[int, Tuple]] = None, + gamma: float = 0.99, + include_infos: bool = False, + include_extra_model_outputs: bool = False, + **kwargs, + ) -> List[SingleAgentEpisode]: + """Samples episodes from a buffer in a randomized way. + + Each sampled item defines a transition of the form: + + `(o_t, a_t, sum(r_(t+1:t+n+1)), o_(t+n), terminated_(t+n), truncated_(t+n))` + + where `o_t` is drawn by randomized sampling.`n` is defined by the `n_step` + applied. + + If requested, `info`s of a transitions last timestep `t+n` and respective + extra model outputs (e.g. action log-probabilities) are added to + the batch. + + Args: + num_items: Number of items (transitions) to sample from this + buffer. + batch_size_B: The number of rows (transitions) to return in the + batch + batch_length_T: THe sequence length to sample. At this point in time + only sequences of length 1 are possible. + n_step: The n-step to apply. For the default the batch contains in + `"new_obs"` the observation and in `"obs"` the observation `n` + time steps before. The reward will be the sum of rewards + collected in between these two observations and the action will + be the one executed n steps before such that we always have the + state-action pair that triggered the rewards. + If `n_step` is a tuple, it is considered as a range to sample + from. If `None`, we use `n_step=1`. + gamma: The discount factor to be used when applying n-step calculations. + The default of `0.99` should be replaced by the `Algorithm`s + discount factor. + include_infos: A boolean indicating, if `info`s should be included in + the batch. This could be of advantage, if the `info` contains + values from the environment important for loss computation. If + `True`, the info at the `"new_obs"` in the batch is included. + include_extra_model_outputs: A boolean indicating, if + `extra_model_outputs` should be included in the batch. This could be + of advantage, if the `extra_mdoel_outputs` contain outputs from the + model important for loss computation and only able to compute with the + actual state of model e.g. action log-probabilities, etc.). If `True`, + the extra model outputs at the `"obs"` in the batch is included (the + timestep at which the action is computed). + + Returns: + A list of 1-step long episodes containing all basic episode data and if + requested infos and extra model outputs. + """ + if num_items is not None: + assert batch_size_B is None, ( + "Cannot call `sample()` with both `num_items` and `batch_size_B` " + "provided! Use either one." + ) + batch_size_B = num_items + + # Use our default values if no sizes/lengths provided. + batch_size_B = batch_size_B or self.batch_size_B + # TODO (simon): Implement trajectory sampling for RNNs. + batch_length_T = batch_length_T or self.batch_length_T + + # Sample the n-step if necessary. + actual_n_step = n_step or 1 + random_n_step = isinstance(n_step, tuple) + + # Keep track of the indices that were sampled last for updating the + # weights later (see `ray.rllib.utils.replay_buffer.utils. + # update_priorities_in_episode_replay_buffer`). + self._last_sampled_indices = [] + + sampled_episodes = [] + + B = 0 + while B < batch_size_B: + # Pull a new uniform random index tuple: (eps_idx, ts_in_eps_idx). + index_tuple = self._indices[self.rng.integers(len(self._indices))] + + # Compute the actual episode index (offset by the number of + # already evicted episodes). + episode_idx, episode_ts = ( + index_tuple[0] - self._num_episodes_evicted, + index_tuple[1], + ) + episode = self.episodes[episode_idx] + + # If we use random n-step sampling, draw the n-step for this item. + if random_n_step: + actual_n_step = int(self.rng.integers(n_step[0], n_step[1])) + + # Skip, if we are too far to the end and `episode_ts` + n_step would go + # beyond the episode's end. + if episode_ts + actual_n_step > len(episode): + continue + + # Note, this will be the reward after executing action + # `a_(episode_ts-n_step+1)`. For `n_step>1` this will be the discounted + # sum of all discounted rewards that were collected over the last n steps. + raw_rewards = episode.get_rewards( + slice(episode_ts, episode_ts + actual_n_step) + ) + rewards = scipy.signal.lfilter([1], [1, -gamma], raw_rewards[::-1], axis=0)[ + -1 + ] + + # Generate the episode to be returned. + sampled_episode = SingleAgentEpisode( + # Ensure that each episode contains a tuple of the form: + # (o_t, a_t, sum(r_(t:t+n_step)), o_(t+n_step)) + # Two observations (t and t+n). + observations=[ + episode.get_observations(episode_ts), + episode.get_observations(episode_ts + actual_n_step), + ], + observation_space=episode.observation_space, + infos=( + [ + episode.get_infos(episode_ts), + episode.get_infos(episode_ts + actual_n_step), + ] + if include_infos + else None + ), + actions=[episode.get_actions(episode_ts)], + action_space=episode.action_space, + rewards=[rewards], + # If the sampled time step is the episode's last time step check, if + # the episode is terminated or truncated. + terminated=( + False + if episode_ts + actual_n_step < len(episode) + else episode.is_terminated + ), + truncated=( + False + if episode_ts + actual_n_step < len(episode) + else episode.is_truncated + ), + extra_model_outputs={ + # TODO (simon): Check, if we have to correct here for sequences + # later. + "n_step": [actual_n_step], + **( + { + k: [episode.get_extra_model_outputs(k, episode_ts)] + for k in episode.extra_model_outputs.keys() + } + if include_extra_model_outputs + else {} + ), + }, + # TODO (sven): Support lookback buffers. + len_lookback_buffer=0, + t_started=episode_ts, + ) + sampled_episodes.append(sampled_episode) + + # Increment counter. + B += 1 + + self.sampled_timesteps += batch_size_B + + return sampled_episodes + def get_num_episodes(self) -> int: """Returns number of episodes (completed or truncated) stored in the buffer.""" return len(self.episodes) diff --git a/rllib/utils/replay_buffers/prioritized_episode_buffer.py b/rllib/utils/replay_buffers/prioritized_episode_buffer.py index 38bbf3088697..2eee2982e188 100644 --- a/rllib/utils/replay_buffers/prioritized_episode_buffer.py +++ b/rllib/utils/replay_buffers/prioritized_episode_buffer.py @@ -310,6 +310,7 @@ def sample( gamma: float = 0.99, include_infos: bool = False, include_extra_model_outputs: bool = False, + **kwargs, ) -> SampleBatchType: """Samples from a buffer in a prioritized way. diff --git a/rllib/utils/replay_buffers/tests/test_episode_replay_buffer.py b/rllib/utils/replay_buffers/tests/test_episode_replay_buffer.py index 2f9dd1b20e10..12b4c2ccd309 100644 --- a/rllib/utils/replay_buffers/tests/test_episode_replay_buffer.py +++ b/rllib/utils/replay_buffers/tests/test_episode_replay_buffer.py @@ -6,6 +6,8 @@ EpisodeReplayBuffer, ) +from ray.rllib.utils.test_utils import check + class TestEpisodeReplayBuffer(unittest.TestCase): @staticmethod @@ -139,6 +141,53 @@ def test_episode_replay_buffer_sample_logic(self): # (reset rewards). assert np.all(np.where(is_terminated[:, :-1], rewards[:, 1:] == 0.0, True)) + def test_episode_replay_buffer_episode_sample_logic(self): + + buffer = EpisodeReplayBuffer(capacity=10000) + + for _ in range(200): + episode = self._get_episode() + buffer.add(episode) + + for i in range(1000): + sample = buffer.sample(batch_size_B=16, n_step=1, sample_episodes=True) + check(buffer.get_sampled_timesteps(), 16 * (i + 1)) + for eps in sample: + + ( + obs, + action, + reward, + next_obs, + is_terminated, + is_truncated, + n_step, + ) = ( + eps.get_observations(0), + eps.get_actions(-1), + eps.get_rewards(-1), + eps.get_observations(-1), + eps.is_terminated, + eps.is_truncated, + eps.get_extra_model_outputs("n_step", -1), + ) + + # Make sure terminated and truncated are never both True. + assert not (is_truncated and is_terminated) + + # Note, floating point numbers cannot be compared directly. + tolerance = 1e-8 + # Assert that actions correspond to the observations. + check(obs, action, atol=tolerance) + # Assert that next observations are correctly one step after + # observations. + check(next_obs, obs + 1, atol=tolerance) + # Assert that the reward comes from the next observation. + check(reward * 10, next_obs, atol=tolerance) + + # Assert that all n-steps are 1.0 as passed into `sample`. + check(n_step, 1.0, atol=tolerance) + if __name__ == "__main__": import pytest From bbeee55e84dbb372246c8e062d5c1d21d5e0cf09 Mon Sep 17 00:00:00 2001 From: Rui Qiao <161574667+ruisearch42@users.noreply.github.com> Date: Fri, 6 Sep 2024 13:16:23 -0700 Subject: [PATCH 04/11] [aDAG] Allow custom NCCL group for aDAG (#47141) Allow custom NCCL group for aDAG so that we can reuse what the user already created. Marking NcclGroupInterface as DeveloperAPI for now. After validation by using it in vLLM we can change to alpha stability. vLLM prototype: vllm-project/vllm#7568 --- python/ray/dag/compiled_dag_node.py | 35 +- .../experimental/test_torch_tensor_dag.py | 322 ++++++++++++++++++ python/ray/experimental/channel/__init__.py | 2 + python/ray/experimental/channel/common.py | 12 +- python/ray/experimental/channel/conftest.py | 15 +- .../experimental/channel/gpu_communicator.py | 115 +++++++ python/ray/experimental/channel/nccl_group.py | 39 ++- .../channel/torch_tensor_nccl_channel.py | 140 ++++++-- .../experimental/channel/torch_tensor_type.py | 19 +- 9 files changed, 650 insertions(+), 49 deletions(-) create mode 100644 python/ray/experimental/channel/gpu_communicator.py diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 7bb474fb9b85..c2f3fc04e308 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -10,6 +10,7 @@ from typing import NamedTuple from ray.experimental.channel.cached_channel import CachedChannel +from ray.experimental.channel.gpu_communicator import GPUCommunicator import ray from ray.exceptions import RayTaskError, RayChannelError from ray.util.annotations import PublicAPI @@ -707,6 +708,11 @@ def __init__( # Type hints specified by the user for DAG (intermediate) outputs. self._type_hints = [] + # This is set to true when type hint of `transport="nccl"`` is used + self._use_default_nccl_group = False + # This is set to the specified custom nccl group + # if there exists a type hint of `transport=nccl_group` + self._custom_nccl_group: Optional[GPUCommunicator] = None # Uniquely identifies the NCCL communicator that will be used within # this DAG, if any. self._nccl_group_id: Optional[str] = None @@ -873,6 +879,33 @@ def _preprocess(self) -> None: if dag_node.type_hint.requires_nccl(): # Add all writers to the NCCL group. nccl_actors.add(actor_handle) + custom_nccl_group = dag_node.type_hint.get_custom_nccl_group() + mixed_nccl_group_error_message = ( + "Accelerated DAGs do not support mixed usage of " + "type hints of default NCCL group " + '(i.e., TorchTensor(transport="nccl"))' + "and custom NCCL group " + "(i.e., TorchTensor(transport=nccl_group)). " + "Please check all the TorchTensor type hints and " + "make sure only one type of NCCL transport is specified." + ) + if custom_nccl_group is None: + if self._custom_nccl_group is not None: + raise ValueError(mixed_nccl_group_error_message) + self._use_default_nccl_group = True + else: + if self._use_default_nccl_group: + raise ValueError(mixed_nccl_group_error_message) + if self._custom_nccl_group is not None: + if self._custom_nccl_group != custom_nccl_group: + raise ValueError( + "Accelerated DAGs currently only support " + "a single custom NCCL group, but multiple " + "have been specified. Check all the " + "TorchTensor(transport=nccl_group) type hints " + "to make sure only one NCCL group is used." + ) + self._custom_nccl_group = custom_nccl_group elif isinstance(dag_node, InputNode): if dag_node.type_hint.requires_nccl(): raise ValueError( @@ -983,7 +1016,7 @@ def _preprocess(self) -> None: if None in nccl_actors: raise ValueError("Driver cannot participate in the NCCL group.") if nccl_actors and self._nccl_group_id is None: - self._nccl_group_id = _init_nccl_group(nccl_actors) + self._nccl_group_id = _init_nccl_group(nccl_actors, self._custom_nccl_group) if direct_input: self._input_num_positional_args = 1 diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 1331bc086497..edb089440d8d 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -3,6 +3,13 @@ import os import re import sys +from typing import List, Optional, Tuple +from ray.experimental.channel.gpu_communicator import ( + GPUCommunicator, + TorchTensorAllocator, +) +from ray.experimental.channel.nccl_group import _NcclGroup +import socket import torch import time @@ -33,6 +40,11 @@ class TorchTensorWorker: def __init__(self): self.device = torch_utils.get_devices()[0] + def init_distributed(self, world_size, rank): + torch.distributed.init_process_group( + backend="nccl", world_size=world_size, rank=rank + ) + def send(self, shape, dtype, value: int, send_tensor=True): if not send_tensor: return 1 @@ -291,6 +303,316 @@ def test_torch_tensor_nccl_dynamic(ray_start_regular): compiled_dag.teardown() +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +def test_torch_tensor_custom_comm(ray_start_regular): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + + actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1) + + sender = actor_cls.remote() + receiver = actor_cls.remote() + + class TestNcclGroup(GPUCommunicator): + """ + A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`. + """ + + def __init__(self, world_size, comm_id, actor_handles): + self._world_size = world_size + self._comm_id = comm_id + self._actor_handles = actor_handles + self._inner = None + + def initialize(self, rank: int) -> None: + self._inner = _NcclGroup( + self._world_size, + self._comm_id, + rank, + self._actor_handles, + torch.cuda.current_stream().cuda_stream, + ) + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. + return self._world_size + + def get_self_rank(self) -> Optional[int]: + if self._inner is None: + return None + return self._inner.get_self_rank() + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + return self._inner.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + return self._inner.recv(shape, dtype, peer_rank, allocator=allocator) + + def destroy(self) -> None: + return self._inner.destroy() + + from cupy.cuda import nccl + + comm_id = nccl.get_unique_id() + nccl_group = TestNcclGroup(2, comm_id, [sender, receiver]) + with InputNode() as inp: + dag = sender.send_with_tuple_args.bind(inp) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group)) + dag = receiver.recv.bind(dag) + + compiled_dag = dag.experimental_compile() + for i in range(3): + i += 1 + shape = (i * 10,) + dtype = torch.float16 + args = (shape, dtype, i) + ref = compiled_dag.execute(args) + result = ray.get(ref) + assert result == (i, shape, dtype) + + compiled_dag.teardown() + + +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +def test_torch_tensor_custom_comm_invalid(ray_start_regular): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + + actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1) + + actor1 = actor_cls.remote() + actor2 = actor_cls.remote() + + class MockNcclGroup(GPUCommunicator): + """ + A mock NCCL group for testing. Send and recv are not implemented. + """ + + def __init__(self, world_size, actor_handles): + self._world_size = world_size + self._actor_handles = actor_handles + self._rank = None + + def initialize(self, rank: int) -> None: + expected_rank = self.get_rank(ray.get_runtime_context().current_actor) + assert ( + rank == expected_rank + ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}" + self._rank = rank + self._device = torch_utils.get_devices()[0] + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + return self._world_size + + def get_self_rank(self) -> Optional[int]: + return self._rank + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + return None + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + return None + + def destroy(self) -> None: + pass + + nccl_group = MockNcclGroup(2, [actor1, actor2]) + + # Mixed usage of NCCL groups should throw an error + # Case 1: custom NCCL group first, then default NCCL group + with InputNode() as inp: + dag = actor1.send_with_tuple_args.bind(inp) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group)) + dag = actor2.recv.bind(dag) + dag = actor2.send_with_tuple_args.bind(dag) + dag = dag.with_type_hint(TorchTensorType(transport="nccl")) + dag = actor1.recv.bind(dag) + with pytest.raises( + ValueError, + match=r"Accelerated DAGs do not support mixed usage of type hints.*", + ): + dag.experimental_compile() + + # Case 2: default NCCL group first, then custom NCCL group + with InputNode() as inp: + dag = actor1.send_with_tuple_args.bind(inp) + dag = dag.with_type_hint(TorchTensorType(transport="nccl")) + dag = actor2.recv.bind(dag) + dag = actor2.send_with_tuple_args.bind(dag) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group)) + dag = actor1.recv.bind(dag) + with pytest.raises( + ValueError, + match=r"Accelerated DAGs do not support mixed usage of type hints.*", + ): + dag.experimental_compile() + + nccl_group2 = MockNcclGroup(2, [actor1, actor2]) + + # Using two different custom NCCL groups are currently not supported + with InputNode() as inp: + dag = actor1.send_with_tuple_args.bind(inp) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group)) + dag = actor2.recv.bind(dag) + dag = actor2.send_with_tuple_args.bind(dag) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group2)) + dag = actor1.recv.bind(dag) + with pytest.raises( + ValueError, + match=( + "Accelerated DAGs currently only support " + "a single custom NCCL group, but multiple " + "have been specified." + ), + ): + dag.experimental_compile() + + +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +def test_torch_tensor_custom_comm_inited(ray_start_regular): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + runtime_env = { + "env_vars": { + "MASTER_ADDR": socket.gethostbyname(socket.gethostname()), + "MASTER_PORT": "8888", + } + } + actor_cls = TorchTensorWorker.options( + num_cpus=0, num_gpus=1, runtime_env=runtime_env + ) + + sender = actor_cls.remote() + receiver = actor_cls.remote() + + # Simulates that the distributed environment (e.g., torch.distributed) + # have already been set up + refs = [ + sender.init_distributed.remote(2, 0), + receiver.init_distributed.remote(2, 1), + ] + ray.wait(refs) + + class InitedNcclGroup(GPUCommunicator): + """ + A custom NCCL group based on existing torch.distributed setup. + """ + + def __init__(self, world_size, actor_handles): + self._world_size = world_size + self._actor_handles = actor_handles + self._rank = None + + def initialize(self, rank: int) -> None: + expected_rank = self.get_rank(ray.get_runtime_context().current_actor) + assert ( + rank == expected_rank + ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}" + self._rank = rank + self._device = torch_utils.get_devices()[0] + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + return self._world_size + + def get_self_rank(self) -> Optional[int]: + return self._rank + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + torch.distributed.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + tensor = torch.empty(torch.Size(shape), dtype=dtype, device=self._device) + torch.distributed.recv(tensor, peer_rank) + return tensor + + def destroy(self) -> None: + pass + + nccl_group = InitedNcclGroup(2, [sender, receiver]) + with InputNode() as inp: + dag = sender.send_with_tuple_args.bind(inp) + dag = dag.with_type_hint(TorchTensorType(transport=nccl_group)) + dag = receiver.recv.bind(dag) + + compiled_dag = dag.experimental_compile() + for i in range(3): + i += 1 + shape = (i * 10,) + dtype = torch.float16 + args = (shape, dtype, i) + ref = compiled_dag.execute(args) + result = ray.get(ref) + assert result == (i, shape, dtype) + + compiled_dag.teardown() + + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_wrong_shape(ray_start_regular): if not USE_GPU: diff --git a/python/ray/experimental/channel/__init__.py b/python/ray/experimental/channel/__init__.py index bcca146bd8f6..03e3be2d59e1 100644 --- a/python/ray/experimental/channel/__init__.py +++ b/python/ray/experimental/channel/__init__.py @@ -10,6 +10,7 @@ SynchronousWriter, WriterInterface, ) +from ray.experimental.channel.gpu_communicator import GPUCommunicator from ray.experimental.channel.intra_process_channel import IntraProcessChannel from ray.experimental.channel.shared_memory_channel import Channel, CompositeChannel from ray.experimental.channel.torch_tensor_nccl_channel import TorchTensorNcclChannel @@ -19,6 +20,7 @@ "AwaitableBackgroundWriter", "CachedChannel", "Channel", + "GPUCommunicator", "ReaderInterface", "SynchronousReader", "SynchronousWriter", diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index f400788f3398..928d4e2a339d 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import ray -from ray.experimental.channel.nccl_group import _NcclGroup +from ray.experimental.channel.gpu_communicator import GPUCommunicator from ray.experimental.channel.serialization_context import _SerializationContext from ray.util.annotations import DeveloperAPI, PublicAPI @@ -100,6 +100,14 @@ def requires_nccl(self) -> bool: # By default, channels do not require NCCL. return False + def get_custom_nccl_group(self) -> Optional[GPUCommunicator]: + """ + Return the custom NCCL group if one is specified. + """ + if self._contains_type is not None: + return self._contains_type.get_custom_nccl_group() + return None + def set_nccl_group_id(self, group_id: str) -> None: raise NotImplementedError @@ -112,7 +120,7 @@ class ChannelContext: def __init__(self): # Used for the torch.Tensor NCCL transport. - self.nccl_groups: Dict[str, "_NcclGroup"] = {} + self.nccl_groups: Dict[str, "GPUCommunicator"] = {} @staticmethod def get_current() -> "ChannelContext": diff --git a/python/ray/experimental/channel/conftest.py b/python/ray/experimental/channel/conftest.py index 6f53f95c90b3..8886a2cfecd0 100644 --- a/python/ray/experimental/channel/conftest.py +++ b/python/ray/experimental/channel/conftest.py @@ -1,11 +1,13 @@ import asyncio from collections import defaultdict +from typing import Optional, Tuple from unittest import mock import torch import ray import ray.experimental.channel as ray_channel +from ray.experimental.channel.gpu_communicator import TorchTensorAllocator @ray.remote(num_cpus=0) @@ -74,13 +76,24 @@ def send(self, tensor: torch.Tensor, peer_rank: int): ray.get(barrier.wait.remote(self.num_ops[barrier_key], tensor)) self.num_ops[barrier_key] += 1 - def recv(self, buf: torch.Tensor, peer_rank: int): + def recv( + self, + shape: Tuple[int], + dtype: torch.dtype, + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ): # "Receive" the tensor from the barrier actor. barrier_key = f"barrier-{peer_rank}-{self.get_self_rank()}" barrier = ray.get_actor(name=barrier_key) received_tensor = ray.get(barrier.wait.remote(self.num_ops[barrier_key])) + assert ( + allocator is not None + ), "torch tensor allocator is required for MockNcclGroup" + buf = allocator(shape, dtype) buf[:] = received_tensor[:] self.num_ops[barrier_key] += 1 + return buf def start_nccl_mock(): diff --git a/python/ray/experimental/channel/gpu_communicator.py b/python/ray/experimental/channel/gpu_communicator.py new file mode 100644 index 000000000000..e6bc2fccdb2d --- /dev/null +++ b/python/ray/experimental/channel/gpu_communicator.py @@ -0,0 +1,115 @@ +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Callable, List, Optional, Tuple + +import ray +from ray.util.annotations import DeveloperAPI + +if TYPE_CHECKING: + import torch + + +# Signature for a torch.Tensor allocator is: +# (shape: Tuple[int], dtype: torch.dtype) -> torch.Tensor. +TorchTensorAllocator = Callable[[Tuple[int], "torch.dtype"], "torch.Tensor"] + + +@DeveloperAPI +class GPUCommunicator(ABC): + """ + Communicator for a group of aDAG actors on Nvidia GPU. + + The aDAG execution leverages this internally to support communication + between actors in the group. + """ + + @abstractmethod + def initialize(self, rank: int) -> None: + """ + Initialize the communicator from the actor. + + This is called once by aDAG on each actor to initialize the communicator, + before any other methods. + + Args: + rank: The rank of this actor in the group. + """ + raise NotImplementedError + + @abstractmethod + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + """ + Get handles of all actors for this communicator group. + """ + raise NotImplementedError + + @abstractmethod + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + """ + Return the given actor's rank in the group. + + Args: + actor: The actor handle to look up. + """ + raise NotImplementedError + + @abstractmethod + def get_self_rank(self) -> Optional[int]: + """ + Return this actor's rank. + """ + raise NotImplementedError + + def get_world_size(self) -> int: + """ + Return the number of ranks in the group. + """ + raise NotImplementedError + + @abstractmethod + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + """ + Send a torch.Tensor to a peer. + + This returns when the send kernel has been queued, but the kernel may + not have completed. Therefore, the caller should ensure that there are + no concurrent writes to the sent `value` until the send has finished. + + Args: + value: The torch.Tensor to send. It should already be on this + actor's default device. + peer_rank: The rank of the actor to send to. + """ + raise NotImplementedError + + @abstractmethod + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + """ + Receive a torch.Tensor from a peer and synchronize. + + After this call returns, the receive buffer is safe to read from from + any stream. An RayChannelError will be raised if an error occurred (e.g., + remote actor died), and the buffer is not safe to read. + + Args: + shape: The shape of the tensor to receive. + dtype: The dtype of the tensor to receive. + peer_rank: The rank of the actor to receive from. + allocator: A function to allocate the tensor to receive into. + """ + raise NotImplementedError + + @abstractmethod + def destroy() -> None: + """ + Destroy the GPU communicator. + + Any destruction and cleanup for the GPU communicator should be + done here. Implement as a noop is nothing is needed. + """ + raise NotImplementedError diff --git a/python/ray/experimental/channel/nccl_group.py b/python/ray/experimental/channel/nccl_group.py index 84200cf87f0a..753c05ed1d74 100644 --- a/python/ray/experimental/channel/nccl_group.py +++ b/python/ray/experimental/channel/nccl_group.py @@ -1,9 +1,13 @@ import logging from types import ModuleType -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List, Optional, Tuple import ray from ray.exceptions import RayChannelError +from ray.experimental.channel.gpu_communicator import ( + GPUCommunicator, + TorchTensorAllocator, +) if TYPE_CHECKING: import cupy as cp @@ -16,9 +20,10 @@ logger = logging.getLogger(__name__) -class _NcclGroup: +class _NcclGroup(GPUCommunicator): """ - Represents an actor's NCCL communicator. + Represents an actor's NCCL communicator. This is the default NCCL communicator + to be used in aDAG if a custom communicator is not provided. This class is not thread-safe. """ @@ -62,6 +67,7 @@ def __init__( cuda_stream: A raw CUDA stream to dispatch NCCL ops to. If rank is specified, then this must be specified too. """ + self._world_size = world_size self._rank: Optional[int] = rank self.nccl_util: Optional[ModuleType] = None self._actor_handles = actor_handles @@ -100,7 +106,11 @@ def __init__( self._closed = False - def _get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + def initialize(self, rank: int) -> None: + # No additional initialization is needed. + pass + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: return self._actor_handles def get_rank(self, actor: ray.actor.ActorHandle) -> int: @@ -123,7 +133,13 @@ def get_self_rank(self) -> Optional[int]: """ return self._rank - def send(self, value: "torch.Tensor", peer_rank: int): + def get_world_size(self) -> int: + """ + Return the number of ranks in the NCCL communicator. + """ + return self._world_size + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: """ Send a torch.Tensor to a peer. @@ -151,7 +167,13 @@ def send(self, value: "torch.Tensor", peer_rank: int): self._cuda_stream.ptr, ) - def recv(self, buf: "torch.Tensor", peer_rank: int): + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator=Optional[TorchTensorAllocator], + ) -> "torch.Tensor": """ Receive a torch.Tensor from a peer and synchronize the current stream. @@ -165,6 +187,8 @@ def recv(self, buf: "torch.Tensor", peer_rank: int): """ if self._closed: raise RayChannelError("NCCL group has been destroyed.") + assert allocator is not None, "NCCL group requires a tensor allocator" + buf = allocator(shape, dtype) self._comm.recv( self.nccl_util.get_tensor_ptr(buf), buf.numel(), @@ -180,8 +204,9 @@ def recv(self, buf: "torch.Tensor", peer_rank: int): self._cuda_stream.synchronize() if self._closed: raise RayChannelError("NCCL group has been destroyed.") + return buf - def destroy(self): + def destroy(self) -> None: """ Destroy the NCCL group. """ diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index b4de7b1b862c..ad95c69c7c84 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -2,12 +2,16 @@ import logging import uuid from types import ModuleType -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union import ray import ray.util.serialization from ray.experimental.channel import ChannelContext from ray.experimental.channel.common import ChannelInterface +from ray.experimental.channel.gpu_communicator import ( + GPUCommunicator, + TorchTensorAllocator, +) from ray.experimental.channel.nccl_group import _NcclGroup from ray.experimental.channel.shared_memory_channel import SharedMemoryType from ray.experimental.channel.torch_tensor_type import TENSOR_METADATA_SIZE_BYTES @@ -26,11 +30,6 @@ logger = logging.getLogger(__name__) -# Signature for a torch.Tensor allocator is: -# (shape: Tuple[int], dtype: torch.dtype) -> torch.Tensor. -TorchTensorAllocator = Callable[[Tuple[int], "torch.dtype"], "torch.Tensor"] - - class NestedTorchTensorNcclChannel(ChannelInterface): def __init__( self, @@ -236,7 +235,7 @@ def __init__( ctx = ChannelContext.get_current() assert self._typ.nccl_group_id is not None, "No NCCL group specified." self._nccl_group_id: str = self._typ.nccl_group_id - self._nccl_group: "_NcclGroup" = ctx.nccl_groups[self._typ.nccl_group_id] + self._nccl_group: "GPUCommunicator" = ctx.nccl_groups[self._typ.nccl_group_id] assert ( self._nccl_group is not None ), "ChannelContext.nccl_group is not initialized." @@ -379,11 +378,6 @@ def write( for rank in self._reader_ranks: self._nccl_group.send(tensor, rank) - def _read_single_tensor(self, typ: "TorchTensorType") -> "torch.Tensor": - buf = self._torch_tensor_allocator(typ._shape, typ._dtype) - self._nccl_group.recv(buf, self._writer_rank) - return buf - def read( self, timeout: Optional[float] = None ) -> Union["torch.Tensor", List["torch.Tensor"]]: @@ -393,11 +387,22 @@ def read( meta = self._typ if not isinstance(meta, list): - return self._read_single_tensor(meta) + return self._nccl_group.recv( + meta._shape, + meta._dtype, + self._writer_rank, + self._torch_tensor_allocator, + ) bufs: List["torch.Tensor"] = [] for typ in meta: - bufs.append(self._read_single_tensor(typ)) + buf = self._nccl_group.recv( + typ._shape, + typ._dtype, + self._writer_rank, + self._torch_tensor_allocator, + ) + bufs.append(buf) # TODO: Sync CUDA stream after receiving all tensors, instead of after # each tensor. return bufs @@ -420,7 +425,15 @@ def has_static_type(self) -> bool: ) -def _do_init_nccl_group(self, group_id, world_size, comm_id, rank, actor_handles): +def _do_init_nccl_group( + self, + group_id, + world_size, + comm_id, + rank, + actor_handles, + custom_nccl_group: Optional[GPUCommunicator] = None, +): import torch assert ( @@ -428,13 +441,17 @@ def _do_init_nccl_group(self, group_id, world_size, comm_id, rank, actor_handles ), "Actors participating in NCCL group must have at least one GPU assigned" ctx = ChannelContext.get_current() - ctx.nccl_groups[group_id] = _NcclGroup( - world_size, - comm_id, - rank, - actor_handles, - torch.cuda.current_stream().cuda_stream, - ) + if custom_nccl_group is not None: + custom_nccl_group.initialize(rank) + ctx.nccl_groups[group_id] = custom_nccl_group + else: + ctx.nccl_groups[group_id] = _NcclGroup( + world_size, + comm_id, + rank, + actor_handles, + torch.cuda.current_stream().cuda_stream, + ) def _do_destroy_nccl_group(self, group_id): @@ -456,9 +473,50 @@ def _do_get_unique_nccl_id(self) -> bool: return nccl.get_unique_id() +def _get_ranks( + actors: List[ray.actor.ActorHandle], custom_nccl_group: Optional[GPUCommunicator] +) -> List[int]: + """ + Get sorted ranks for the NCCL group to use. If custom_nccl_group is specified, + return all ranks from it, otherwise, return list(range(len(actors))). + + Args: + actors: A list of actors that participate in the NCCL group. + custom_nccl_group: The custom NCCL group to use. + """ + if custom_nccl_group is None: + return list(range(len(actors))) + + assert len(actors) == custom_nccl_group.get_world_size(), ( + "The world size of the custom NCCL group does not match the number " + "of actors." + ) + ranks = set() + for actor in actors: + rank = custom_nccl_group.get_rank(actor) + assert rank not in ranks, "Duplicate rank in custom NCCL group" + ranks.add(rank) + assert custom_nccl_group.get_world_size() == len(actors), ( + "The world size of the custom NCCL group " + f"({custom_nccl_group.get_world_size()}) " + "does not match the number of actors " + f"({len(actors)})." + ) + return sorted(ranks) + + def _init_nccl_group( actors: List[ray.actor.ActorHandle], + custom_nccl_group: Optional[GPUCommunicator] = None, ) -> str: + """ + Initialize a NCCL group with the given actors. If a custom NCCL group is + provided, then it will be used, otherwise a new NCCL group will be created. + + Args: + actors: A list of actors that participate in the NCCL group. + custom_nccl_group: A custom NCCL group to initialize. + """ ctx = ChannelContext.get_current() has_gpus = ray.get( @@ -468,8 +526,9 @@ def _init_nccl_group( if not has_gpu: raise ValueError( f"Actor {actor} returns a tensor with type hint " - 'TorchTensor(transport="nccl") but actor does not have a ' - "GPU assigned by Ray." + 'TorchTensor(transport="nccl") or ' + "TorchTensor(transport=nccl_group_handle)" + "but actor does not have a GPU assigned by Ray." ) actor_ids = {actor._ray_actor_id for actor in actors} @@ -482,9 +541,13 @@ def _init_nccl_group( # Used to uniquely identify this NCCL group. group_id = str(uuid.uuid4()) - logger.info(f"Creating NCCL group {group_id} on actors: {actors}") + if custom_nccl_group is not None: + logger.info(f"Initializing custom NCCL group {group_id} on actors: {actors}") + else: + logger.info(f"Creating NCCL group {group_id} on actors: {actors}") world_size = len(actors) + ranks = _get_ranks(actors, custom_nccl_group) init_tasks = [ actor.__ray_call__.remote( _do_init_nccl_group, @@ -493,8 +556,9 @@ def _init_nccl_group( nccl_comm_id, rank, actors, + custom_nccl_group, ) - for rank, actor in enumerate(actors) + for rank, actor in zip(ranks, actors) ] try: ray.get(init_tasks, timeout=30) @@ -504,25 +568,31 @@ def _init_nccl_group( ) ray.get(init_tasks) - logger.info("NCCL group created.") + logger.info("NCCL group initialized.") - ctx.nccl_groups[group_id] = _NcclGroup( - world_size, - nccl_comm_id, - rank=None, - actor_handles=actors, - cuda_stream=None, - ) + if custom_nccl_group is not None: + ctx.nccl_groups[group_id] = custom_nccl_group + else: + ctx.nccl_groups[group_id] = _NcclGroup( + world_size, + nccl_comm_id, + rank=None, + actor_handles=actors, + cuda_stream=None, + ) return group_id def _destroy_nccl_group(group_id: str) -> None: + """ + Destroy the NCCL group with the given ID. + """ ctx = ChannelContext.get_current() if group_id not in ctx.nccl_groups: return group = ctx.nccl_groups[group_id] - actors = group._get_actor_handles() + actors = group.get_actor_handles() destroy_tasks = [ actor.__ray_call__.remote( _do_destroy_nccl_group, diff --git a/python/ray/experimental/channel/torch_tensor_type.py b/python/ray/experimental/channel/torch_tensor_type.py index f573d4bc9931..c37977728b43 100644 --- a/python/ray/experimental/channel/torch_tensor_type.py +++ b/python/ray/experimental/channel/torch_tensor_type.py @@ -3,13 +3,15 @@ import ray from ray.experimental.channel import ChannelContext, ChannelOutputType +from ray.experimental.channel.gpu_communicator import ( + GPUCommunicator, + TorchTensorAllocator, +) from ray.util.annotations import PublicAPI if TYPE_CHECKING: import torch - from ray.experimental.channel.torch_tensor_nccl_channel import TorchTensorAllocator - logger = logging.getLogger(__name__) # 100KB to store metadata and/or exceptions. @@ -28,7 +30,7 @@ def __init__( self, _shape: Union[int, Tuple[int], str] = AUTO, _dtype: "torch.dtype" = AUTO, - transport: Optional[str] = AUTO, + transport: Optional[Union[str, GPUCommunicator]] = AUTO, _direct_return: Optional[bool] = False, ): """ @@ -73,6 +75,11 @@ def __init__( self._dtype = _dtype self._direct_return = _direct_return + self._custom_nccl_group: Optional[GPUCommunicator] = None + if isinstance(transport, GPUCommunicator): + self._custom_nccl_group = transport + transport = self.NCCL + if transport not in [self.AUTO, self.NCCL]: raise ValueError( "`transport` must be TorchTensorType.AUTO or TorchTensorType.NCCL" @@ -170,6 +177,12 @@ def create_channel( def requires_nccl(self) -> bool: return self.transport == self.NCCL + def get_custom_nccl_group(self) -> Optional[GPUCommunicator]: + """ + Return the custom NCCL group if one is specified. + """ + return self._custom_nccl_group + def set_nccl_group_id(self, group_id: str) -> None: self._nccl_group_id = group_id From 8bfed0e5ca2e15ad664b1896e0f0742aa8e89dfc Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 6 Sep 2024 13:17:53 -0700 Subject: [PATCH 05/11] [serve] separate test deployment version into unit and non-unit tests (#47539) ## Why are these changes needed? Separate `test_deployment_version.py` into unit and non-unit tests. Signed-off-by: Cindy Zhang --- .../serve/tests/test_deployment_version.py | 402 ----------------- .../tests/unit/test_deployment_version.py | 412 ++++++++++++++++++ 2 files changed, 412 insertions(+), 402 deletions(-) create mode 100644 python/ray/serve/tests/unit/test_deployment_version.py diff --git a/python/ray/serve/tests/test_deployment_version.py b/python/ray/serve/tests/test_deployment_version.py index dbcd7483ff0a..ce37a9100e74 100644 --- a/python/ray/serve/tests/test_deployment_version.py +++ b/python/ray/serve/tests/test_deployment_version.py @@ -5,408 +5,6 @@ from ray.serve._private.deployment_state import DeploymentVersion -def test_validation(): - # Code version must be a string. - with pytest.raises(TypeError): - DeploymentVersion(123, DeploymentConfig(), {}) - - -def test_other_type_equality(): - v = DeploymentVersion("1", DeploymentConfig(), {}) - - assert v is not None - assert v != "1" - assert v != None # noqa: E711 - - -def test_code_version(): - v1 = DeploymentVersion("1", DeploymentConfig(), {}) - v2 = DeploymentVersion("1", DeploymentConfig(), {}) - v3 = DeploymentVersion("2", DeploymentConfig(), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_deployment_config_basic(): - v1 = DeploymentVersion("1", DeploymentConfig(user_config="1"), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config="1"), {}) - v3 = DeploymentVersion("1", DeploymentConfig(user_config="2"), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_hashable(): - v1 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "2")), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "2")), {}) - v3 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "3")), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_list(): - v1 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "2"]), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "2"]), {}) - v3 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "3"]), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_dict_keys(): - v1 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) - v3 = DeploymentVersion("1", DeploymentConfig(user_config={"2": "1"}), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_dict_vals(): - v1 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) - v3 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "2"}), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_nested(): - v1 = DeploymentVersion( - "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "2"}]), {} - ) - v2 = DeploymentVersion( - "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "2"}]), {} - ) - v3 = DeploymentVersion( - "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "3"}]), {} - ) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_user_config_nested_in_hashable(): - v1 = DeploymentVersion( - "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "2"}])), {} - ) - v2 = DeploymentVersion( - "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "2"}])), {} - ) - v3 = DeploymentVersion( - "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "3"}])), {} - ) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_num_replicas(): - v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) - v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - - -def test_autoscaling_config(): - v1 = DeploymentVersion( - "1", - DeploymentConfig( - autoscaling_config={"max_replicas": 2, "metrics_interval_s": 10} - ), - {}, - ) - v2 = DeploymentVersion( - "1", - DeploymentConfig( - autoscaling_config={"max_replicas": 5, "metrics_interval_s": 10} - ), - {}, - ) - v3 = DeploymentVersion( - "1", - DeploymentConfig( - autoscaling_config={"max_replicas": 2, "metrics_interval_s": 3} - ), - {}, - ) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_max_ongoing_requests(): - v1 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) - v3 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=10), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert not v1.requires_actor_reconfigure(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - assert v1.requires_actor_reconfigure(v3) - - -def test_health_check_period_s(): - v1 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=5), {}) - v3 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=10), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_health_check_timeout_s(): - v1 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) - v3 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=10), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_graceful_shutdown_timeout_s(): - v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) - v3 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_graceful_shutdown_wait_loop_s(): - v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=5), {}) - v3 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=10), {}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_ray_actor_options(): - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v3 = DeploymentVersion("1", DeploymentConfig(), {"num_gpus": 0.1}) - - assert v1 == v2 - assert hash(v1) == hash(v2) - assert v1 != v3 - assert hash(v1) != hash(v3) - - -def test_max_replicas_per_node(): - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - max_replicas_per_node=1, - ) - v3 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - max_replicas_per_node=1, - ) - v4 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - max_replicas_per_node=2, - ) - - assert v1 != v2 - assert hash(v1) != hash(v2) - assert v1.requires_actor_restart(v2) - - assert v2 == v3 - assert hash(v2) == hash(v3) - assert not v2.requires_actor_restart(v3) - - assert v3 != v4 - assert hash(v3) != hash(v4) - assert v3.requires_actor_restart(v4) - - -def test_placement_group_options(): - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"CPU": 0.1}], - ) - v3 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"CPU": 0.1}], - ) - v4 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"GPU": 0.1}], - ) - - assert v1 != v2 - assert hash(v1) != hash(v2) - assert v1.requires_actor_restart(v2) - - assert v2 == v3 - assert hash(v2) == hash(v3) - assert not v2.requires_actor_restart(v3) - - assert v3 != v4 - assert hash(v3) != hash(v4) - assert v3.requires_actor_restart(v4) - - v5 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"CPU": 0.1}], - placement_group_strategy="PACK", - ) - v6 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"CPU": 0.1}], - placement_group_strategy="PACK", - ) - v7 = DeploymentVersion( - "1", - DeploymentConfig(), - {"num_cpus": 0.1}, - placement_group_bundles=[{"CPU": 0.1}], - placement_group_strategy="SPREAD", - ) - - assert v5 == v6 - assert hash(v5) == hash(v6) - assert not v5.requires_actor_restart(v6) - - assert v6 != v7 - assert hash(v6) != hash(v7) - assert v6.requires_actor_restart(v7) - - -def test_requires_actor_restart(): - # Code version different - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion("2", DeploymentConfig(), {"num_cpus": 0.1}) - assert v1.requires_actor_restart(v2) - - # Runtime env different - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.2}) - assert v1.requires_actor_restart(v2) - - # Placement group bundles different - v1 = DeploymentVersion( - "1", DeploymentConfig(), {}, placement_group_bundles=[{"CPU": 0.1}] - ) - v2 = DeploymentVersion( - "1", DeploymentConfig(), {}, placement_group_bundles=[{"CPU": 0.2}] - ) - assert v1.requires_actor_restart(v2) - - # Placement group strategy different - v1 = DeploymentVersion("1", DeploymentConfig(), {}, placement_group_strategy="PACK") - v2 = DeploymentVersion( - "1", DeploymentConfig(), {}, placement_group_strategy="SPREAD" - ) - assert v1.requires_actor_restart(v2) - - # Both code version and runtime env different - v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) - v2 = DeploymentVersion("2", DeploymentConfig(), {"num_cpus": 0.2}) - assert v1.requires_actor_restart(v2) - - # Num replicas is different - v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) - v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) - assert not v1.requires_actor_restart(v2) - - # Graceful shutdown timeout is different - v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) - assert not v1.requires_actor_restart(v2) - - -def test_requires_actor_reconfigure(): - # Replicas need the updated user config to call the user-defined - # reconfigure method - v1 = DeploymentVersion("1", DeploymentConfig(user_config=1), {}) - v2 = DeploymentVersion("1", DeploymentConfig(user_config=2), {}) - assert v1.requires_actor_reconfigure(v2) - - # Graceful shutdown loop requires actor reconfigure, since the - # replica needs the updated value to correctly execute graceful - # shutdown. - v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=1), {}) - v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=2), {}) - assert v1.requires_actor_reconfigure(v2) - - # Graceful shutdown timeout shouldn't require actor reconfigure, as - # it's only used by the controller to decide when to force-kill a - # replica - v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) - assert not v1.requires_actor_reconfigure(v2) - - # Num replicas shouldn't require actor reconfigure, as it's only - # by the controller to decide when to start or stop replicas. - v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) - v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) - assert not v1.requires_actor_reconfigure(v2) - - -def test_requires_long_poll_broadcast(): - # If max concurrent queries is updated, it needs to be broadcasted - # to all routers. - v1 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=10), {}) - assert v1.requires_long_poll_broadcast(v2) - - # Something random like health check timeout doesn't require updating - # any info on routers. - v1 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) - v2 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=10), {}) - assert not v1.requires_long_poll_broadcast(v2) - - def test_hash_consistent_across_processes(serve_instance): @ray.remote def get_version(): diff --git a/python/ray/serve/tests/unit/test_deployment_version.py b/python/ray/serve/tests/unit/test_deployment_version.py new file mode 100644 index 000000000000..6a4779769fba --- /dev/null +++ b/python/ray/serve/tests/unit/test_deployment_version.py @@ -0,0 +1,412 @@ +import pytest + +from ray.serve._private.config import DeploymentConfig +from ray.serve._private.deployment_state import DeploymentVersion + + +def test_validation(): + # Code version must be a string. + with pytest.raises(TypeError): + DeploymentVersion(123, DeploymentConfig(), {}) + + +def test_other_type_equality(): + v = DeploymentVersion("1", DeploymentConfig(), {}) + + assert v is not None + assert v != "1" + assert v != None # noqa: E711 + + +def test_code_version(): + v1 = DeploymentVersion("1", DeploymentConfig(), {}) + v2 = DeploymentVersion("1", DeploymentConfig(), {}) + v3 = DeploymentVersion("2", DeploymentConfig(), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_deployment_config_basic(): + v1 = DeploymentVersion("1", DeploymentConfig(user_config="1"), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config="1"), {}) + v3 = DeploymentVersion("1", DeploymentConfig(user_config="2"), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_hashable(): + v1 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "2")), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "2")), {}) + v3 = DeploymentVersion("1", DeploymentConfig(user_config=("1", "3")), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_list(): + v1 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "2"]), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "2"]), {}) + v3 = DeploymentVersion("1", DeploymentConfig(user_config=["1", "3"]), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_dict_keys(): + v1 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) + v3 = DeploymentVersion("1", DeploymentConfig(user_config={"2": "1"}), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_dict_vals(): + v1 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "1"}), {}) + v3 = DeploymentVersion("1", DeploymentConfig(user_config={"1": "2"}), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_nested(): + v1 = DeploymentVersion( + "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "2"}]), {} + ) + v2 = DeploymentVersion( + "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "2"}]), {} + ) + v3 = DeploymentVersion( + "1", DeploymentConfig(user_config=[{"1": "2"}, {"1": "3"}]), {} + ) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_user_config_nested_in_hashable(): + v1 = DeploymentVersion( + "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "2"}])), {} + ) + v2 = DeploymentVersion( + "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "2"}])), {} + ) + v3 = DeploymentVersion( + "1", DeploymentConfig(user_config=([{"1": "2"}, {"1": "3"}])), {} + ) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_num_replicas(): + v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) + v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + + +def test_autoscaling_config(): + v1 = DeploymentVersion( + "1", + DeploymentConfig( + autoscaling_config={"max_replicas": 2, "metrics_interval_s": 10} + ), + {}, + ) + v2 = DeploymentVersion( + "1", + DeploymentConfig( + autoscaling_config={"max_replicas": 5, "metrics_interval_s": 10} + ), + {}, + ) + v3 = DeploymentVersion( + "1", + DeploymentConfig( + autoscaling_config={"max_replicas": 2, "metrics_interval_s": 3} + ), + {}, + ) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_max_ongoing_requests(): + v1 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) + v3 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=10), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert not v1.requires_actor_reconfigure(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + assert v1.requires_actor_reconfigure(v3) + + +def test_health_check_period_s(): + v1 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=5), {}) + v3 = DeploymentVersion("1", DeploymentConfig(health_check_period_s=10), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_health_check_timeout_s(): + v1 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) + v3 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=10), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_graceful_shutdown_timeout_s(): + v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) + v3 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_graceful_shutdown_wait_loop_s(): + v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=5), {}) + v3 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=10), {}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_ray_actor_options(): + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v3 = DeploymentVersion("1", DeploymentConfig(), {"num_gpus": 0.1}) + + assert v1 == v2 + assert hash(v1) == hash(v2) + assert v1 != v3 + assert hash(v1) != hash(v3) + + +def test_max_replicas_per_node(): + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + max_replicas_per_node=1, + ) + v3 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + max_replicas_per_node=1, + ) + v4 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + max_replicas_per_node=2, + ) + + assert v1 != v2 + assert hash(v1) != hash(v2) + assert v1.requires_actor_restart(v2) + + assert v2 == v3 + assert hash(v2) == hash(v3) + assert not v2.requires_actor_restart(v3) + + assert v3 != v4 + assert hash(v3) != hash(v4) + assert v3.requires_actor_restart(v4) + + +def test_placement_group_options(): + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + ) + v3 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + ) + v4 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"GPU": 0.1}], + ) + + assert v1 != v2 + assert hash(v1) != hash(v2) + assert v1.requires_actor_restart(v2) + + assert v2 == v3 + assert hash(v2) == hash(v3) + assert not v2.requires_actor_restart(v3) + + assert v3 != v4 + assert hash(v3) != hash(v4) + assert v3.requires_actor_restart(v4) + + v5 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="PACK", + ) + v6 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="PACK", + ) + v7 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="SPREAD", + ) + + assert v5 == v6 + assert hash(v5) == hash(v6) + assert not v5.requires_actor_restart(v6) + + assert v6 != v7 + assert hash(v6) != hash(v7) + assert v6.requires_actor_restart(v7) + + +def test_requires_actor_restart(): + # Code version different + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion("2", DeploymentConfig(), {"num_cpus": 0.1}) + assert v1.requires_actor_restart(v2) + + # Runtime env different + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.2}) + assert v1.requires_actor_restart(v2) + + # Placement group bundles different + v1 = DeploymentVersion( + "1", DeploymentConfig(), {}, placement_group_bundles=[{"CPU": 0.1}] + ) + v2 = DeploymentVersion( + "1", DeploymentConfig(), {}, placement_group_bundles=[{"CPU": 0.2}] + ) + assert v1.requires_actor_restart(v2) + + # Placement group strategy different + v1 = DeploymentVersion("1", DeploymentConfig(), {}, placement_group_strategy="PACK") + v2 = DeploymentVersion( + "1", DeploymentConfig(), {}, placement_group_strategy="SPREAD" + ) + assert v1.requires_actor_restart(v2) + + # Both code version and runtime env different + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion("2", DeploymentConfig(), {"num_cpus": 0.2}) + assert v1.requires_actor_restart(v2) + + # Num replicas is different + v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) + v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) + assert not v1.requires_actor_restart(v2) + + # Graceful shutdown timeout is different + v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) + assert not v1.requires_actor_restart(v2) + + +def test_requires_actor_reconfigure(): + # Replicas need the updated user config to call the user-defined + # reconfigure method + v1 = DeploymentVersion("1", DeploymentConfig(user_config=1), {}) + v2 = DeploymentVersion("1", DeploymentConfig(user_config=2), {}) + assert v1.requires_actor_reconfigure(v2) + + # Graceful shutdown loop requires actor reconfigure, since the + # replica needs the updated value to correctly execute graceful + # shutdown. + v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=1), {}) + v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_wait_loop_s=2), {}) + assert v1.requires_actor_reconfigure(v2) + + # Graceful shutdown timeout shouldn't require actor reconfigure, as + # it's only used by the controller to decide when to force-kill a + # replica + v1 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(graceful_shutdown_timeout_s=10), {}) + assert not v1.requires_actor_reconfigure(v2) + + # Num replicas shouldn't require actor reconfigure, as it's only + # by the controller to decide when to start or stop replicas. + v1 = DeploymentVersion("1", DeploymentConfig(num_replicas=1), {}) + v2 = DeploymentVersion("1", DeploymentConfig(num_replicas=2), {}) + assert not v1.requires_actor_reconfigure(v2) + + +def test_requires_long_poll_broadcast(): + # If max concurrent queries is updated, it needs to be broadcasted + # to all routers. + v1 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(max_ongoing_requests=10), {}) + assert v1.requires_long_poll_broadcast(v2) + + # Something random like health check timeout doesn't require updating + # any info on routers. + v1 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=5), {}) + v2 = DeploymentVersion("1", DeploymentConfig(health_check_timeout_s=10), {}) + assert not v1.requires_long_poll_broadcast(v2) + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", "-s", __file__])) From 53f6408387d19a9eb320af0c313b3e4b1415f518 Mon Sep 17 00:00:00 2001 From: Rui Qiao <161574667+ruisearch42@users.noreply.github.com> Date: Fri, 6 Sep 2024 16:04:27 -0700 Subject: [PATCH 06/11] [aDAG] Fix test_accelerated_dag regression (#47543) Fix CI regression: https://buildkite.com/ray-project/postmerge/builds/6157#0191c4aa-1897-4d42-93c7-5403b67bc5cc https://buildkite.com/ray-project/postmerge/builds/6165#0191c819-53f7-4605-805f-824e85951fde --- python/ray/dag/tests/experimental/test_accelerated_dag.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 59d5edf7deb2..79bb74f759ab 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -1849,6 +1849,8 @@ def test_event_profiling(ray_start_regular, monkeypatch): assert event.method_name == "inc" assert event.operation in ["READ", "COMPUTE", "WRITE"] + adag.teardown() + @ray.remote class TestWorker: From 542f51a657d43e6e5333e70f58a72b290200fc50 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Fri, 6 Sep 2024 23:55:23 -0700 Subject: [PATCH 07/11] [Data] Remove ineffective retry code in `plan_read_op` (#47456) Currently, Ray Data calls read tasks with retries. The intended purpose is to retry transient errors while reading data. https://github.com/ray-project/ray/blob/eda6d092973831523693be15535872ed8ea14fdd/python/ray/data/_internal/planner/plan_read_op.py#L103-L109 However, the code doesn't achieve the intended result because read tasks return generator objects, and Python will never raise runtime errors while returning a generator (Python might raise runtime errors when the programs iterates over the returned generator). --------- Signed-off-by: Balaji Veeramani --- .../ray/data/_internal/planner/plan_read_op.py | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index b155bc6143b0..649cf3097163 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -17,18 +17,13 @@ ) from ray.data._internal.execution.util import memory_string from ray.data._internal.logical.operators.read_operator import Read -from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry +from ray.data._internal.util import _warn_on_high_parallelism from ray.data.block import Block, BlockMetadata from ray.data.datasource.datasource import ReadTask from ray.util.debug import log_once TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024 # 1 MiB -# Transient errors that can occur during longer reads. Trigger retry when these occur. -READ_FILE_RETRY_ON_ERRORS = ["AWS Error NETWORK_CONNECTION", "AWS Error ACCESS_DENIED"] -READ_FILE_MAX_ATTEMPTS = 10 -READ_FILE_RETRY_MAX_BACKOFF_SECONDS = 32 - logger = logging.getLogger(__name__) @@ -96,17 +91,8 @@ def get_input_data(target_max_block_size) -> List[RefBundle]: ) def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]: - """Yield from read tasks, with retry logic upon transient read errors.""" for read_task in blocks: - read_fn_name = read_task._read_fn.__name__ - - yield from call_with_retry( - f=read_task, - description=f"read file {read_fn_name}", - match=READ_FILE_RETRY_ON_ERRORS, - max_attempts=READ_FILE_MAX_ATTEMPTS, - max_backoff_s=READ_FILE_RETRY_MAX_BACKOFF_SECONDS, - ) + yield from read_task() # Create a MapTransformer for a read operator transform_fns: List[MapTransformFn] = [ From ec401cfb6dd8c9fc7292b8ee3173a36243fe001d Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Sat, 7 Sep 2024 00:33:03 -0700 Subject: [PATCH 08/11] [Data] Remove unused `requirements_legacy_compat.txt` requirements file (#47549) We previously tested that you could install ray[tune,data] with older versions of packages like PyArrow and Torch. We don't run this test anymore, so I'm removing the now-unused requirements file. --- .../compat/requirements_legacy_compat.txt | 26 ------------------- 1 file changed, 26 deletions(-) delete mode 100644 python/requirements/compat/requirements_legacy_compat.txt diff --git a/python/requirements/compat/requirements_legacy_compat.txt b/python/requirements/compat/requirements_legacy_compat.txt deleted file mode 100644 index e8b981a35627..000000000000 --- a/python/requirements/compat/requirements_legacy_compat.txt +++ /dev/null @@ -1,26 +0,0 @@ -# ATTENTION: THESE DEPENDENCIES SHOULD USUALLY NOT BE UPDATED! - -# Updating these dependencies means we remove official support for dependency releases older than -# the specified version. - -# These are compatibility requirements to make sure certain workflows continue to work -# with these dependency versions. They thus act as a lower bound for compatibility -# with ML libraries. -# Concretely, we set up a fresh Python 3.7 environment and -# run the pipeline job in `Ray Python legacy dependency ML compatibility tests` with these dependencies installed. - -# ML libraries -torch==1.9.0 -tensorflow==2.7.0 -tensorflow-probability==0.14.1 -keras==2.7.0 - -# Torch addons -torchvision==0.10.0 - -pytorch-lightning==1.5.10 - -# Datasets -pyarrow==6.0.1 - -ray[tune,data] From d47156448e1cbee1fd65d6a08a6221abef22a886 Mon Sep 17 00:00:00 2001 From: Christopher Zhang Date: Sat, 7 Sep 2024 18:37:28 -0700 Subject: [PATCH 09/11] adding run quickstart button to ray serve stable diffusion tutorial (#47546) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Screenshot 2024-09-06 at 4 47 01 PM Signed-off-by: Chris Zhang --- doc/source/serve/tutorials/stable-diffusion.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/source/serve/tutorials/stable-diffusion.md b/doc/source/serve/tutorials/stable-diffusion.md index 8f11ea846ef8..e336af52c234 100644 --- a/doc/source/serve/tutorials/stable-diffusion.md +++ b/doc/source/serve/tutorials/stable-diffusion.md @@ -5,6 +5,9 @@ orphan: true (serve-stable-diffusion-tutorial)= # Serve a Stable Diffusion Model + +[![try-anyscale-quickstart-ray-serve-stable-diffusion-quickstart](../../_static/img/run-quickstart-anyscale.svg)](https://console.anyscale.com/register/ha?utm_source=ray_docs&utm_medium=docs&utm_campaign=ray-serve-stable-diffusion-quickstart&redirectTo=/v2/template-preview/serve-stable-diffusion-v2) + This example runs a Stable Diffusion application with Ray Serve. To run this example, install the following: From 3e8dd0d04e305783017c2aa1eacbd59933feec0f Mon Sep 17 00:00:00 2001 From: Christopher Zhang Date: Sat, 7 Sep 2024 18:37:40 -0700 Subject: [PATCH 10/11] add quickstart button to model serving code snippet (#47545) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Screenshot 2024-09-06 at 4 33 33 PM Signed-off-by: Chris Zhang --- doc/source/_templates/index.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/_templates/index.html b/doc/source/_templates/index.html index a0feada9422c..8901fb8503b3 100644 --- a/doc/source/_templates/index.html +++ b/doc/source/_templates/index.html @@ -222,7 +222,7 @@

Scale with Ray

''') }} From 1dd8d60bcbbf74b0d22ea4447a787a33817ff20b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 9 Sep 2024 04:55:03 -0700 Subject: [PATCH 11/11] [Core] Remove ray._raylet.check_health (#47526) Signed-off-by: Jiajun Yao --- python/ray/_private/internal_api.py | 2 - python/ray/_raylet.pyx | 42 -------------------- python/ray/includes/common.pxd | 5 --- python/ray/scripts/scripts.py | 49 ++++++++++++------------ python/ray/tests/test_advanced_4.py | 24 ------------ python/ray/tests/test_cli.py | 57 +++++++++++++++++++++------- src/ray/gcs/gcs_client/gcs_client.cc | 39 ------------------- src/ray/gcs/gcs_client/gcs_client.h | 7 ---- 8 files changed, 68 insertions(+), 157 deletions(-) diff --git a/python/ray/_private/internal_api.py b/python/ray/_private/internal_api.py index cf42ba185c7a..f4efbde4db21 100644 --- a/python/ray/_private/internal_api.py +++ b/python/ray/_private/internal_api.py @@ -5,7 +5,6 @@ import ray._private.services as services import ray._private.utils as utils import ray._private.worker -from ray._private import ray_constants from ray._private.state import GlobalState from ray._raylet import GcsClientOptions from ray.core.generated import common_pb2 @@ -34,7 +33,6 @@ def get_state_from_address(address=None): def memory_summary( address=None, - redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE", units="B", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f466a1b815cb..9e0953a33b0a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -134,7 +134,6 @@ from ray.includes.common cimport ( kResourceUnitScaling, kImplicitResourcePrefix, kWorkerSetupHookKeyName, - PythonCheckGcsHealth, PythonGetNodeLabels, PythonGetResourcesTotal, ) @@ -3263,47 +3262,6 @@ cdef class _TestOnly_GcsActorSubscriber(_GcsSubscriber): return [(key_id, info)] -def check_health(address: str, timeout=2, skip_version_check=False): - """Checks Ray cluster health, before / without actually connecting to the - cluster via ray.init(). - - Args: - address: Ray cluster / GCS address string, e.g. ip:port. - timeout: request timeout. - skip_version_check: If True, will skip comparision of GCS Ray version with local - Ray version. If False (default), will raise exception on mismatch. - Returns: - Returns True if the cluster is running and has matching Ray version. - Returns False if no service is running. - Raises an exception otherwise. - """ - - tokens = address.rsplit(":", 1) - if len(tokens) != 2: - raise ValueError("Invalid address: {}. Expect 'ip:port'".format(address)) - gcs_address, gcs_port = tokens - - cdef: - c_string c_gcs_address = gcs_address - int c_gcs_port = int(gcs_port) - int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_string c_ray_version = ray.__version__ - c_bool c_skip_version_check = skip_version_check - c_bool c_is_healthy = True - - try: - with nogil: - check_status(PythonCheckGcsHealth( - c_gcs_address, c_gcs_port, timeout_ms, c_ray_version, - c_skip_version_check, c_is_healthy)) - except RpcError: - traceback.print_exc() - except RaySystemError as e: - raise RuntimeError(str(e)) - - return c_is_healthy - - cdef class CoreWorker: def __cinit__(self, worker_type, store_socket, raylet_socket, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 9da032607862..cc3eaf7d7389 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -698,11 +698,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil: unordered_map[c_string, c_string] PythonGetNodeLabels( const CGcsNodeInfo& node_info) - CRayStatus PythonCheckGcsHealth( - const c_string& gcs_address, const int gcs_port, const int64_t timeout_ms, - const c_string& ray_version, const c_bool skip_version_check, - c_bool& is_healthy) - cdef extern from "src/ray/protobuf/gcs.pb.h" nogil: cdef enum CChannelType "ray::rpc::ChannelType": RAY_ERROR_INFO_CHANNEL "ray::rpc::ChannelType::RAY_ERROR_INFO_CHANNEL", diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 6ed5fabea435..f94d0dbc9dc2 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -7,7 +7,6 @@ import subprocess import sys import time -import traceback import urllib import urllib.parse import warnings @@ -57,6 +56,18 @@ logger = logging.getLogger(__name__) +def _check_ray_version(gcs_client): + import ray._private.usage.usage_lib as ray_usage_lib + + cluster_metadata = ray_usage_lib.get_cluster_metadata(gcs_client) + if cluster_metadata and cluster_metadata["ray_version"] != ray.__version__: + raise RuntimeError( + "Ray version mismatch: cluster has Ray version " + f'{cluster_metadata["ray_version"]} ' + f"but local Ray version is {ray.__version__}" + ) + + @click.group() @click.option( "--logging-level", @@ -1953,13 +1964,12 @@ def memory( ): """Print object references held in a Ray cluster.""" address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") + gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) time = datetime.now() header = "=" * 8 + f" Object references status: {time} " + "=" * 8 mem_stats = memory_summary( address, - redis_password, group_by, sort_by, units, @@ -1993,16 +2003,11 @@ def memory( def status(address: str, redis_password: str, verbose: bool): """Print cluster status, including autoscaling info.""" address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) ray.experimental.internal_kv._initialize_internal_kv(gcs_client) - status = ray.experimental.internal_kv._internal_kv_get( - ray_constants.DEBUG_AUTOSCALING_STATUS - ) - error = ray.experimental.internal_kv._internal_kv_get( - ray_constants.DEBUG_AUTOSCALING_ERROR - ) + status = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_STATUS.encode()) + error = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_ERROR.encode()) print(debug_status(status, error, verbose=verbose, address=address)) @@ -2292,10 +2297,9 @@ def drain_node( raise click.BadParameter(f"Invalid hex ID of a Ray node, got {node_id}") address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) is_accepted, rejection_error_message = gcs_client.drain_node( node_id, autoscaler_pb2.DrainNodeReason.Value(reason), @@ -2370,20 +2374,15 @@ def healthcheck(address, redis_password, component, skip_version_check): """ address = services.canonicalize_bootstrap_address_or_die(address) + gcs_client = ray._raylet.GcsClient(address=address) + if not skip_version_check: + _check_ray_version(gcs_client) if not component: - try: - if ray._raylet.check_health(address, skip_version_check=skip_version_check): - sys.exit(0) - except Exception: - traceback.print_exc() - pass - sys.exit(1) + sys.exit(0) - gcs_client = ray._raylet.GcsClient(address=address) - ray.experimental.internal_kv._initialize_internal_kv(gcs_client) - report_str = ray.experimental.internal_kv._internal_kv_get( - component, namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK + report_str = gcs_client.internal_kv_get( + component.encode(), namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK ) if not report_str: # Status was never updated diff --git a/python/ray/tests/test_advanced_4.py b/python/ray/tests/test_advanced_4.py index 573380013c8f..1a156a9c64f8 100644 --- a/python/ray/tests/test_advanced_4.py +++ b/python/ray/tests/test_advanced_4.py @@ -1,11 +1,9 @@ -from unittest import mock import subprocess import sys import pytest import ray -from ray._raylet import check_health from ray._private.test_utils import ( Semaphore, client_test_enabled, @@ -114,28 +112,6 @@ def test_jemalloc_env_var_propagate(): assert actual == expected -def test_check_health(shutdown_only): - assert not check_health("127.0.0.1:8888") - # Should not raise error: https://github.com/ray-project/ray/issues/38785 - assert not check_health("ip:address:with:colon:name:8265") - - with pytest.raises(ValueError): - check_health("bad_address_no_port") - - conn = ray.init() - addr = conn.address_info["address"] - assert check_health(addr) - - -def test_check_health_version_check(shutdown_only): - with mock.patch("ray.__version__", "FOO-VERSION"): - conn = ray.init() - addr = conn.address_info["address"] - assert check_health(addr, skip_version_check=True) - with pytest.raises(RuntimeError): - check_health(addr) - - def test_back_pressure(shutdown_only_with_initialization_check): ray.init() diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 01e8cdc1a764..eec7b71c0c4c 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -1055,7 +1055,10 @@ def test_ray_check_open_ports(shutdown_only, start_open_port_check_server): assert "[🛑] open ports detected" in result.output -def test_ray_drain_node(): +def test_ray_drain_node(monkeypatch): + monkeypatch.setenv("RAY_py_gcs_connect_timeout_s", "1") + ray._raylet.Config.initialize("") + runner = CliRunner() result = runner.invoke( scripts.drain_node, @@ -1105,7 +1108,9 @@ def test_ray_drain_node(): ], ) assert result.exit_code != 0 - assert "Ray cluster is not found at 127.0.0.2:8888" in result.output + assert "Timed out while waiting for GCS to become available" in str( + result.exception + ) result = runner.invoke( scripts.drain_node, @@ -1121,10 +1126,32 @@ def test_ray_drain_node(): assert result.exit_code != 0 assert "Invalid hex ID of a Ray node, got invalid-node-id" in result.output - with patch("ray._raylet.check_health", return_value=True), patch( - "ray._raylet.GcsClient" - ) as MockGcsClient: + with patch("ray._raylet.GcsClient") as MockGcsClient: + mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + '{"ray_version": "ray_version_mismatch"}'.encode() + ) + result = runner.invoke( + scripts.drain_node, + [ + "--address", + "127.0.0.1:6543", + "--node-id", + "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", + "--reason", + "DRAIN_NODE_REASON_IDLE_TERMINATION", + "--reason-message", + "idle termination", + ], + ) + assert result.exit_code != 0 + assert "Ray version mismatch" in str(result.exception) + + with patch("ray._raylet.GcsClient") as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (True, "") result = runner.invoke( scripts.drain_node, @@ -1140,17 +1167,18 @@ def test_ray_drain_node(): ], ) assert result.exit_code == 0 - assert mock_gcs_client.mock_calls[0] == mock.call.drain_node( + assert mock_gcs_client.mock_calls[1] == mock.call.drain_node( "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", 1, "idle termination", 0, ) - with patch("ray._raylet.check_health", return_value=True), patch( - "ray._raylet.GcsClient" - ) as MockGcsClient: + with patch("ray._raylet.GcsClient") as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (False, "Node not idle") result = runner.invoke( scripts.drain_node, @@ -1168,10 +1196,13 @@ def test_ray_drain_node(): assert result.exit_code != 0 assert "The drain request is not accepted: Node not idle" in result.output - with patch("ray._raylet.check_health", return_value=True), patch( - "time.time_ns", return_value=1000000000 - ), patch("ray._raylet.GcsClient") as MockGcsClient: + with patch("time.time_ns", return_value=1000000000), patch( + "ray._raylet.GcsClient" + ) as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (True, "") result = runner.invoke( scripts.drain_node, @@ -1189,7 +1220,7 @@ def test_ray_drain_node(): ], ) assert result.exit_code == 0 - assert mock_gcs_client.mock_calls[0] == mock.call.drain_node( + assert mock_gcs_client.mock_calls[1] == mock.call.drain_node( "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", 2, "spot preemption", diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 27717c6d2f62..c91b5a073be3 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -708,45 +708,6 @@ std::unordered_map PythonGetNodeLabels( node_info.labels().end()); } -Status PythonCheckGcsHealth(const std::string &gcs_address, - const int gcs_port, - const int64_t timeout_ms, - const std::string &ray_version, - const bool skip_version_check, - bool &is_healthy) { - auto channel = rpc::GcsRpcClient::CreateGcsChannel(gcs_address, gcs_port); - auto stub = rpc::NodeInfoGcsService::NewStub(channel); - grpc::ClientContext context; - if (timeout_ms != -1) { - context.set_deadline(std::chrono::system_clock::now() + - std::chrono::milliseconds(timeout_ms)); - } - rpc::CheckAliveRequest request; - rpc::CheckAliveReply reply; - grpc::Status status = stub->CheckAlive(&context, request, &reply); - if (!status.ok()) { - is_healthy = false; - return Status::RpcError(status.error_message(), status.error_code()); - } - if (reply.status().code() != static_cast(StatusCode::OK)) { - is_healthy = false; - return HandleGcsError(reply.status()); - } - if (!skip_version_check) { - // Check for Ray version match - if (reply.ray_version() != ray_version) { - is_healthy = false; - std::ostringstream ss; - ss << "Ray cluster at " << gcs_address << ":" << gcs_port << " has version " - << reply.ray_version() << ", but this process " - << "is running Ray version " << ray_version << "."; - return Status::Invalid(ss.str()); - } - } - is_healthy = true; - return Status::OK(); -} - /// Creates a singleton thread that runs an io_service. /// All ConnectToGcsStandalone calls will share this io_service. class SingletonIoContext { diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 9d66ffcc92c4..fae6d7b8aca5 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -350,13 +350,6 @@ std::unordered_map PythonGetResourcesTotal( std::unordered_map PythonGetNodeLabels( const rpc::GcsNodeInfo &node_info); -Status PythonCheckGcsHealth(const std::string &gcs_address, - const int gcs_port, - const int64_t timeout_ms, - const std::string &ray_version, - const bool skip_version_check, - bool &is_healthy); - } // namespace gcs } // namespace ray