From c95318e8457c8a6fe13a4e56f91af1596fe4acb4 Mon Sep 17 00:00:00 2001
From: Nikita Vemuri
Date: Thu, 5 Sep 2024 16:21:14 -0700
Subject: [PATCH] [observability][export-api] Write task events (#47193)

Write task events to file as part of the export API. This logic only runs
if RayConfig::instance().enable_export_api_write() is true; the default
value is false.

All tasks that are added to the task event buffer will be written to file.
In addition, keep a status_events_for_export_ buffer so that status events
which are dropped from the buffer of events to send to GCS are still
written to file. The capacity of status_events_for_export_ is 10x larger
than task_events_max_num_status_events_buffer_on_worker to prioritize
recording data. The tradeoff is extra memory on each worker, but this
overhead is relatively small, and it is unlikely the export buffer will
fill up because its sink (a file write) should succeed on each flush.

Task events are converted to the export API proto and written to file in a
separate thread, which runs this flush operation periodically (every
second). Individual task events are aggregated by task attempt before
being written. This is consistent with the final event sent to GCS, and
also helps reduce the number of events written to file.
---
 src/ray/common/ray_config_def.h            |  10 +
 src/ray/core_worker/core_worker_process.cc |   3 +-
 src/ray/core_worker/task_event_buffer.cc   | 187 ++++++++++++++++--
 src/ray/core_worker/task_event_buffer.h    |  52 ++++-
 .../test/task_event_buffer_test.cc          | 138 +++++++++++++
 src/ray/gcs/pb_util.h                       |  82 ++++++++
 src/ray/util/event.cc                       |  12 +-
 7 files changed, 451 insertions(+), 33 deletions(-)

diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 849b0b147cff..7f48c7926c8a 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -498,10 +498,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60)
 /// workers. Events will be evicted based on a FIFO order.
 RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000)
 
+/// Max number of task status events that will be stored for the export API.
+/// Events will be evicted based on a FIFO order.
+RAY_CONFIG(uint64_t,
+           task_events_max_num_export_status_events_buffer_on_worker,
+           1000 * 1000)
+
 /// Max number of task events to be send in a single message to GCS. This caps both
 /// the message size, and also the processing work on GCS.
 RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)
 
+/// Max number of task events to be written in a single flush iteration. This
+/// caps the number of file writes per iteration.
+RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000)
+
 /// Max number of profile events allowed to be tracked for a single task.
 /// Setting the value to -1 allows unlimited profile events to be tracked.
 RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000)
diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc
index d587fcee8b7e..191788e7e045 100644
--- a/src/ray/core_worker/core_worker_process.cc
+++ b/src/ray/core_worker/core_worker_process.cc
@@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
   // Initialize event framework.
   if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
     const std::vector<SourceTypeVariant> source_types = {
-        ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER};
+        ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
+        ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
     RayEventInit(source_types,
                  absl::flat_hash_map<std::string, std::string>(),
                  options_.log_dir,
diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc
index 115811d3a999..1971b5efd563 100644
--- a/src/ray/core_worker/task_event_buffer.cc
+++ b/src/ray/core_worker/task_event_buffer.cc
@@ -15,6 +15,7 @@
 #include "ray/core_worker/task_event_buffer.h"
 
 #include "ray/gcs/pb_util.h"
+#include "ray/util/event.h"
 
 namespace ray {
 namespace core {
@@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
   }
 }
 
+void TaskStatusEvent::ToRpcTaskExportEvents(
+    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
+  // Base fields
+  rpc_task_export_event_data->set_task_id(task_id_.Binary());
+  rpc_task_export_event_data->set_job_id(job_id_.Binary());
+  rpc_task_export_event_data->set_attempt_number(attempt_number_);
+
+  // Task info.
+  if (task_spec_) {
+    gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_);
+  }
+
+  // Task status update.
+  auto dst_state_update = rpc_task_export_event_data->mutable_state_updates();
+  gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update);
+
+  if (!state_update_.has_value()) {
+    return;
+  }
+
+  if (state_update_->node_id_.has_value()) {
+    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
+        << "Node ID should be included when task status changes to "
+           "SUBMITTED_TO_WORKER.";
+    dst_state_update->set_node_id(state_update_->node_id_->Binary());
+  }
+
+  if (state_update_->worker_id_.has_value()) {
+    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
+        << "Worker ID should be included when task status changes to "
+           "SUBMITTED_TO_WORKER.";
+    dst_state_update->set_worker_id(state_update_->worker_id_->Binary());
+  }
+
+  if (state_update_->error_info_.has_value()) {
+    auto error_info = dst_state_update->mutable_error_info();
+    error_info->set_error_message((*state_update_->error_info_).error_message());
+    error_info->set_error_type((*state_update_->error_info_).error_type());
+  }
+
+  if (state_update_->task_log_info_.has_value()) {
+    rpc::ExportTaskEventData::TaskLogInfo export_task_log_info;
+    gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(),
+                             &export_task_log_info);
+    dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info);
+  }
+
+  if (state_update_->pid_.has_value()) {
+    dst_state_update->set_worker_pid(state_update_->pid_.value());
+  }
+
+  if (state_update_->is_debugger_paused_.has_value()) {
+    dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value());
+  }
+}
+
 void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
   // Rate limit on the number of profiling events from the task. This is especially the
   // case if a driver has many profiling events when submitting tasks
@@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
   event_entry->set_extra_data(std::move(extra_data_));
 }
 
+void TaskProfileEvent::ToRpcTaskExportEvents(
+    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
+  auto profile_events = rpc_task_export_event_data->mutable_profile_events();
+
+  // Base fields
+  rpc_task_export_event_data->set_task_id(task_id_.Binary());
+  rpc_task_export_event_data->set_job_id(job_id_.Binary());
+  rpc_task_export_event_data->set_attempt_number(attempt_number_);
+  profile_events->set_component_type(std::move(component_type_));
+  profile_events->set_component_id(std::move(component_id_));
+  profile_events->set_node_ip_address(std::move(node_ip_address_));
+  auto event_entry = profile_events->add_events();
+  event_entry->set_event_name(std::move(event_name_));
+  event_entry->set_start_time(start_time_);
+  event_entry->set_end_time(end_time_);
+  event_entry->set_extra_data(std::move(extra_data_));
+}
+
 TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
     : work_guard_(boost::asio::make_work_guard(io_service_)),
       periodical_runner_(io_service_),
@@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }
 
 Status TaskEventBufferImpl::Start(bool auto_flush) {
   absl::MutexLock lock(&mutex_);
+  export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
   auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
   RAY_CHECK(report_interval_ms > 0)
      << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";
 
   status_events_.set_capacity(
       RayConfig::instance().task_events_max_num_status_events_buffer_on_worker());
+  status_events_for_export_.set_capacity(
+      RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker());
 
   io_thread_ = std::thread([this]() {
 #ifndef _WIN32
@@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() {
 
 bool TaskEventBufferImpl::Enabled() const { return enabled_; }
 
 void TaskEventBufferImpl::GetTaskStatusEventsToSend(
-    std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
+    std::vector<std::shared_ptr<TaskEvent>> *status_events_to_send,
+    std::vector<std::shared_ptr<TaskEvent>> *status_events_to_write_for_export,
     absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send) {
   absl::MutexLock lock(&mutex_);
 
+  // Get the export events data to write.
+  if (export_event_write_enabled_) {
+    size_t num_to_write = std::min(
+        static_cast<size_t>(RayConfig::instance().export_task_events_write_batch_size()),
+        static_cast<size_t>(status_events_for_export_.size()));
+    status_events_to_write_for_export->insert(
+        status_events_to_write_for_export->end(),
+        std::make_move_iterator(status_events_for_export_.begin()),
+        std::make_move_iterator(status_events_for_export_.begin() + num_to_write));
+    status_events_for_export_.erase(status_events_for_export_.begin(),
+                                    status_events_for_export_.begin() + num_to_write);
+    stats_counter_.Decrement(
+        TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored,
+        status_events_to_write_for_export->size());
+  }
+
   // No data to send.
   if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) {
     return;
@@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend(
 }
 
 void TaskEventBufferImpl::GetTaskProfileEventsToSend(
-    std::vector<std::unique_ptr<TaskEvent>> *profile_events_to_send) {
+    std::vector<std::shared_ptr<TaskEvent>> *profile_events_to_send) {
   absl::MutexLock lock(&profile_mutex_);
 
   size_t batch_size =
@@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend(
 }
 
 std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
-    std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
-    std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
+    std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_send,
+    std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send,
     absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send) {
   // Aggregate the task events by TaskAttempt.
   absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
   auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send](
-                             std::unique_ptr<TaskEvent> &event) {
+                             std::shared_ptr<TaskEvent> &event) {
     if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) {
       // We are marking this as data loss due to some missing task status updates.
       // We will not send this event to GCS.
@@ -331,6 +426,45 @@ std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
   return data;
 }
 
+void TaskEventBufferImpl::WriteExportData(
+    std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
+    std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send) {
+  absl::flat_hash_map<TaskAttempt, std::shared_ptr<rpc::ExportTaskEventData>>
+      agg_task_events;
+  // Maintain insertion order to agg_task_events so events are written
+  // in the same order as the buffer.
+  std::vector<TaskAttempt> agg_task_event_insertion_order;
+  auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order](
+                             std::shared_ptr<TaskEvent> &event) {
+    // Aggregate events by task attempt before converting to proto
+    auto itr = agg_task_events.find(event->GetTaskAttempt());
+    if (itr == agg_task_events.end()) {
+      // Insert event into agg_task_events if the task attempt of that
+      // event wasn't already added.
+      auto event_for_attempt = std::make_shared<rpc::ExportTaskEventData>();
+      auto inserted =
+          agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt});
+      RAY_CHECK(inserted.second);
+      agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
+      event->ToRpcTaskExportEvents(event_for_attempt);
+    } else {
+      event->ToRpcTaskExportEvents(itr->second);
+    }
+  };
+
+  std::for_each(status_events_to_write_for_export.begin(),
+                status_events_to_write_for_export.end(),
+                to_rpc_event_fn);
+  std::for_each(
+      profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn);
+
+  for (const auto &task_attempt : agg_task_event_insertion_order) {
+    auto it = agg_task_events.find(task_attempt);
+    RAY_CHECK(it != agg_task_events.end());
+    RayExportEvent(it->second).SendEvent();
+  }
+}
+
 void TaskEventBufferImpl::FlushEvents(bool forced) {
   if (!enabled_) {
     return;
@@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
   }
 
   // Take out status events from the buffer.
-  std::vector<std::unique_ptr<TaskEvent>> status_events_to_send;
+  std::vector<std::shared_ptr<TaskEvent>> status_events_to_send;
+  std::vector<std::shared_ptr<TaskEvent>> status_events_to_write_for_export;
   absl::flat_hash_set<TaskAttempt> dropped_task_attempts_to_send;
   status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
-  GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send);
+  GetTaskStatusEventsToSend(&status_events_to_send,
+                            &status_events_to_write_for_export,
+                            &dropped_task_attempts_to_send);
 
   // Take profile events from the status events.
-  std::vector<std::unique_ptr<TaskEvent>> profile_events_to_send;
+  std::vector<std::shared_ptr<TaskEvent>> profile_events_to_send;
   profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
   GetTaskProfileEventsToSend(&profile_events_to_send);
 
@@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
       CreateDataToSend(std::move(status_events_to_send),
                        std::move(profile_events_to_send),
                        std::move(dropped_task_attempts_to_send));
+  if (export_event_write_enabled_) {
+    WriteExportData(std::move(status_events_to_write_for_export),
+                    std::move(profile_events_to_send));
+  }
 
   gcs::TaskInfoAccessor *task_accessor;
   {
@@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
   if (!enabled_) {
     return;
   }
+  std::shared_ptr<TaskEvent> status_event_shared_ptr = std::move(status_event);
-  if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) {
+  if (export_event_write_enabled_) {
+    // If status_events_for_export_ is full, the oldest event will be
+    // dropped in the circular buffer and replaced with the current event.
+    if (!status_events_for_export_.full()) {
+      stats_counter_.Increment(
+          TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored);
+    }
+    status_events_for_export_.push_back(status_event_shared_ptr);
+  }
+  if (dropped_task_attempts_unreported_.count(
+          status_event_shared_ptr->GetTaskAttempt())) {
     // This task attempt has been dropped before, so we drop this event.
     stats_counter_.Increment(
         TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
@@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
 
   RAY_LOG_EVERY_N(WARNING, 100000)
       << "Dropping task status events for task: "
-      << status_event->GetTaskAttempt().first
+      << status_event_shared_ptr->GetTaskAttempt().first
       << ", set a higher value for "
         "RAY_task_events_max_num_status_events_buffer_on_worker("
       << RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()
@@ -466,7 +618,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
   } else {
     stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored);
   }
-  status_events_.push_back(std::move(status_event));
+  status_events_.push_back(status_event_shared_ptr);
 }
 
 void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile_event) {
@@ -474,10 +626,12 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
   if (!enabled_) {
     return;
   }
-  auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt());
+  std::shared_ptr<TaskEvent> profile_event_shared_ptr = std::move(profile_event);
+  auto profile_events_itr =
+      profile_events_.find(profile_event_shared_ptr->GetTaskAttempt());
   if (profile_events_itr == profile_events_.end()) {
-    auto inserted = profile_events_.insert(
-        {profile_event->GetTaskAttempt(), std::vector<std::unique_ptr<TaskEvent>>()});
+    auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(),
+                                            std::vector<std::shared_ptr<TaskEvent>>()});
     RAY_CHECK(inserted.second);
     profile_events_itr = inserted.first;
   }
@@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
   // driver task id and it could generate large number of profile events when submitting
   // many tasks.
   RAY_LOG_EVERY_N(WARNING, 100000)
-      << "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first
+      << "Dropping profiling events for task: "
+      << profile_event_shared_ptr->GetTaskAttempt().first
       << ", set a higher value for RAY_task_events_max_num_profile_events_per_task("
       << max_num_profile_event_per_task
       << "), or RAY_task_events_max_num_profile_events_buffer_on_worker ("
@@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
   }
 
   stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored);
-  profile_events_itr->second.push_back(std::move(profile_event));
+  profile_events_itr->second.push_back(profile_event_shared_ptr);
 }
 
 const std::string TaskEventBufferImpl::DebugString() {
diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h
index 30dbb780cab0..a2783521a612 100644
--- a/src/ray/core_worker/task_event_buffer.h
+++ b/src/ray/core_worker/task_event_buffer.h
@@ -27,6 +27,7 @@
 #include "ray/common/task/task_spec.h"
 #include "ray/gcs/gcs_client/gcs_client.h"
 #include "ray/util/counter_map.h"
+#include "src/ray/protobuf/export_api/export_task_event.pb.h"
 #include "src/ray/protobuf/gcs.pb.h"
 
 namespace ray {
@@ -57,6 +58,12 @@ class TaskEvent {
   /// \param[out] rpc_task_events The rpc task event to be filled.
   virtual void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) = 0;
 
+  /// Convert itself to a rpc::ExportTaskEventData.
+  ///
+  /// \param[out] rpc_task_export_event_data The rpc export task event data to be filled.
+  virtual void ToRpcTaskExportEvents(
+      std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) = 0;
+
   /// If it is a profile event.
   virtual bool IsProfileEvent() const = 0;
 
@@ -126,6 +133,9 @@ class TaskStatusEvent : public TaskEvent {
 
   void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override;
 
+  void ToRpcTaskExportEvents(
+      std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;
+
   bool IsProfileEvent() const override { return false; }
 
  private:
@@ -153,6 +163,9 @@ class TaskProfileEvent : public TaskEvent {
 
   void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override;
 
+  void ToRpcTaskExportEvents(
+      std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;
+
   bool IsProfileEvent() const override { return true; }
 
   void SetEndTime(int64_t end_time) { end_time_ = end_time; }
@@ -177,6 +190,7 @@ enum TaskEventBufferCounter {
   kNumTaskProfileEventsStored,
   kNumTaskStatusEventsStored,
   kNumDroppedTaskAttemptsStored,
+  kNumTaskStatusEventsForExportAPIStored,
   kTotalNumTaskProfileEventDropped,
   kTotalNumTaskStatusEventDropped,
   kTotalNumTaskAttemptsReported,
@@ -295,10 +309,15 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   /// Get data related to task status events to be send to GCS.
   ///
   /// \param[out] status_events_to_send Task status events to be sent.
+  /// \param[out] status_events_to_write_for_export Task status events that will
+  ///     be written to the Export API. This includes both status events
+  ///     that are sent to GCS, and as many dropped status events as
+  ///     fit in the buffer.
   /// \param[out] dropped_task_attempts_to_send Task attempts that were dropped due to
   ///     status events being dropped.
   void GetTaskStatusEventsToSend(
-      std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
+      std::vector<std::shared_ptr<TaskEvent>> *status_events_to_send,
+      std::vector<std::shared_ptr<TaskEvent>> *status_events_to_write_for_export,
       absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send)
       ABSL_LOCKS_EXCLUDED(mutex_);
 
@@ -306,7 +325,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   ///
   /// \param[out] profile_events_to_send Task profile events to be sent.
   void GetTaskProfileEventsToSend(
-      std::vector<std::unique_ptr<TaskEvent>> *profile_events_to_send)
+      std::vector<std::shared_ptr<TaskEvent>> *profile_events_to_send)
       ABSL_LOCKS_EXCLUDED(profile_mutex_);
 
   /// Get the task events to GCS.
@@ -317,10 +336,21 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   ///     status events being dropped.
   /// \return A unique_ptr to rpc::TaskEvents to be sent to GCS.
   std::unique_ptr<rpc::TaskEventData> CreateDataToSend(
-      std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
-      std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
+      std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_send,
+      std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send,
       absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send);
 
+  /// Write task events for the Export API.
+  ///
+  /// \param status_events_to_write_for_export Task status events that will
+  ///     be written to the Export API. This includes both status events
+  ///     that are sent to GCS, and as many dropped status events as
+  ///     fit in the buffer.
+  /// \param profile_events_to_send Task profile events to be written.
+  void WriteExportData(
+      std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
+      std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send);
+
   /// Reset the counters during flushing data to GCS.
   void ResetCountersForFlush();
 
@@ -387,7 +417,13 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   std::atomic<bool> enabled_ = false;
 
   /// Circular buffered task status events.
-  boost::circular_buffer<std::unique_ptr<TaskEvent>> status_events_
+  boost::circular_buffer<std::shared_ptr<TaskEvent>> status_events_
       ABSL_GUARDED_BY(mutex_);
+
+  /// Status events that will be written for the export API. This could
+  /// contain events that were dropped from being sent to GCS. A circular
+  /// buffer is used to limit memory.
+  boost::circular_buffer<std::shared_ptr<TaskEvent>> status_events_for_export_
+      ABSL_GUARDED_BY(mutex_);
 
   /// Buffered task attempts that were dropped due to status events being dropped.
@@ -396,7 +432,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
       ABSL_GUARDED_BY(mutex_);
 
   /// Buffered task profile events. A FIFO queue to be sent to GCS.
-  absl::flat_hash_map<TaskAttempt, std::vector<std::unique_ptr<TaskEvent>>>
+  absl::flat_hash_map<TaskAttempt, std::vector<std::shared_ptr<TaskEvent>>>
       profile_events_ ABSL_GUARDED_BY(profile_mutex_);
 
   /// Stats counter map.
@@ -407,6 +443,9 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   /// process them quick enough.
   std::atomic<bool> grpc_in_progress_ = false;
 
+  /// If true, task events are exported for the Export API.
+  bool export_event_write_enabled_ = false;
+
   FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail);
   FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend);
   FRIEND_TEST(TaskEventBufferTest, TestAddEvent);
@@ -417,6 +456,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   FRIEND_TEST(TaskEventBufferTestLimitBuffer, TestBufferSizeLimitStatusEvents);
   FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestBufferSizeLimitProfileEvents);
   FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestLimitProfileEventsPerTask);
+  FRIEND_TEST(TaskEventTestWriteExport, TestWriteTaskExportEvents);
 };
 
 }  // namespace worker
diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc
index 63e24485ec09..02dd448fa921 100644
--- a/src/ray/core_worker/test/task_event_buffer_test.cc
+++ b/src/ray/core_worker/test/task_event_buffer_test.cc
@@ -16,6 +16,9 @@
 #include <gtest/gtest.h>
 
+#include <filesystem>
+#include <fstream>
+
 #include "absl/base/thread_annotations.h"
 #include "absl/synchronization/mutex.h"
 #include "absl/types/optional.h"
@@ -24,6 +27,7 @@
 #include "mock/ray/gcs/gcs_client/gcs_client.h"
 #include "ray/common/task/task_spec.h"
 #include "ray/common/test_util.h"
+#include "ray/util/event.h"
 
 using ::testing::_;
 using ::testing::Return;
@@ -54,6 +58,7 @@ class TaskEventBufferTest : public ::testing::Test {
 
   virtual void TearDown() {
     if (task_event_buffer_) task_event_buffer_->Stop();
+    std::filesystem::remove_all(log_dir_.c_str());
   };
 
   std::vector<TaskID> GenTaskIDs(size_t num_tasks) {
@@ -124,6 +129,7 @@ class TaskEventBufferTest : public ::testing::Test {
   }
 
   std::unique_ptr<TaskEventBufferImpl> task_event_buffer_ = nullptr;
+  std::string log_dir_ = "event_123";
 };
 
 class TaskEventBufferTestManualStart : public TaskEventBufferTest {
@@ -174,6 +180,38 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest {
   }
 };
 
+class TaskEventTestWriteExport : public TaskEventBufferTest {
+ public:
+  TaskEventTestWriteExport() : TaskEventBufferTest() {
+    RayConfig::instance().initialize(
+        R"(
+{
+  "task_events_report_interval_ms": 1000,
+  "task_events_max_num_status_events_buffer_on_worker": 10,
+  "task_events_max_num_profile_events_buffer_on_worker": 5,
+  "task_events_send_batch_size": 100,
+  "export_task_events_write_batch_size": 1,
+  "task_events_max_num_export_status_events_buffer_on_worker": 15,
+  "enable_export_api_write": true
+}
+  )");
+  }
+};
+
+void ReadContentFromFile(std::vector<std::string> &vc,
+                         std::string log_file,
+                         std::string filter = "") {
+  std::string line;
+  std::ifstream read_file;
+  read_file.open(log_file, std::ios::binary);
+  while (std::getline(read_file, line)) {
+    if (filter.empty() || line.find(filter) != std::string::npos) {
+      vc.push_back(line);
+    }
+  }
+  read_file.close();
+}
+
 TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) {
   ASSERT_NE(task_event_buffer_, nullptr);
 
@@ -249,6 +287,106 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) {
   ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
 }
 
+TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
+  /*
+  Test writing task events to event_EXPORT_TASK_123.log as part of the export API.
+  This test verifies the following cases:
+    1. Task events that are dropped from being sent to GCS (because more events
+       added than task_events_max_num_status_events_buffer_on_worker) will still
+       be written for the export API, if the number of events is less than
+       task_events_max_num_export_status_events_buffer_on_worker.
+    2. Task events over task_events_max_num_export_status_events_buffer_on_worker
+       will be dropped and not written for the export API.
+    3. Each Flush() call only writes a max of export_task_events_write_batch_size
+       export events.
+  In this test, 20 events are added, which is greater than the max of 10 status
+  events that can be stored in the buffer. The max number of export status events
+  in the buffer is 15, so 15 events will be written to event_EXPORT_TASK_123.log.
+  Each Flush() call will write 1 new event because the batch size is 1.
+  */
+
+  // {"task_events_max_num_status_events_buffer_on_worker": 10} and
+  // {"task_events_max_num_export_status_events_buffer_on_worker": 15}
+  // in TaskEventTestWriteExport so set num_events > 15.
+  size_t num_events = 20;
+  // Value of export_task_events_write_batch_size
+  size_t batch_size = 1;
+  // Value of task_events_max_num_export_status_events_buffer_on_worker
+  size_t max_export_events_on_buffer = 15;
+  auto task_ids = GenTaskIDs(num_events);
+  google::protobuf::util::JsonPrintOptions options;
+  options.preserve_proto_field_names = true;
+
+  std::vector<SourceTypeVariant> source_types = {
+      rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
+  RayEventInit_(source_types,
+                absl::flat_hash_map<std::string, std::string>(),
+                log_dir_,
+                "warning",
+                false);
+
+  std::vector<std::unique_ptr<TaskEvent>> task_events;
+  for (const auto &task_id : task_ids) {
+    task_events.push_back(GenStatusTaskEvent(task_id, 0));
+  }
+
+  // Convert all task_events that were added with AddTaskEvent
+  // to the ExportTaskEventData proto message
+  std::vector<std::shared_ptr<rpc::ExportTaskEventData>> task_event_data_protos;
+  for (const auto &task_event : task_events) {
+    std::shared_ptr<rpc::ExportTaskEventData> event =
+        std::make_shared<rpc::ExportTaskEventData>();
+    task_event->ToRpcTaskExportEvents(event);
+    task_event_data_protos.push_back(event);
+  }
+
+  // Add all num_events tasks
+  for (auto &task_event : task_events) {
+    task_event_buffer_->AddTaskEvent(std::move(task_event));
+  }
+
+  // Verify that batch_size events are being written for each flush
+  std::vector<std::string> vc;
+  for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
+    task_event_buffer_->FlushEvents(true);
+    ReadContentFromFile(
+        vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
+    EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
+    vc.clear();
+  }
+
+  // Verify that all max_export_events_on_buffer events are written to file even though
+  // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
+  vc.clear();
+  ReadContentFromFile(
+      vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
+  EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
+  for (size_t i = 0; i < max_export_events_on_buffer; i++) {
+    json export_event_as_json = json::parse(vc[i]);
+    EXPECT_EQ(export_event_as_json["source_type"].get<std::string>(), "EXPORT_TASK");
+    EXPECT_EQ(export_event_as_json.contains("event_id"), true);
+    EXPECT_EQ(export_event_as_json.contains("timestamp"), true);
+    EXPECT_EQ(export_event_as_json.contains("event_data"), true);
+
+    json event_data = export_event_as_json["event_data"].get<json>();
+
+    // The events written are the last max_export_events_on_buffer added
+    // in AddTaskEvent because (num_events-max_export_events_on_buffer)
+    // were dropped.
+    std::string expected_event_data_str;
+    RAY_CHECK(google::protobuf::util::MessageToJsonString(
+                  *task_event_data_protos[i + (num_events - max_export_events_on_buffer)],
+                  &expected_event_data_str,
+                  options)
+                  .ok());
+    json expected_event_data = json::parse(expected_event_data_str);
+    EXPECT_EQ(event_data, expected_event_data);
+  }
+
+  // Expect no more events.
+  ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0);
+}
+
 TEST_F(TaskEventBufferTest, TestFailedFlush) {
   size_t num_status_events = 20;
   size_t num_profile_events = 20;
diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h
index 3df62d66116c..92de8dec6915 100644
--- a/src/ray/gcs/pb_util.h
+++ b/src/ray/gcs/pb_util.h
@@ -21,6 +21,7 @@
 #include "ray/common/ray_config.h"
 #include "ray/common/task/task_spec.h"
 #include "src/ray/protobuf/autoscaler.pb.h"
+#include "src/ray/protobuf/export_api/export_task_event.pb.h"
 #include "src/ray/protobuf/gcs.pb.h"
 
 namespace ray {
@@ -247,6 +248,59 @@ inline void FillTaskInfo(rpc::TaskInfoEntry *task_info,
   }
 }
 
+// Fill task_info for the export API with task specification from task_spec
+inline void FillExportTaskInfo(rpc::ExportTaskEventData::TaskInfoEntry *task_info,
+                               const TaskSpecification &task_spec) {
+  rpc::TaskType type;
+  if (task_spec.IsNormalTask()) {
+    type = rpc::TaskType::NORMAL_TASK;
+  } else if (task_spec.IsDriverTask()) {
+    type = rpc::TaskType::DRIVER_TASK;
+  } else if (task_spec.IsActorCreationTask()) {
+    type = rpc::TaskType::ACTOR_CREATION_TASK;
+    task_info->set_actor_id(task_spec.ActorCreationId().Binary());
+  } else {
+    RAY_CHECK(task_spec.IsActorTask());
+    type = rpc::TaskType::ACTOR_TASK;
+    task_info->set_actor_id(task_spec.ActorId().Binary());
+  }
+  task_info->set_type(type);
+  task_info->set_language(task_spec.GetLanguage());
+  task_info->set_func_or_class_name(task_spec.FunctionDescriptor()->CallString());
+
+  task_info->set_task_id(task_spec.TaskId().Binary());
+  // NOTE: we set the parent task id of a task to be submitter's task id, where
+  // the submitter depends on the owner coreworker's:
+  // - if the owner coreworker runs a normal task, the submitter's task id is the task id.
+  // - if the owner coreworker runs an actor, the submitter's task id will be the actor's
+  // creation task id.
+  task_info->set_parent_task_id(task_spec.SubmitterTaskId().Binary());
+  const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap();
+  task_info->mutable_required_resources()->insert(resources_map.begin(),
+                                                  resources_map.end());
+
+  auto export_runtime_env_info = task_info->mutable_runtime_env_info();
+  export_runtime_env_info->set_serialized_runtime_env(
+      task_spec.RuntimeEnvInfo().serialized_runtime_env());
+  auto export_runtime_env_uris = export_runtime_env_info->mutable_uris();
+  export_runtime_env_uris->set_working_dir_uri(
+      task_spec.RuntimeEnvInfo().uris().working_dir_uri());
+  export_runtime_env_uris->mutable_py_modules_uris()->CopyFrom(
+      task_spec.RuntimeEnvInfo().uris().py_modules_uris());
+  auto export_runtime_env_config = export_runtime_env_info->mutable_runtime_env_config();
+  export_runtime_env_config->set_setup_timeout_seconds(
+      task_spec.RuntimeEnvInfo().runtime_env_config().setup_timeout_seconds());
+  export_runtime_env_config->set_eager_install(
+      task_spec.RuntimeEnvInfo().runtime_env_config().eager_install());
+  export_runtime_env_config->mutable_log_files()->CopyFrom(
+      task_spec.RuntimeEnvInfo().runtime_env_config().log_files());
+
+  const auto &pg_id = task_spec.PlacementGroupBundleId().first;
+  if (!pg_id.IsNil()) {
+    task_info->set_placement_group_id(pg_id.Binary());
+  }
+}
+
 /// Generate a RayErrorInfo from ErrorType
 inline rpc::RayErrorInfo GetRayErrorInfo(const rpc::ErrorType &error_type,
                                          const std::string &error_msg = "") {
@@ -327,6 +381,34 @@ inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status,
   (*state_updates->mutable_state_ts_ns())[task_status] = timestamp;
 }
 
+/// Fill the rpc::ExportTaskEventData::TaskStateUpdate with the timestamps
+/// according to the status change.
+///
+/// \param task_status The task status.
+/// \param timestamp The timestamp.
+/// \param[out] state_updates The state updates with timestamp to be updated.
+inline void FillExportTaskStatusUpdateTime(
+    const ray::rpc::TaskStatus &task_status,
+    int64_t timestamp,
+    rpc::ExportTaskEventData::TaskStateUpdate *state_updates) {
+  if (task_status == rpc::TaskStatus::NIL) {
+    // No status change.
+    return;
+  }
+  (*state_updates->mutable_state_ts_ns())[task_status] = timestamp;
+}
+
+/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo
+inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src,
+                                rpc::ExportTaskEventData::TaskLogInfo *dest) {
+  dest->set_stdout_file(src.stdout_file());
+  dest->set_stderr_file(src.stderr_file());
+  dest->set_stdout_start(src.stdout_start());
+  dest->set_stdout_end(src.stdout_end());
+  dest->set_stderr_start(src.stderr_start());
+  dest->set_stderr_end(src.stderr_end());
+}
+
 inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) {
   return kPlacementGroupConstraintKeyPrefix + pg_id;
 }
diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc
index 0710157b20fc..aa7aa510cf01 100644
--- a/src/ray/util/event.cc
+++ b/src/ray/util/event.cc
@@ -204,14 +204,8 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) {
 
 void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) {
   auto element = export_log_reporter_map_.find(export_event.source_type());
-  if (element != export_log_reporter_map_.end()) {
-    (element->second)->ReportExportEvent(export_event);
-  } else {
-    RAY_LOG(FATAL)
-        << "RayEventInit wasn't called with the necessary source type "
-        << ExportEvent_SourceType_Name(export_event.source_type())
-        << ". This indicates a bug in the code, and the event will be dropped.";
-  }
+  RAY_CHECK(element != export_log_reporter_map_.end());
+  (element->second)->ReportExportEvent(export_event);
 }
 
 void EventManager::AddReporter(std::shared_ptr<BaseEventReporter> reporter) {
@@ -428,7 +422,6 @@ void RayExportEvent::SendEvent() {
   rpc::ExportEvent export_event;
   export_event.set_event_id(event_id);
   export_event.set_timestamp(current_sys_time_s());
-
   if (auto ptr_to_task_event_data_ptr =
           std::get_if<std::shared_ptr<rpc::ExportTaskEventData>>(&event_data_ptr_)) {
     export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr));
@@ -449,7 +442,6 @@ void RayExportEvent::SendEvent() {
     RAY_LOG(FATAL) << "Invalid event_data type.";
     return;
   }
-
   EventManager::Instance().PublishExportEvent(export_event);
 }
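
The commit message describes aggregating individual task events by task attempt, in insertion order, before writing them out (see WriteExportData in the diff). The standalone C++ sketch below illustrates only that pattern; the TaskAttempt alias, the string payloads, and the main() driver are simplified placeholders for this example and are not Ray types or APIs.

```cpp
// Illustrative sketch: group per-event updates by task attempt while
// preserving first-insertion order, then emit one record per attempt.
// TaskAttempt and the string payloads are simplified stand-ins.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using TaskAttempt = std::pair<std::string, int32_t>;  // (task id, attempt number)

struct TaskAttemptHash {
  size_t operator()(const TaskAttempt &a) const {
    return std::hash<std::string>()(a.first) ^ (std::hash<int32_t>()(a.second) << 1);
  }
};

int main() {
  // Several updates may refer to the same task attempt.
  const std::vector<std::pair<TaskAttempt, std::string>> events = {
      {{"task_a", 0}, "PENDING"},
      {{"task_b", 0}, "PENDING"},
      {{"task_a", 0}, "RUNNING"},
      {{"task_a", 0}, "FINISHED"}};

  std::unordered_map<TaskAttempt, std::vector<std::string>, TaskAttemptHash> agg;
  std::vector<TaskAttempt> insertion_order;  // Keeps output in arrival order.

  for (const auto &[attempt, update] : events) {
    if (agg.find(attempt) == agg.end()) {
      insertion_order.push_back(attempt);
    }
    agg[attempt].push_back(update);
  }

  // One aggregated record ("write") per task attempt, in insertion order.
  for (const auto &attempt : insertion_order) {
    std::cout << attempt.first << " attempt " << attempt.second << ":";
    for (const auto &update : agg[attempt]) {
      std::cout << " " << update;
    }
    std::cout << "\n";
  }
  return 0;
}
```

Aggregating before writing is what keeps the export file at roughly one line per task attempt per flush, mirroring the batching that the GCS send path already performs.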
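The unit test in this patch expects each line of the export file to be one JSON-serialized export event with source_type EXPORT_TASK, written to <log_dir>/events/event_EXPORT_TASK_<pid>.log. A minimal consumer of such a file might look like the sketch below; the session path and the pid in the file name are assumed examples, not values defined by this patch, and no JSON parsing library is pulled in.

```cpp
// Illustrative sketch: read an export task event file line by line.
// The path below is an assumed example; substitute your node's log
// directory and the writer process's pid.
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

int main() {
  const std::string path =
      "/tmp/ray/session_latest/logs/events/event_EXPORT_TASK_12345.log";
  std::ifstream in(path);
  if (!in) {
    std::cerr << "could not open " << path << "\n";
    return 1;
  }
  std::vector<std::string> lines;
  std::string line;
  while (std::getline(in, line)) {
    // Each line is one serialized export event (JSON text).
    lines.push_back(line);
  }
  std::cout << "read " << lines.size() << " export task events\n";
  return 0;
}
```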