Skip to content

Commit

Permalink
[observability][export-api] Write task events (#47193)
Browse files Browse the repository at this point in the history
Write task events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. Default value is false.
All tasks that are added to the task event buffer will be written to file. In addition, keep a dropped_status_events_for_export_ buffer which stores status events that were dropped from the buffer to send to GCS, and write these dropped events to file as well.
The size of dropped_status_events_for_export_ is 10x larger than task_events_max_num_status_events_buffer_on_worker to prioritize recording data. The tradeoff here is memory on each worker, but this is a relatively small overhead, and it is unlikely the dropped events buffer will fill given the sink for export events (write to file) will succeed on each flush.
Task events are converted to the export API proto and written to file on a separate thread, which runs this flush operation periodically (every second).
Individual task events will be aggregated by task attempt before being written. This is consistent with the final event sent to GCS, and also helps reduce the number of events written to file.
  • Loading branch information
nikitavemuri authored Sep 5, 2024
1 parent 0392390 commit c95318e
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 33 deletions.
10 changes: 10 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,20 @@ RAY_CONFIG(int64_t, task_events_dropped_task_attempts_gc_threshold_s, 15 * 60)
/// workers. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t, task_events_max_num_status_events_buffer_on_worker, 100 * 1000)

/// Max number of task status events that will be stored to export
/// for the export API. Events will be evicted based on a FIFO order.
RAY_CONFIG(uint64_t,
task_events_max_num_export_status_events_buffer_on_worker,
1000 * 1000)

/// Max number of task events to be sent in a single message to GCS. This caps both
/// the message size, and also the processing work on GCS.
RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)

/// Max number of task events to be written in a single flush iteration. This
/// caps the number of file writes per iteration.
RAY_CONFIG(uint64_t, export_task_events_write_batch_size, 10 * 1000)

/// Max number of profile events allowed to be tracked for a single task.
/// Setting the value to -1 allows unlimited profile events to be tracked.
RAY_CONFIG(int64_t, task_events_max_num_profile_events_per_task, 1000)
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
const std::vector<SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER};
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK};
RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
options_.log_dir,
Expand Down
187 changes: 171 additions & 16 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {
Expand Down Expand Up @@ -108,6 +109,62 @@ void TaskStatusEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
}
}

/// Fill an export-API task event proto from this status event.
///
/// Sets the identifying fields (task id, job id, attempt number), the task
/// info when a task spec is attached, the timestamp of the current task
/// status, and then every optional state-update detail that is present.
///
/// \param rpc_task_export_event_data Destination proto to fill in.
void TaskStatusEvent::ToRpcTaskExportEvents(
    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
  // Identity of the task attempt this event belongs to.
  rpc_task_export_event_data->set_task_id(task_id_.Binary());
  rpc_task_export_event_data->set_job_id(job_id_.Binary());
  rpc_task_export_event_data->set_attempt_number(attempt_number_);

  // Static task info is only available when a task spec was attached.
  if (task_spec_) {
    gcs::FillExportTaskInfo(rpc_task_export_event_data->mutable_task_info(), *task_spec_);
  }

  // Record the timestamp at which the task entered its current status.
  auto *state_updates = rpc_task_export_event_data->mutable_state_updates();
  gcs::FillExportTaskStatusUpdateTime(task_status_, timestamp_, state_updates);

  // Everything below is optional detail carried by the state update.
  if (!state_update_.has_value()) {
    return;
  }
  auto &update = *state_update_;

  if (update.node_id_.has_value()) {
    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
        << "Node ID should be included when task status changes to "
           "SUBMITTED_TO_WORKER.";
    state_updates->set_node_id(update.node_id_->Binary());
  }

  if (update.worker_id_.has_value()) {
    RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
        << "Worker ID should be included when task status changes to "
           "SUBMITTED_TO_WORKER.";
    state_updates->set_worker_id(update.worker_id_->Binary());
  }

  if (update.error_info_.has_value()) {
    auto *dst_error = state_updates->mutable_error_info();
    auto &src_error = *update.error_info_;
    dst_error->set_error_message(src_error.error_message());
    dst_error->set_error_type(src_error.error_type());
  }

  if (update.task_log_info_.has_value()) {
    // Convert into a scratch message first, then merge it into the destination.
    rpc::ExportTaskEventData::TaskLogInfo converted_log_info;
    gcs::TaskLogInfoToExport(*update.task_log_info_, &converted_log_info);
    state_updates->mutable_task_log_info()->MergeFrom(converted_log_info);
  }

  if (update.pid_.has_value()) {
    state_updates->set_worker_pid(*update.pid_);
  }

  if (update.is_debugger_paused_.has_value()) {
    state_updates->set_is_debugger_paused(*update.is_debugger_paused_);
  }
}

void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
// Rate limit on the number of profiling events from the task. This is especially the
// case if a driver has many profiling events when submitting tasks
Expand All @@ -127,6 +184,24 @@ void TaskProfileEvent::ToRpcTaskEvents(rpc::TaskEvents *rpc_task_events) {
event_entry->set_extra_data(std::move(extra_data_));
}

/// Fill an export-API task event proto from this profile event.
///
/// Sets the identifying fields (task id, job id, attempt number), the
/// component metadata on the proto's profile-events section, and appends one
/// profile event entry with this event's name, time range and extra data.
///
/// This event object is held through a std::shared_ptr and the same instance
/// is handed to more than one consumer during a flush, so its string members
/// are copied into the proto rather than moved out. Moving would leave this
/// event in a moved-from state for any other holder.
///
/// \param rpc_task_export_event_data Destination proto to fill in.
void TaskProfileEvent::ToRpcTaskExportEvents(
    std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) {
  // Base fields
  rpc_task_export_event_data->set_task_id(task_id_.Binary());
  rpc_task_export_event_data->set_job_id(job_id_.Binary());
  rpc_task_export_event_data->set_attempt_number(attempt_number_);

  // Component metadata shared by all entries of this task attempt.
  auto *profile_events = rpc_task_export_event_data->mutable_profile_events();
  profile_events->set_component_type(component_type_);
  profile_events->set_component_id(component_id_);
  profile_events->set_node_ip_address(node_ip_address_);

  // One entry per profile event; entries accumulate when events are aggregated.
  auto *event_entry = profile_events->add_events();
  event_entry->set_event_name(event_name_);
  event_entry->set_start_time(start_time_);
  event_entry->set_end_time(end_time_);
  event_entry->set_extra_data(extra_data_);
}

TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
Expand All @@ -137,12 +212,15 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

status_events_.set_capacity(
RayConfig::instance().task_events_max_num_status_events_buffer_on_worker());
status_events_for_export_.set_capacity(
RayConfig::instance().task_events_max_num_export_status_events_buffer_on_worker());

io_thread_ = std::thread([this]() {
#ifndef _WIN32
Expand Down Expand Up @@ -210,10 +288,27 @@ void TaskEventBufferImpl::Stop() {
bool TaskEventBufferImpl::Enabled() const { return enabled_; }

void TaskEventBufferImpl::GetTaskStatusEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> *status_events_to_write_for_export,
absl::flat_hash_set<TaskAttempt> *dropped_task_attempts_to_send) {
absl::MutexLock lock(&mutex_);

// Get the export events data to write.
if (export_event_write_enabled_) {
size_t num_to_write = std::min(
static_cast<size_t>(RayConfig::instance().export_task_events_write_batch_size()),
static_cast<size_t>(status_events_for_export_.size()));
status_events_to_write_for_export->insert(
status_events_to_write_for_export->end(),
std::make_move_iterator(status_events_for_export_.begin()),
std::make_move_iterator(status_events_for_export_.begin() + num_to_write));
status_events_for_export_.erase(status_events_for_export_.begin(),
status_events_for_export_.begin() + num_to_write);
stats_counter_.Decrement(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored,
status_events_to_write_for_export->size());
}

// No data to send.
if (status_events_.empty() && dropped_task_attempts_unreported_.empty()) {
return;
Expand Down Expand Up @@ -252,7 +347,7 @@ void TaskEventBufferImpl::GetTaskStatusEventsToSend(
}

void TaskEventBufferImpl::GetTaskProfileEventsToSend(
std::vector<std::unique_ptr<TaskEvent>> *profile_events_to_send) {
std::vector<std::shared_ptr<TaskEvent>> *profile_events_to_send) {
absl::MutexLock lock(&profile_mutex_);

size_t batch_size =
Expand All @@ -278,13 +373,13 @@ void TaskEventBufferImpl::GetTaskProfileEventsToSend(
}

std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
std::vector<std::unique_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::unique_ptr<TaskEvent>> &&profile_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_send,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send,
absl::flat_hash_set<TaskAttempt> &&dropped_task_attempts_to_send) {
// Aggregate the task events by TaskAttempt.
absl::flat_hash_map<TaskAttempt, rpc::TaskEvents> agg_task_events;
auto to_rpc_event_fn = [this, &agg_task_events, &dropped_task_attempts_to_send](
std::unique_ptr<TaskEvent> &event) {
std::shared_ptr<TaskEvent> &event) {
if (dropped_task_attempts_to_send.count(event->GetTaskAttempt())) {
// We are marking this as data loss due to some missing task status updates.
// We will not send this event to GCS.
Expand Down Expand Up @@ -331,6 +426,45 @@ std::unique_ptr<rpc::TaskEventData> TaskEventBufferImpl::CreateDataToSend(
return data;
}

void TaskEventBufferImpl::WriteExportData(
std::vector<std::shared_ptr<TaskEvent>> &&status_events_to_write_for_export,
std::vector<std::shared_ptr<TaskEvent>> &&profile_events_to_send) {
absl::flat_hash_map<TaskAttempt, std::shared_ptr<rpc::ExportTaskEventData>>
agg_task_events;
// Maintain insertion order to agg_task_events so events are written
// in the same order as the buffer.
std::vector<TaskAttempt> agg_task_event_insertion_order;
auto to_rpc_event_fn = [&agg_task_events, &agg_task_event_insertion_order](
std::shared_ptr<TaskEvent> &event) {
// Aggregate events by task attempt before converting to proto
auto itr = agg_task_events.find(event->GetTaskAttempt());
if (itr == agg_task_events.end()) {
// Insert event into agg_task_events if the task attempt of that
// event wasn't already added.
auto event_for_attempt = std::make_shared<rpc::ExportTaskEventData>();
auto inserted =
agg_task_events.insert({event->GetTaskAttempt(), event_for_attempt});
RAY_CHECK(inserted.second);
agg_task_event_insertion_order.push_back(event->GetTaskAttempt());
event->ToRpcTaskExportEvents(event_for_attempt);
} else {
event->ToRpcTaskExportEvents(itr->second);
}
};

std::for_each(status_events_to_write_for_export.begin(),
status_events_to_write_for_export.end(),
to_rpc_event_fn);
std::for_each(
profile_events_to_send.begin(), profile_events_to_send.end(), to_rpc_event_fn);

for (const auto &task_attempt : agg_task_event_insertion_order) {
auto it = agg_task_events.find(task_attempt);
RAY_CHECK(it != agg_task_events.end());
RayExportEvent(it->second).SendEvent();
}
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
Expand All @@ -350,13 +484,16 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
}

// Take out status events from the buffer.
std::vector<std::unique_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> status_events_to_write_for_export;
absl::flat_hash_set<TaskAttempt> dropped_task_attempts_to_send;
status_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskStatusEventsToSend(&status_events_to_send, &dropped_task_attempts_to_send);
GetTaskStatusEventsToSend(&status_events_to_send,
&status_events_to_write_for_export,
&dropped_task_attempts_to_send);

// Take profile events from the status events.
std::vector<std::unique_ptr<TaskEvent>> profile_events_to_send;
std::vector<std::shared_ptr<TaskEvent>> profile_events_to_send;
profile_events_to_send.reserve(RayConfig::instance().task_events_send_batch_size());
GetTaskProfileEventsToSend(&profile_events_to_send);

Expand All @@ -365,6 +502,10 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
CreateDataToSend(std::move(status_events_to_send),
std::move(profile_events_to_send),
std::move(dropped_task_attempts_to_send));
if (export_event_write_enabled_) {
WriteExportData(std::move(status_events_to_write_for_export),
std::move(profile_events_to_send));
}

gcs::TaskInfoAccessor *task_accessor;
{
Expand Down Expand Up @@ -438,8 +579,19 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
if (!enabled_) {
return;
}
std::shared_ptr<TaskEvent> status_event_shared_ptr = std::move(status_event);

if (dropped_task_attempts_unreported_.count(status_event->GetTaskAttempt())) {
if (export_event_write_enabled_) {
// If status_events_for_export_ is full, the oldest event will be
// dropped in the circular buffer and replaced with the current event.
if (!status_events_for_export_.full()) {
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventsForExportAPIStored);
}
status_events_for_export_.push_back(status_event_shared_ptr);
}
if (dropped_task_attempts_unreported_.count(
status_event_shared_ptr->GetTaskAttempt())) {
// This task attempt has been dropped before, so we drop this event.
stats_counter_.Increment(
TaskEventBufferCounter::kNumTaskStatusEventDroppedSinceLastFlush);
Expand All @@ -454,7 +606,7 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e

RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping task status events for task: "
<< status_event->GetTaskAttempt().first
<< status_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for "
"RAY_task_events_max_num_status_events_buffer_on_worker("
<< RayConfig::instance().task_events_max_num_status_events_buffer_on_worker()
Expand All @@ -466,18 +618,20 @@ void TaskEventBufferImpl::AddTaskStatusEvent(std::unique_ptr<TaskEvent> status_e
} else {
stats_counter_.Increment(TaskEventBufferCounter::kNumTaskStatusEventsStored);
}
status_events_.push_back(std::move(status_event));
status_events_.push_back(status_event_shared_ptr);
}

void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile_event) {
absl::MutexLock lock(&profile_mutex_);
if (!enabled_) {
return;
}
auto profile_events_itr = profile_events_.find(profile_event->GetTaskAttempt());
std::shared_ptr<TaskEvent> profile_event_shared_ptr = std::move(profile_event);
auto profile_events_itr =
profile_events_.find(profile_event_shared_ptr->GetTaskAttempt());
if (profile_events_itr == profile_events_.end()) {
auto inserted = profile_events_.insert(
{profile_event->GetTaskAttempt(), std::vector<std::unique_ptr<TaskEvent>>()});
auto inserted = profile_events_.insert({profile_event_shared_ptr->GetTaskAttempt(),
std::vector<std::shared_ptr<TaskEvent>>()});
RAY_CHECK(inserted.second);
profile_events_itr = inserted.first;
}
Expand All @@ -501,7 +655,8 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
// driver task id and it could generate large number of profile events when submitting
// many tasks.
RAY_LOG_EVERY_N(WARNING, 100000)
<< "Dropping profiling events for task: " << profile_event->GetTaskAttempt().first
<< "Dropping profiling events for task: "
<< profile_event_shared_ptr->GetTaskAttempt().first
<< ", set a higher value for RAY_task_events_max_num_profile_events_per_task("
<< max_num_profile_event_per_task
<< "), or RAY_task_events_max_num_profile_events_buffer_on_worker ("
Expand All @@ -510,7 +665,7 @@ void TaskEventBufferImpl::AddTaskProfileEvent(std::unique_ptr<TaskEvent> profile
}

stats_counter_.Increment(TaskEventBufferCounter::kNumTaskProfileEventsStored);
profile_events_itr->second.push_back(std::move(profile_event));
profile_events_itr->second.push_back(profile_event_shared_ptr);
}

const std::string TaskEventBufferImpl::DebugString() {
Expand Down
Loading

0 comments on commit c95318e

Please sign in to comment.