Skip to content

Commit

Permalink
Merge pull request #5257 from BOINC/dpa_mt_sched
Browse files Browse the repository at this point in the history
client: avoid overscheduling CPUs in presence of MT jobs
  • Loading branch information
davidpanderson authored Jun 15, 2023
2 parents c43b4b3 + 8f4aa5a commit a0d0a9d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 68 deletions.
97 changes: 46 additions & 51 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,11 +1087,14 @@ static inline bool more_important(RESULT* r0, RESULT* r1) {
}

static void print_job_list(vector<RESULT*>& jobs) {
char buf[256];
for (unsigned int i=0; i<jobs.size(); i++) {
RESULT* rp = jobs[i];
rp->rsc_string(buf, 256);
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] %d: %s (MD: %s; UTS: %s)",
"[cpu_sched_debug] %d: %s (%s; MD: %s; UTS: %s)",
i, rp->name,
buf,
rp->edf_scheduled?"yes":"no",
rp->unfinished_time_slice?"yes":"no"
);
Expand Down Expand Up @@ -1122,18 +1125,21 @@ void CLIENT_STATE::append_unfinished_time_slice(vector<RESULT*> &run_list) {

// Enforce the CPU schedule.
// Inputs:
// ordered_scheduled_results
// List of tasks that should (ideally) run, set by schedule_cpus().
// Most important tasks (e.g. early deadline) are first.
// The set of tasks that actually run may be different:
// - if a task hasn't checkpointed recently we avoid preempting it
// - we don't run tasks that would exceed working-set limits
// Details:
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
// run_list: list of runnable jobs, ordered by decreasing project priority
// (created by make_run_list())
// Doesn't include all jobs, but enough to fill CPUs even in MT scenarios.
//
// - append running jobs that haven't finished their time slice
// - order the list by "importance" (which includes various factors)
// - then scan the list and run jobs
// - until we've used all resources
// - skip jobs that would exceed mem limits
//
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
//
bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
unsigned int i;
Expand Down Expand Up @@ -1175,11 +1181,11 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
rp->unfinished_time_slice = false;
}

// append running jobs not done with time slice to the to-run list
// add running jobs not done with time slice to the run list
//
append_unfinished_time_slice(run_list);

// sort to-run list by decreasing importance
// sort run list by decreasing importance
//
std::sort(
run_list.begin(),
Expand Down Expand Up @@ -1207,7 +1213,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}

// schedule non-CPU-intensive tasks,
// and look for backed-off GPU jobs
//
for (i=0; i<results.size(); i++) {
RESULT* rp = results[i];
Expand All @@ -1233,10 +1238,8 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
// and prune those that can't be assigned
//
assign_coprocs(run_list);
//bool scheduled_mt = false;

// prune jobs that don't fit in RAM or that exceed CPU usage limits.
// Mark the rest as SCHEDULED
// scan the run list
//
for (i=0; i<run_list.size(); i++) {
RESULT* rp = run_list[i];
Expand All @@ -1253,8 +1256,7 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {

atp = lookup_active_task_by_result(rp);

// if we're already using all the CPUs,
// don't allow additional CPU jobs;
// if we're already using all the CPUs, don't allow additional CPU jobs;
// allow coproc jobs if the resulting CPU load is at most ncpus+1
//
if (ncpus_used >= n_usable_cpus) {
Expand All @@ -1280,23 +1282,16 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

#if 0
// Don't overcommit CPUs by > 1 if a MT job is scheduled.
// Skip this check for coproc jobs.
//
if (!rp->uses_coprocs()
&& (scheduled_mt || (rp->avp->avg_ncpus > 1))
&& (ncpus_used + rp->avp->avg_ncpus > ncpus + 1)
) {
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] avoid MT overcommit: skipping %s",
rp->name
);
}
continue;
}
#endif
// There's a possibility that this job is MT
// and would overcommit the CPUs by > 1.
// Options are:
// 1) run it anyway, and overcommit the CPUs
// 2) don't run it.
// This can result in starvation.
// 3) don't run it if there are additional 1-CPU jobs.
// The problem here is that we may never run the MT job
// until it reaches deadline pressure.
// So we'll go with 1).

// skip jobs whose working set is too large to fit in available RAM
//
Expand All @@ -1316,22 +1311,23 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
if (log_flags.cpu_sched_debug || log_flags.mem_usage_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] enforce: task %s can't run, too big %.2fMB > %.2fMB",
"[cpu_sched_debug] can't run %s: WS too big %.2fMB > %.2fMB",
rp->name, wss/MEGA, ram_left/MEGA
);
}
continue;
}

// We've decided to run this job
//
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] scheduling %s%s",
rp->name,
rp->edf_scheduled?" (high priority)":""
rp->name, rp->edf_scheduled?" (high priority)":""
);
}

// We've decided to run this job; create an ACTIVE_TASK if needed.
// create an ACTIVE_TASK if needed.
//
if (!atp) {
atp = get_task(rp);
Expand All @@ -1343,11 +1339,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
continue;
}

#if 0
if (rp->avp->avg_ncpus > 1) {
scheduled_mt = true;
}
#endif
ncpus_used += rp->avp->avg_ncpus;
atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
ram_left -= wss;
Expand All @@ -1356,13 +1347,17 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

if (log_flags.cpu_sched_debug && ncpus_used < n_usable_cpus) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
// if CPUs are starved, ask for more jobs
//
if (ncpus_used < n_usable_cpus) {
if (ncpus_used < n_usable_cpus) {
request_work_fetch("CPUs idle");
}
if (log_flags.cpu_sched_debug) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using only %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
}
}

bool check_swap = (host_info.m_swap != 0);
Expand Down
13 changes: 13 additions & 0 deletions client/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ struct RESULT {
if (avp->dont_throttle) return true;
return false;
}
// Write a short description of this job's resource usage into buf.
// The output is produced with snprintf(buf, len, ...).
// - When avp->gpu_usage.rsc_type is zero, only the average CPU count
//   (avp->avg_ncpus) is reported.
// - Otherwise the gpu_usage amount and the long name of that resource type
//   (from rsc_name_long()) are appended.
inline void rsc_string(char* buf, int len) {
    // CPU-only case first, so the common short form returns early
    if (!avp->gpu_usage.rsc_type) {
        snprintf(buf, len, "%.2f CPU", avp->avg_ncpus);
        return;
    }
    snprintf(
        buf, len, "%.2f CPU + %.2f %s",
        avp->avg_ncpus,
        avp->gpu_usage.usage,
        rsc_name_long(avp->gpu_usage.rsc_type)
    );
}


// temporaries used in CLIENT_STATE::rr_simulation():
double rrsim_flops_left;
Expand Down
17 changes: 2 additions & 15 deletions client/rr_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,6 @@

using std::vector;

// Describe the resource usage of rp's app version as text in buf,
// formatted with snprintf(buf, len, ...).
// A zero gpu_usage.rsc_type yields just the average CPU count;
// otherwise the gpu_usage amount and the resource's long name are added.
inline void rsc_string(RESULT* rp, char* buf, int len) {
    APP_VERSION* app = rp->avp;
    if (!app->gpu_usage.rsc_type) {
        snprintf(buf, len, "%.2f CPU", app->avg_ncpus);
        return;
    }
    snprintf(
        buf, len, "%.2f CPU + %.2f %s",
        app->avg_ncpus,
        app->gpu_usage.usage,
        rsc_name_long(app->gpu_usage.rsc_type)
    );
}

// set "nused" bits of the source bitmap in the dest bitmap
//
static inline void set_bits(
Expand Down Expand Up @@ -314,7 +301,7 @@ void RR_SIM::pick_jobs_to_run(double reltime) {
adjust_rec_sched(rp);
if (log_flags.rrsim_detail && !rp->already_selected) {
char buf[256];
rsc_string(rp, buf, sizeof(buf));
rp->rsc_string(buf, sizeof(buf));
msg_printf(rp->project, MSG_INFO,
"[rr_sim_detail] %.2f: starting %s (%s) (%.2fG/%.2fG)",
reltime, rp->name, buf, rp->rrsim_flops_left/1e9,
Expand Down Expand Up @@ -558,7 +545,7 @@ void RR_SIM::simulate() {
pbest = rpbest->project;
if (log_flags.rr_simulation) {
char buf[256];
rsc_string(rpbest, buf, sizeof(buf));
rpbest->rsc_string(buf, sizeof(buf));
msg_printf(pbest, MSG_INFO,
"[rr_sim] %.2f: %s finishes (%s) (%.2fG/%.2fG)",
sim_now + delta_t - gstate.now,
Expand Down
2 changes: 1 addition & 1 deletion client/sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,9 +1423,9 @@ void do_client_simulation() {

snprintf(buf, sizeof(buf), "%s%s", infile_prefix, CONFIG_FILE);
cc_config.defaults();
log_flags.init();
read_config_file(true, buf);

log_flags.init();
snprintf(buf, sizeof(buf), "%s%s", outfile_prefix, "log_flags.xml");
f = fopen(buf, "r");
if (f) {
Expand Down
2 changes: 1 addition & 1 deletion client/sim_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int ACTIVE_TASK::resume_or_start(bool first_time) {
);
}
set_task_state(PROCESS_EXECUTING, "start");
char buf[256];
char buf[1024];
snprintf(buf, sizeof(buf), "Starting %s<br>&nbsp;&nbsp;%s<br>&nbsp;&nbsp;deadline %s<br>",
result->name, result->project->get_project_name(),
sim_time_string(result->report_deadline)
Expand Down

0 comments on commit a0d0a9d

Please sign in to comment.