Skip to content

Commit

Permalink
gauge for running and counter for completed
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <dentinyhao@gmail.com>
  • Loading branch information
dentiny committed Sep 24, 2024
1 parent 9e1f214 commit 208482a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 21 deletions.
27 changes: 14 additions & 13 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,16 @@
namespace ray {
namespace gcs {

namespace {
// Job state string used for metrics upload.
constexpr const char *const kRunningJobState = "RUNNING";
constexpr const char *const kFinishedJobState = "FINISHED";
} // namespace

void GcsJobManager::Initialize(const GcsInitData &gcs_init_data) {
for (auto &pair : gcs_init_data.Jobs()) {
const auto &job_id = pair.first;
const auto &job_table_data = pair.second;
for (const auto &[job_id, job_table_data] : gcs_init_data.Jobs()) {
cached_job_configs_[job_id] =
std::make_shared<rpc::JobConfig>(job_table_data.config());
function_manager_.AddJobReference(job_id);

// Recover [running_job_ids_] from storage.
if (!job_table_data.is_dead()) {
running_job_ids_.insert(job_id);
}
}
}

Expand Down Expand Up @@ -122,8 +119,6 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
} else {
const bool insert_suc = running_job_ids_.insert(job_id).second;
RAY_CHECK(insert_suc) << job_id.Hex() << " already inserted.";
ray::stats::STATS_jobs.Record(running_job_ids_.size(),
{{"State", kRunningJobState}, {"JobId", job_id.Hex()}});
}
}

Expand Down Expand Up @@ -156,8 +151,6 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
auto iter = running_job_ids_.find(job_id);
RAY_CHECK(iter != running_job_ids_.end());
running_job_ids_.erase(iter);
ray::stats::STATS_jobs.Record(
running_job_ids_.size(), {{"State", kFinishedJobState}, {"JobId", job_id.Hex()}});
}
}

Expand All @@ -184,6 +177,8 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
});
if (!status.ok()) {
send_reply(status);
} else {
++new_finished_jobs_;
}
}

Expand Down Expand Up @@ -440,5 +435,11 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done));
}

void GcsJobManager::RecordMetrics() {
ray::stats::STATS_running_jobs.Record(running_job_ids_.size());
ray::stats::STATS_finished_jobs.Record(new_finished_jobs_);
new_finished_jobs_ = 0;
}

} // namespace gcs
} // namespace ray
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
Expand Down Expand Up @@ -95,6 +96,11 @@ class GcsJobManager : public rpc::JobInfoHandler {

void WriteDriverJobExportEvent(rpc::JobTableData job_data) const;

/// Record metrics.
/// For job manager, (1) running jobs count gauge and (2) new finished jobs (whether
/// succeed or fail) will be reported periodically.
void RecordMetrics();

private:
void ClearJobInfos(const rpc::JobTableData &job_data);

Expand All @@ -104,6 +110,10 @@ class GcsJobManager : public rpc::JobInfoHandler {
// Job ids, which are running.
absl::flat_hash_set<JobID> running_job_ids_;

// Used to report metrics, which indicates the number of jobs finished between two
// metrics reporting events.
int32_t new_finished_jobs_ = 0;

std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;

Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ void GcsServer::RecordMetrics() const {
gcs_actor_manager_->RecordMetrics();
gcs_placement_group_manager_->RecordMetrics();
gcs_task_manager_->RecordMetrics();
gcs_job_manager_->RecordMetrics();
execute_after(
main_service_,
[this] { RecordMetrics(); },
Expand Down
18 changes: 12 additions & 6 deletions src/ray/stats/metric_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,21 @@ DEFINE_stats(actors,
(),
ray::stats::GAUGE);

/// Track job by state, including RUNNING, FINISHED.
DEFINE_stats(jobs,
"Current number of jobs currently in a particular state.",
// State: latest state for the particular job.
// JobId: ID in hex format for this job.
("State", "JobId"),
/// Job related stats.
DEFINE_stats(running_jobs,
"Number of jobs currently running.",
/*tags=*/(),
/*buckets=*/(),
ray::stats::GAUGE);

DEFINE_stats(finished_jobs,
"New finished jobs number.",
// TODO(hjiang): Consider adding task completion status, for example, failed,
// completed in tags.
/*tags=*/(),
/*buckets=*/(),
ray::stats::COUNT);

/// Logical resource usage reported by raylets.
DEFINE_stats(resources,
// TODO(sang): Support placement_group_reserved_available | used
Expand Down
5 changes: 3 additions & 2 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ DECLARE_stats(tasks);
/// Actor stats, broken down by state.
DECLARE_stats(actors);

/// Job stats, broken down by state.
DECLARE_stats(jobs);
/// Job stats.
DECLARE_stats(running_jobs);
DECLARE_stats(finished_jobs);

/// Placement group stats, broken down by state.
DECLARE_stats(placement_groups);
Expand Down

0 comments on commit 208482a

Please sign in to comment.