Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

client: avoid overscheduling CPUs in presence of MT jobs #5257

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 46 additions & 51 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,11 +1087,14 @@ static inline bool more_important(RESULT* r0, RESULT* r1) {
}

static void print_job_list(vector<RESULT*>& jobs) {
char buf[256];
for (unsigned int i=0; i<jobs.size(); i++) {
RESULT* rp = jobs[i];
rp->rsc_string(buf, 256);
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] %d: %s (MD: %s; UTS: %s)",
"[cpu_sched_debug] %d: %s (%s; MD: %s; UTS: %s)",
i, rp->name,
buf,
rp->edf_scheduled?"yes":"no",
rp->unfinished_time_slice?"yes":"no"
);
Expand Down Expand Up @@ -1122,18 +1125,21 @@ void CLIENT_STATE::append_unfinished_time_slice(vector<RESULT*> &run_list) {

// Enforce the CPU schedule.
// Inputs:
// ordered_scheduled_results
// List of tasks that should (ideally) run, set by schedule_cpus().
// Most important tasks (e.g. early deadline) are first.
// The set of tasks that actually run may be different:
// - if a task hasn't checkpointed recently we avoid preempting it
// - we don't run tasks that would exceed working-set limits
// Details:
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
// run_list: list of runnable jobs, ordered by decreasing project priority
// (created by make_run_list())
// Doesn't include all jobs, but enough to fill CPUs even in MT scenarios.
//
// - append running jobs that haven't finished their time slice
// - order the list by "important" (which includes various factors)
// - then scan the list and run jobs
// - until we've used all resources
// - skip jobs that would exceed mem limits
//
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
//
bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
unsigned int i;
Expand Down Expand Up @@ -1175,11 +1181,11 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
rp->unfinished_time_slice = false;
}

// append running jobs not done with time slice to the to-run list
// add running jobs not done with time slice to the run list
//
append_unfinished_time_slice(run_list);

// sort to-run list by decreasing importance
// sort run list by decreasing importance
//
std::sort(
run_list.begin(),
Expand Down Expand Up @@ -1207,7 +1213,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}

// schedule non-CPU-intensive tasks,
// and look for backed-off GPU jobs
//
for (i=0; i<results.size(); i++) {
RESULT* rp = results[i];
Expand All @@ -1233,10 +1238,8 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
// and prune those that can't be assigned
//
assign_coprocs(run_list);
//bool scheduled_mt = false;

// prune jobs that don't fit in RAM or that exceed CPU usage limits.
// Mark the rest as SCHEDULED
// scan the run list
//
for (i=0; i<run_list.size(); i++) {
RESULT* rp = run_list[i];
Expand All @@ -1253,8 +1256,7 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {

atp = lookup_active_task_by_result(rp);

// if we're already using all the CPUs,
// don't allow additional CPU jobs;
// if we're already using all the CPUs, don't allow additional CPU jobs;
// allow coproc jobs if the resulting CPU load is at most ncpus+1
//
if (ncpus_used >= n_usable_cpus) {
Expand All @@ -1280,23 +1282,16 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

#if 0
// Don't overcommit CPUs by > 1 if a MT job is scheduled.
// Skip this check for coproc jobs.
//
if (!rp->uses_coprocs()
&& (scheduled_mt || (rp->avp->avg_ncpus > 1))
&& (ncpus_used + rp->avp->avg_ncpus > ncpus + 1)
) {
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] avoid MT overcommit: skipping %s",
rp->name
);
}
continue;
}
#endif
// There's a possibility that this job is MT
// and would overcommit the CPUs by > 1.
// Options are:
// 1) run it anyway, and overcommit the CPUs
// 2) don't run it.
// This can result in starvation.
// 3) don't run it if there are additional 1-CPU jobs.
// The problem here is that we may never run the MT job
// until it reaches deadline pressure.
// So we'll go with 1).

// skip jobs whose working set is too large to fit in available RAM
//
Expand All @@ -1316,22 +1311,23 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
if (log_flags.cpu_sched_debug || log_flags.mem_usage_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] enforce: task %s can't run, too big %.2fMB > %.2fMB",
"[cpu_sched_debug] can't run %s: WS too big %.2fMB > %.2fMB",
rp->name, wss/MEGA, ram_left/MEGA
);
}
continue;
}

// We've decided to run this job
//
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] scheduling %s%s",
rp->name,
rp->edf_scheduled?" (high priority)":""
rp->name, rp->edf_scheduled?" (high priority)":""
);
}

// We've decided to run this job; create an ACTIVE_TASK if needed.
// create an ACTIVE_TASK if needed.
//
if (!atp) {
atp = get_task(rp);
Expand All @@ -1343,11 +1339,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
continue;
}

#if 0
if (rp->avp->avg_ncpus > 1) {
scheduled_mt = true;
}
#endif
ncpus_used += rp->avp->avg_ncpus;
atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
ram_left -= wss;
Expand All @@ -1356,13 +1347,17 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

if (log_flags.cpu_sched_debug && ncpus_used < n_usable_cpus) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
// if CPUs are starved, ask for more jobs
//
if (ncpus_used < n_usable_cpus) {
if (ncpus_used < n_usable_cpus) {
request_work_fetch("CPUs idle");
}
if (log_flags.cpu_sched_debug) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using only %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
}
}

bool check_swap = (host_info.m_swap != 0);
Expand Down
13 changes: 13 additions & 0 deletions client/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ struct RESULT {
if (avp->dont_throttle) return true;
return false;
}
// Format a short human-readable summary of this job's resource usage
// into buf (at most len bytes, snprintf-truncated):
// CPU-only jobs get "N.NN CPU"; jobs with a GPU part also get
// the GPU usage and the long resource name.
inline void rsc_string(char* buf, int len) {
    int gpu_rt = avp->gpu_usage.rsc_type;
    if (!gpu_rt) {
        snprintf(buf, len, "%.2f CPU", avp->avg_ncpus);
        return;
    }
    snprintf(buf, len,
        "%.2f CPU + %.2f %s",
        avp->avg_ncpus, avp->gpu_usage.usage,
        rsc_name_long(gpu_rt)
    );
}


// temporaries used in CLIENT_STATE::rr_simulation():
double rrsim_flops_left;
Expand Down
17 changes: 2 additions & 15 deletions client/rr_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,6 @@

using std::vector;

// Format a short human-readable summary of rp's resource usage into buf
// (at most len bytes; snprintf truncates safely).
// CPU-only jobs yield "N.NN CPU"; jobs whose app version uses a GPU
// also include the GPU usage and its long resource name
// (presumably e.g. a vendor name from rsc_name_long() — confirm there).
inline void rsc_string(RESULT* rp, char* buf, int len) {
APP_VERSION* avp = rp->avp;
// nonzero rsc_type means the app version uses a coprocessor (GPU)
if (avp->gpu_usage.rsc_type) {
snprintf(buf, len,
"%.2f CPU + %.2f %s",
avp->avg_ncpus, avp->gpu_usage.usage,
rsc_name_long(avp->gpu_usage.rsc_type)
);
} else {
snprintf(buf, len, "%.2f CPU", avp->avg_ncpus);
}
}

// set "nused" bits of the source bitmap in the dest bitmap
//
static inline void set_bits(
Expand Down Expand Up @@ -314,7 +301,7 @@ void RR_SIM::pick_jobs_to_run(double reltime) {
adjust_rec_sched(rp);
if (log_flags.rrsim_detail && !rp->already_selected) {
char buf[256];
rsc_string(rp, buf, sizeof(buf));
rp->rsc_string(buf, sizeof(buf));
msg_printf(rp->project, MSG_INFO,
"[rr_sim_detail] %.2f: starting %s (%s) (%.2fG/%.2fG)",
reltime, rp->name, buf, rp->rrsim_flops_left/1e9,
Expand Down Expand Up @@ -558,7 +545,7 @@ void RR_SIM::simulate() {
pbest = rpbest->project;
if (log_flags.rr_simulation) {
char buf[256];
rsc_string(rpbest, buf, sizeof(buf));
rpbest->rsc_string(buf, sizeof(buf));
msg_printf(pbest, MSG_INFO,
"[rr_sim] %.2f: %s finishes (%s) (%.2fG/%.2fG)",
sim_now + delta_t - gstate.now,
Expand Down
2 changes: 1 addition & 1 deletion client/sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,9 +1423,9 @@ void do_client_simulation() {

snprintf(buf, sizeof(buf), "%s%s", infile_prefix, CONFIG_FILE);
cc_config.defaults();
log_flags.init();
read_config_file(true, buf);

log_flags.init();
snprintf(buf, sizeof(buf), "%s%s", outfile_prefix, "log_flags.xml");
f = fopen(buf, "r");
if (f) {
Expand Down
2 changes: 1 addition & 1 deletion client/sim_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int ACTIVE_TASK::resume_or_start(bool first_time) {
);
}
set_task_state(PROCESS_EXECUTING, "start");
char buf[256];
char buf[1024];
snprintf(buf, sizeof(buf), "Starting %s<br>&nbsp;&nbsp;%s<br>&nbsp;&nbsp;deadline %s<br>",
result->name, result->project->get_project_name(),
sim_time_string(result->report_deadline)
Expand Down