diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 849b0b147cffa..7f48c7926c8a2 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -498,10 +498,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60) /// workers. Events will be evicted based on a FIFO order. RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000) +/// Max number of task status events that will be stored to export +/// for the export API. Events will be evicted based on a FIFO order. +RAY_CONFIG(uint64_t, + task_events_max_num_export_status_events_buffer_on_worker, + 1000 * 1000) + /// Max number of task events to be send in a single message to GCS. This caps both /// the message size, and also the processing work on GCS. RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000) +/// Max number of task events to be written in a single flush iteration. This +/// caps the number of file writes per iteration. +RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000) + /// Max number of profile events allowed to be tracked for a single task. /// Setting the value to -1 allows unlimited profile events to be tracked. RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000) diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index d587fcee8b7e5..191788e7e0458 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) { const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER}; + ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER, + ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; RayEventInit(source_types, absl::flat_hash_map(), options_.log_dir, diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 115811d3a9996..1971b5efd5639 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -15,6 +15,7 @@ #include "ray/core_worker/task_event_buffer.h" #include "ray/gcs/pb_util.h" +#include "ray/util/event.h" namespace ray { namespace core { @@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { } } +void TaskStatusEvent::ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) { + // Base fields + rpc_task_export_event_data->set_task_id(task_id_.Binary()); + rpc_task_export_event_data->set_job_id(job_id_.Binary()); + rpc_task_export_event_data->set_attempt_number(attempt_number_); + + // Task info. + if (task_spec_) { + gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_); + } + + // Task status update. + auto dst_state_update = rpc_task_export_event_data->mutable_state_updates(); + gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, dst_state_update); + + if (!state_update_.has_value()) { + return; + } + + if (state_update_->node_id_.has_value()) { + RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) + << "Node ID should be included when task status changes to " + "SUBMITTED_TO_WORKER."; + dst_state_update->set_node_id(state_update_->node_id_->Binary()); + } + + if (state_update_->worker_id_.has_value()) { + RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER) + << "Worker ID should be included when task status changes to " + "SUBMITTED_TO_WORKER."; + dst_state_update->set_worker_id(state_update_->worker_id_->Binary()); + } + + if (state_update_->error_info_.has_value()) { + auto error_info = dst_state_update->mutable_error_info(); + error_info->set_error_message((*state_update_->error_info_).error_message()); + error_info->set_error_type((*state_update_->error_info_).error_type()); + } + + if (state_update_->task_log_info_.has_value()) { + rpc::ExportTaskEventData::TaskLogInfo export_task_log_info; + gcs::TaskLogInfoToExport(state_update_->task_log_info_.value(), + &export_task_log_info); + dst_state_update->mutable_task_log_info()->MergeFrom(export_task_log_info); + } + + if (state_update_->pid_.has_value()) { + dst_state_update->set_worker_pid(state_update_->pid_.value()); + } + + if (state_update_->is_debugger_paused_.has_value()) { + dst_state_update->set_is_debugger_paused(state_update_->is_debugger_paused_.value()); + } +} + void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { // Rate limit on the number of profiling events from the task. This is especially the // case if a driver has many profiling events when submitting tasks @@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) { event_entry->set_extra_data(std::move(extra_data_)); } +void TaskProfileEvent::ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) { + auto profile_events = rpc_task_export_event_data->mutable_profile_events(); + + // Base fields + rpc_task_export_event_data->set_task_id(task_id_.Binary()); + rpc_task_export_event_data->set_job_id(job_id_.Binary()); + rpc_task_export_event_data->set_attempt_number(attempt_number_); + profile_events->set_component_type(std::move(component_type_)); + profile_events->set_component_id(std::move(component_id_)); + profile_events->set_node_ip_address(std::move(node_ip_address_)); + auto event_entry = profile_events->add_events(); + event_entry->set_event_name(std::move(event_name_)); + event_entry->set_start_time(start_time_); + event_entry->set_end_time(end_time_); + event_entry->set_extra_data(std::move(extra_data_)); +} + TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr gcs_client) : work_guard_(boost::asio::make_work_guard(io_service_)), periodical_runner_(io_service_), @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); } Status TaskEventBufferImpl::Start(bool auto_flush) { absl::MutexLock lock(&mutex_); + export_event_write_enabled_ = RayConfig::instance().enable_export_api_write(); auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms(); RAY_CHECK(report_interval_ms > 0) << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer."; status_events_.set_capacity( RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()); + status_events_for_export_.set_capacity( + RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker()); io_thread_ = std::thread([this]() { #ifndef _WIN32 @@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() { bool TaskEventBufferImpl::Enabled() const { return enabled_; } void TaskEventBufferImpl::GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, + std::vector> *status_events_to_send, + std::vector> *status_events_to_write_for_export, absl::flat_hash_set *dropped_task_attempts_to_send) { absl::MutexLock lock(&mutex_); + // Get the export events data to write. + if (export_event_write_enabled_) { + size_t num_to_write = std::min( + static_cast(RayConfig::instance().export_task_events_write_batch_size()), + static_cast(status_events_for_export_.size())); + status_events_to_write_for_export->insert( + status_events_to_write_for_export->end(), + std::make_move_iterator(status_events_for_export_.begin()), + std::make_move_iterator(status_events_for_export_.begin() + num_to_write)); + status_events_for_export_.erase(status_events_for_export_.begin(), + status_events_for_export_.begin() + num_to_write); + stats_counter_.Decrement( + TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored, + status_events_to_write_for_export->size()); + } + // No data to send. if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) { return; @@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend( } void TaskEventBufferImpl::GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) { + std::vector> *profile_events_to_send) { absl::MutexLock lock(&profile_mutex_); size_t batch_size = @@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend( } std::unique_ptr TaskEventBufferImpl::CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send) { // Aggregate the task events by TaskAttempt. absl::flat_hash_map agg_task_events; auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send]( - std::unique_ptr &event) { + std::shared_ptr &event) { if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) { // We are marking this as data loss due to some missing task status updates. // We will not send this event to GCS. @@ -331,6 +426,45 @@ std::unique_ptr TaskEventBufferImpl::CreateDataToSend( return data; } +void TaskEventBufferImpl::WriteExportData( + std::vector> &&status_events_to_write_for_export, + std::vector> &&profile_events_to_send) { + absl::flat_hash_map> + agg_task_events; + // Maintain insertion order to agg_task_events so events are written + // in the same order as the buffer. + std::vector agg_task_event_insertion_order; + auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order]( + std::shared_ptr &event) { + // Aggregate events by task attempt before converting to proto + auto itr = agg_task_events.find(event->GetTaskAttempt()); + if (itr == agg_task_events.end()) { + // Insert event into agg_task_events if the task attempt of that + // event wasn't already added. + auto event_for_attempt = std::make_shared(); + auto inserted = + agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt}); + RAY_CHECK(inserted.second); + agg_task_event_insertion_order.push_back(event->GetTaskAttempt()); + event->ToRpcTaskExportEvents(event_for_attempt); + } else { + event->ToRpcTaskExportEvents(itr->second); + } + }; + + std::for_each(status_events_to_write_for_export.begin(), + status_events_to_write_for_export.end(), + to_rpc_event_fn); + std::for_each( + profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn); + + for (const auto &task_attempt : agg_task_event_insertion_order) { + auto it = agg_task_events.find(task_attempt); + RAY_CHECK(it != agg_task_events.end()); + RayExportEvent(it->second).SendEvent(); + } +} + void TaskEventBufferImpl::FlushEvents(bool forced) { if (!enabled_) { return; @@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { } // Take out status events from the buffer. - std::vector> status_events_to_send; + std::vector> status_events_to_send; + std::vector> status_events_to_write_for_export; absl::flat_hash_set dropped_task_attempts_to_send; status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); - GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send); + GetTaskStatusEventsToSend(&status_events_to_send, + &status_events_to_write_for_export, + &dropped_task_attempts_to_send); // Take profile events from the status events. - std::vector> profile_events_to_send; + std::vector> profile_events_to_send; profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size()); GetTaskProfileEventsToSend(&profile_events_to_send); @@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) { CreateDataToSend(std::move(status_events_to_send), std::move(profile_events_to_send), std::move(dropped_task_attempts_to_send)); + if (export_event_write_enabled_) { + WriteExportData(std::move(status_events_to_write_for_export), + std::move(profile_events_to_send)); + } gcs::TaskInfoAccessor *task_accessor; { @@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e if (!enabled_) { return; } + std::shared_ptr status_event_shared_ptr = std::move(status_event); - if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) { + if (export_event_write_enabled_) { + // If status_events_for_export_ is full, the oldest event will be + // dropped in the circular buffer and replaced with the current event. + if (!status_events_for_export_.full()) { + stats_counter_.Increment( + TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored); + } + status_events_for_export_.push_back(status_event_shared_ptr); + } + if (dropped_task_attempts_unreported_.count( + status_event_shared_ptr->GetTaskAttempt())) { // This task attempt has been dropped before, so we drop this event. stats_counter_.Increment( TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush); @@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e RAY_LOG_EVERY_N(WARNING, 100000) << "Dropping task status events for task: " - << status_event->GetTaskAttempt().first + << status_event_shared_ptr->GetTaskAttempt().first << ", set a higher value for " "RAY_task_events_max_num_status_events_buffer_on_worker(" << RayConfig::instance().task_events_max_num_status_events_buffer_on_worker() @@ -466,7 +618,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr status_e } else { stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored); } - status_events_.push_back(std::move(status_event)); + status_events_.push_back(status_event_shared_ptr); } void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile_event) { @@ -474,10 +626,12 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile if (!enabled_) { return; } - auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt()); + std::shared_ptr profile_event_shared_ptr = std::move(profile_event); + auto profile_events_itr = + profile_events_.find(profile_event_shared_ptr->GetTaskAttempt()); if (profile_events_itr == profile_events_.end()) { - auto inserted = profile_events_.insert( - {profile_event->GetTaskAttempt(), std::vector>()}); + auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(), + std::vector>()}); RAY_CHECK(inserted.second); profile_events_itr = inserted.first; } @@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile // driver task id and it could generate large number of profile events when submitting // many tasks. RAY_LOG_EVERY_N(WARNING, 100000) - << "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first + << "Dropping profiling events for task: " + << profile_event_shared_ptr->GetTaskAttempt().first << ", set a higher value for RAY_task_events_max_num_profile_events_per_task(" << max_num_profile_event_per_task << "), or RAY_task_events_max_num_profile_events_buffer_on_worker (" @@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr profile } stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored); - profile_events_itr->second.push_back(std::move(profile_event)); + profile_events_itr->second.push_back(profile_event_shared_ptr); } const std::string TaskEventBufferImpl::DebugString() { diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h index 30dbb780cab04..a2783521a6121 100644 --- a/src/ray/core_worker/task_event_buffer.h +++ b/src/ray/core_worker/task_event_buffer.h @@ -27,6 +27,7 @@ #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/util/counter_map.h" +#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -57,6 +58,12 @@ class TaskEvent { /// \param[out] rpc_task_events The rpc task event to be filled. virtual void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) = 0; + /// Convert itself a rpc::ExportTaskEventData + /// + /// \param[out] rpc_task_export_event_data The rpc export task event data to be filled. + virtual void ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) = 0; + /// If it is a profile event. virtual bool IsProfileEvent() const = 0; @@ -126,6 +133,9 @@ class TaskStatusEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; + void ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) override; + bool IsProfileEvent() const override { return false; } private: @@ -153,6 +163,9 @@ class TaskProfileEvent : public TaskEvent { void ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) override; + void ToRpcTaskExportEvents( + std::shared_ptr rpc_task_export_event_data) override; + bool IsProfileEvent() const override { return true; } void SetEndTime(int64_t end_time) { end_time_ = end_time; } @@ -177,6 +190,7 @@ enum TaskEventBufferCounter { kNumTaskProfileEventsStored, kNumTaskStatusEventsStored, kNumDroppedTaskAttemptsStored, + kNumTaskStatusEventsForExportAPIStored, kTotalNumTaskProfileEventDropped, kTotalNumTaskStatusEventDropped, kTotalNumTaskAttemptsReported, @@ -295,10 +309,15 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// Get data related to task status events to be send to GCS. /// /// \param[out] status_events_to_send Task status events to be sent. + /// \param[out] status_events_to_write_for_export Task status events that will + /// be written to the Export API. This includes both status events + /// that are sent to GCS, and as many dropped status events that + /// fit in the buffer. /// \param[out] dropped_task_attempts_to_send Task attempts that were dropped due to /// status events being dropped. void GetTaskStatusEventsToSend( - std::vector> *status_events_to_send, + std::vector> *status_events_to_send, + std::vector> *status_events_to_write_for_export, absl::flat_hash_set *dropped_task_attempts_to_send) ABSL_LOCKS_EXCLUDED(mutex_); @@ -306,7 +325,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// /// \param[out] profile_events_to_send Task profile events to be sent. void GetTaskProfileEventsToSend( - std::vector> *profile_events_to_send) + std::vector> *profile_events_to_send) ABSL_LOCKS_EXCLUDED(profile_mutex_); /// Get the task events to GCS. @@ -317,10 +336,21 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// status events being dropped. /// \return A unique_ptr to rpc::TaskEvents to be sent to GCS. std::unique_ptr CreateDataToSend( - std::vector> &&status_events_to_send, - std::vector> &&profile_events_to_send, + std::vector> &&status_events_to_send, + std::vector> &&profile_events_to_send, absl::flat_hash_set &&dropped_task_attempts_to_send); + /// Write task events for the Export API. + /// + /// \param status_events_to_write_for_export Task status events that will + /// be written to the Export API. This includes both status events + /// that are sent to GCS, and as many dropped status events that + /// fit in the buffer. + /// \param profile_events_to_send Task profile events to be written. + void WriteExportData( + std::vector> &&status_events_to_write_for_export, + std::vector> &&profile_events_to_send); + /// Reset the counters during flushing data to GCS. void ResetCountersForFlush(); @@ -387,7 +417,13 @@ class TaskEventBufferImpl : public TaskEventBuffer { std::atomic enabled_ = false; /// Circular buffered task status events. - boost::circular_buffer> status_events_ + boost::circular_buffer> status_events_ + ABSL_GUARDED_BY(mutex_); + + /// Status events that will be written for the export API. This could + /// contain events that were dropped from being sent to GCS. A circular + /// buffer is used to limit memory. + boost::circular_buffer> status_events_for_export_ ABSL_GUARDED_BY(mutex_); /// Buffered task attempts that were dropped due to status events being dropped. @@ -396,7 +432,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { ABSL_GUARDED_BY(mutex_); /// Buffered task profile events. A FIFO queue to be sent to GCS. - absl::flat_hash_map>> + absl::flat_hash_map>> profile_events_ ABSL_GUARDED_BY(profile_mutex_); /// Stats counter map. @@ -407,6 +443,9 @@ class TaskEventBufferImpl : public TaskEventBuffer { /// process them quick enough. std::atomic grpc_in_progress_ = false; + /// If true, task events are exported for Export API + bool export_event_write_enabled_ = false; + FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail); FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend); FRIEND_TEST(TaskEventBufferTest, TestAddEvent); @@ -417,6 +456,7 @@ class TaskEventBufferImpl : public TaskEventBuffer { FRIEND_TEST(TaskEventBufferTestLimitBuffer, TestBufferSizeLimitStatusEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestBufferSizeLimitProfileEvents); FRIEND_TEST(TaskEventBufferTestLimitProfileEvents, TestLimitProfileEventsPerTask); + FRIEND_TEST(TaskEventTestWriteExport, TestWriteTaskExportEvents); }; } // namespace worker diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index 63e24485ec098..02dd448fa921e 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -16,6 +16,9 @@ #include +#include +#include + #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" #include "absl/types/optional.h" @@ -24,6 +27,7 @@ #include "mock/ray/gcs/gcs_client/gcs_client.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" +#include "ray/util/event.h" using ::testing::_; using ::testing::Return; @@ -54,6 +58,7 @@ class TaskEventBufferTest : public ::testing::Test { virtual void TearDown() { if (task_event_buffer_) task_event_buffer_->Stop(); + std::filesystem::remove_all(log_dir_.c_str()); }; std::vector GenTaskIDs(size_t num_tasks) { @@ -124,6 +129,7 @@ class TaskEventBufferTest : public ::testing::Test { } std::unique_ptr task_event_buffer_ = nullptr; + std::string log_dir_ = "event_123"; }; class TaskEventBufferTestManualStart : public TaskEventBufferTest { @@ -174,6 +180,38 @@ class TaskEventBufferTestLimitProfileEvents : public TaskEventBufferTest { } }; +class TaskEventTestWriteExport : public TaskEventBufferTest { + public: + TaskEventTestWriteExport() : TaskEventBufferTest() { + RayConfig::instance().initialize( + R"( +{ + "task_events_report_interval_ms": 1000, + "task_events_max_num_status_events_buffer_on_worker": 10, + "task_events_max_num_profile_events_buffer_on_worker": 5, + "task_events_send_batch_size": 100, + "export_task_events_write_batch_size": 1, + "task_events_max_num_export_status_events_buffer_on_worker": 15, + "enable_export_api_write": true +} + )"); + } +}; + +void ReadContentFromFile(std::vector &vc, + std::string log_file, + std::string filter = "") { + std::string line; + std::ifstream read_file; + read_file.open(log_file, std::ios::binary); + while (std::getline(read_file, line)) { + if (filter.empty() || line.find(filter) != std::string::npos) { + vc.push_back(line); + } + } + read_file.close(); +} + TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) { ASSERT_NE(task_event_buffer_, nullptr); @@ -249,6 +287,106 @@ TEST_F(TaskEventBufferTest, TestFlushEvents) { ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); } +TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) { + /* + Test writing task events to event_EXPORT_TASK_123.log as part of the export API. + This test verifies the following cases: + 1. Task events that are dropped from being sent to GCS (because more events + added than task_events_max_num_status_events_buffer_on_worker) will still + be written for the export API, if the number of events is less than + task_events_max_num_export_status_events_buffer_on_worker. + 2. Task events over task_events_max_num_status_events_buffer_on_worker + will be dropped and not written for the export API. + 3. Each Flush() call only writes a max of export_task_events_write_batch_size + export events. + In this test, 20 events are added which is greater than the max of 10 status events + that can be stored in buffer. The max export status events in buffer is 15, + so 15 events will be written to event_EXPORT_TASK_123.log. Each Flush() + call will write 1 new event because the batch size is 1. + */ + + // {"task_events_max_num_status_events_buffer_on_worker": 10} and + // {"task_events_max_num_export_status_events_buffer_on_worker": 15} + // in TaskEventTestWriteExport so set num_events > 15. + size_t num_events = 20; + // Value of export_task_events_write_batch_size + size_t batch_size = 1; + // Value of task_events_max_num_export_status_events_buffer_on_worker + size_t max_export_events_on_buffer = 15; + auto task_ids = GenTaskIDs(num_events); + google::protobuf::util::JsonPrintOptions options; + options.preserve_proto_field_names = true; + + std::vector source_types = { + rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK}; + RayEventInit_(source_types, + absl::flat_hash_map(), + log_dir_, + "warning", + false); + + std::vector> task_events; + for (const auto &task_id : task_ids) { + task_events.push_back(GenStatusTaskEvent(task_id, 0)); + } + + // Convert all task_events that were added with AddTaskEvent + // to the ExportTaskEventData proto message + std::vector> task_event_data_protos; + for (const auto &task_event : task_events) { + std::shared_ptr event = + std::make_shared(); + task_event->ToRpcTaskExportEvents(event); + task_event_data_protos.push_back(event); + } + + // Add all num_events tasks + for (auto &task_event : task_events) { + task_event_buffer_->AddTaskEvent(std::move(task_event)); + } + + // Verify that batch_size events are being written for each flush + std::vector vc; + for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) { + task_event_buffer_->FlushEvents(true); + ReadContentFromFile( + vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); + EXPECT_EQ((int)vc.size(), (i + 1) * batch_size); + vc.clear(); + } + + // Verify that all max_export_events_on_buffer events are written to file even though + // max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker + vc.clear(); + ReadContentFromFile( + vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log"); + EXPECT_EQ((int)vc.size(), max_export_events_on_buffer); + for (size_t i = 0; i < max_export_events_on_buffer; i++) { + json export_event_as_json = json::parse(vc[i]); + EXPECT_EQ(export_event_as_json["source_type"].get(), "EXPORT_TASK"); + EXPECT_EQ(export_event_as_json.contains("event_id"), true); + EXPECT_EQ(export_event_as_json.contains("timestamp"), true); + EXPECT_EQ(export_event_as_json.contains("event_data"), true); + + json event_data = export_event_as_json["event_data"].get(); + + // The events written are the last max_export_events_on_buffer added + // in AddTaskEvent because (num_events-max_export_events_on_buffer) + // were dropped. + std::string expected_event_data_str; + RAY_CHECK(google::protobuf::util::MessageToJsonString( + *task_event_data_protos[i + (num_events - max_export_events_on_buffer)], + &expected_event_data_str, + options) + .ok()); + json expected_event_data = json::parse(expected_event_data_str); + EXPECT_EQ(event_data, expected_event_data); + } + + // Expect no more events. + ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), 0); +} + TEST_F(TaskEventBufferTest, TestFailedFlush) { size_t num_status_events = 20; size_t num_profile_events = 20; diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 3df62d66116c4..92de8dec69158 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -21,6 +21,7 @@ #include "ray/common/ray_config.h" #include "ray/common/task/task_spec.h" #include "src/ray/protobuf/autoscaler.pb.h" +#include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -247,6 +248,59 @@ inline void FillTaskInfo(rpc::TaskInfoEntry *task_info, } } +// Fill task_info for the export API with task specification from task_spec +inline void FillExportTaskInfo(rpc::ExportTaskEventData::TaskInfoEntry *task_info, + const TaskSpecification &task_spec) { + rpc::TaskType type; + if (task_spec.IsNormalTask()) { + type = rpc::TaskType::NORMAL_TASK; + } else if (task_spec.IsDriverTask()) { + type = rpc::TaskType::DRIVER_TASK; + } else if (task_spec.IsActorCreationTask()) { + type = rpc::TaskType::ACTOR_CREATION_TASK; + task_info->set_actor_id(task_spec.ActorCreationId().Binary()); + } else { + RAY_CHECK(task_spec.IsActorTask()); + type = rpc::TaskType::ACTOR_TASK; + task_info->set_actor_id(task_spec.ActorId().Binary()); + } + task_info->set_type(type); + task_info->set_language(task_spec.GetLanguage()); + task_info->set_func_or_class_name(task_spec.FunctionDescriptor()->CallString()); + + task_info->set_task_id(task_spec.TaskId().Binary()); + // NOTE: we set the parent task id of a task to be submitter's task id, where + // the submitter depends on the owner coreworker's: + // - if the owner coreworker runs a normal task, the submitter's task id is the task id. + // - if the owner coreworker runs an actor, the submitter's task id will be the actor's + // creation task id. + task_info->set_parent_task_id(task_spec.SubmitterTaskId().Binary()); + const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); + task_info->mutable_required_resources()->insert(resources_map.begin(), + resources_map.end()); + + auto export_runtime_env_info = task_info->mutable_runtime_env_info(); + export_runtime_env_info->set_serialized_runtime_env( + task_spec.RuntimeEnvInfo().serialized_runtime_env()); + auto export_runtime_env_uris = export_runtime_env_info->mutable_uris(); + export_runtime_env_uris->set_working_dir_uri( + task_spec.RuntimeEnvInfo().uris().working_dir_uri()); + export_runtime_env_uris->mutable_py_modules_uris()->CopyFrom( + task_spec.RuntimeEnvInfo().uris().py_modules_uris()); + auto export_runtime_env_config = export_runtime_env_info->mutable_runtime_env_config(); + export_runtime_env_config->set_setup_timeout_seconds( + task_spec.RuntimeEnvInfo().runtime_env_config().setup_timeout_seconds()); + export_runtime_env_config->set_eager_install( + task_spec.RuntimeEnvInfo().runtime_env_config().eager_install()); + export_runtime_env_config->mutable_log_files()->CopyFrom( + task_spec.RuntimeEnvInfo().runtime_env_config().log_files()); + + const auto &pg_id = task_spec.PlacementGroupBundleId().first; + if (!pg_id.IsNil()) { + task_info->set_placement_group_id(pg_id.Binary()); + } +} + /// Generate a RayErrorInfo from ErrorType inline rpc::RayErrorInfo GetRayErrorInfo(const rpc::ErrorType &error_type, const std::string &error_msg = "") { @@ -327,6 +381,34 @@ inline void FillTaskStatusUpdateTime(const ray::rpc::TaskStatus &task_status, (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; } +/// Fill the rpc::ExportTaskEventData::TaskStateUpdate with the timestamps +/// according to the status change. +/// +/// \param task_status The task status. +/// \param timestamp The timestamp. +/// \param[out] state_updates The state updates with timestamp to be updated. +inline void FillExportTaskStatusUpdateTime( + const ray::rpc::TaskStatus &task_status, + int64_t timestamp, + rpc::ExportTaskEventData::TaskStateUpdate *state_updates) { + if (task_status == rpc::TaskStatus::NIL) { + // Not status change. + return; + } + (*state_updates->mutable_state_ts_ns())[task_status] = timestamp; +} + +/// Convert rpc::TaskLogInfo to rpc::ExportTaskEventData::TaskLogInfo +inline void TaskLogInfoToExport(const rpc::TaskLogInfo &src, + rpc::ExportTaskEventData::TaskLogInfo *dest) { + dest->set_stdout_file(src.stdout_file()); + dest->set_stderr_file(src.stderr_file()); + dest->set_stdout_start(src.stdout_start()); + dest->set_stdout_end(src.stdout_end()); + dest->set_stderr_start(src.stderr_start()); + dest->set_stderr_end(src.stderr_end()); +} + inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) { return kPlacementGroupConstraintKeyPrefix + pg_id; } diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 0710157b20fca..aa7aa510cf01a 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -204,14 +204,8 @@ void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { void EventManager::PublishExportEvent(const rpc::ExportEvent &export_event) { auto element = export_log_reporter_map_.find(export_event.source_type()); - if (element != export_log_reporter_map_.end()) { - (element->second)->ReportExportEvent(export_event); - } else { - RAY_LOG(FATAL) - << "RayEventInit wasn't called with the necessary source type " - << ExportEvent_SourceType_Name(export_event.source_type()) - << ". This indicates a bug in the code, and the event will be dropped."; - } + RAY_CHECK(element != export_log_reporter_map_.end()); + (element->second)->ReportExportEvent(export_event); } void EventManager::AddReporter(std::shared_ptr reporter) { @@ -428,7 +422,6 @@ void RayExportEvent::SendEvent() { rpc::ExportEvent export_event; export_event.set_event_id(event_id); export_event.set_timestamp(current_sys_time_s()); - if (auto ptr_to_task_event_data_ptr = std::get_if>(&event_data_ptr_)) { export_event.mutable_task_event_data()->CopyFrom(*(*ptr_to_task_event_data_ptr)); @@ -449,7 +442,6 @@ void RayExportEvent::SendEvent() { RAY_LOG(FATAL) << "Invalid event_data type."; return; } - EventManager::Instance().PublishExportEvent(export_event); }