Skip to content

Commit

Permalink
client: avoid overscheduling CPUs in presence of MT jobs
Browse files Browse the repository at this point in the history
In 20ff585 we changed the sched policy so that e.g.
if there are 2 4-CPU jobs on a 6-CPU host, it runs them both.
I.e. overscheduling the CPUs is better than starving them.

This commit refines this a bit: if in addition to the MT jobs
there are some 1-CPU jobs,
it runs one MT job and two of the 1-CPU jobs.

Also: show resource usage in cpu_sched_debug messages

Also: if CPUs are starved, trigger a work request.
This logic was mistakenly hidden in an
if (log_flags.cpu_sched_debug)

Also: don't ignore log flags in the simulator
  • Loading branch information
davidpanderson committed May 29, 2023
1 parent 3ca95a1 commit f0685c4
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 56 deletions.
91 changes: 52 additions & 39 deletions client/cpu_sched.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,11 +1087,14 @@ static inline bool more_important(RESULT* r0, RESULT* r1) {
}

static void print_job_list(vector<RESULT*>& jobs) {
char buf[256];
for (unsigned int i=0; i<jobs.size(); i++) {
RESULT* rp = jobs[i];
rp->rsc_string(buf, 256);
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] %d: %s (MD: %s; UTS: %s)",
"[cpu_sched_debug] %d: %s (%s; MD: %s; UTS: %s)",
i, rp->name,
buf,
rp->edf_scheduled?"yes":"no",
rp->unfinished_time_slice?"yes":"no"
);
Expand Down Expand Up @@ -1122,18 +1125,21 @@ void CLIENT_STATE::append_unfinished_time_slice(vector<RESULT*> &run_list) {

// Enforce the CPU schedule.
// Inputs:
// ordered_scheduled_results
// List of tasks that should (ideally) run, set by schedule_cpus().
// Most important tasks (e.g. early deadline) are first.
// The set of tasks that actually run may be different:
// - if a task hasn't checkpointed recently we avoid preempting it
// - we don't run tasks that would exceed working-set limits
// Details:
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
// run_list: list of runnable jobs, ordered by decreasing project priority
// (created by make_run_list())
// Doesn't include all jobs, but enough to fill CPUs even in MT scenarios.
//
// - append running jobs that haven't finished their time slice
// - order the list by "importance" (which includes various factors)
// - then scan the list and run jobs
// - until we've used all resources
// - skip jobs that would exceed mem limits
//
// Initially, each task's scheduler_state is PREEMPTED or SCHEDULED
// depending on whether or not it is running.
// This function sets each task's next_scheduler_state,
// and at the end it starts/resumes and preempts tasks
// based on scheduler_state and next_scheduler_state.
//
bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
unsigned int i;
Expand Down Expand Up @@ -1175,11 +1181,11 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
rp->unfinished_time_slice = false;
}

// append running jobs not done with time slice to the to-run list
// add running jobs not done with time slice to the run list
//
append_unfinished_time_slice(run_list);

// sort to-run list by decreasing importance
// sort run list by decreasing importance
//
std::sort(
run_list.begin(),
Expand Down Expand Up @@ -1207,7 +1213,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}

// schedule non-CPU-intensive tasks,
// and look for backed-off GPU jobs
//
for (i=0; i<results.size(); i++) {
RESULT* rp = results[i];
Expand All @@ -1233,10 +1238,17 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
// and prune those that can't be assigned
//
assign_coprocs(run_list);
//bool scheduled_mt = false;

// prune jobs that don't fit in RAM or that exceed CPU usage limits.
// Mark the rest as SCHEDULED
// keep track of the number of remaining single-CPU jobs
//
int n_single_cpu_jobs = 0;
for (RESULT* rp: run_list) {
if (!rp->uses_coprocs() && rp->avp->avg_ncpus==1) {
n_single_cpu_jobs++;
}
}

// scan the run list
//
for (i=0; i<run_list.size(); i++) {
RESULT* rp = run_list[i];
Expand All @@ -1253,8 +1265,7 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {

atp = lookup_active_task_by_result(rp);

// if we're already using all the CPUs,
// don't allow additional CPU jobs;
// if we're already using all the CPUs, don't allow additional CPU jobs;
// allow coproc jobs if the resulting CPU load is at most ncpus+1
//
if (ncpus_used >= n_usable_cpus) {
Expand All @@ -1280,13 +1291,12 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

#if 0
// Don't overcommit CPUs by > 1 if a MT job is scheduled.
// Skip this check for coproc jobs.
// Don't overcommit CPUs by > 1 unless needed to avoid starvation
//
if (!rp->uses_coprocs()
&& (scheduled_mt || (rp->avp->avg_ncpus > 1))
&& (ncpus_used + rp->avp->avg_ncpus > ncpus + 1)
&& (rp->avp->avg_ncpus > 1)
&& (ncpus_used + rp->avp->avg_ncpus > n_usable_cpus + 1)
&& (ncpus_used + n_single_cpu_jobs >= n_usable_cpus)
) {
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
Expand All @@ -1296,7 +1306,6 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
continue;
}
#endif

// skip jobs whose working set is too large to fit in available RAM
//
Expand All @@ -1316,22 +1325,23 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
if (log_flags.cpu_sched_debug || log_flags.mem_usage_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] enforce: task %s can't run, too big %.2fMB > %.2fMB",
"[cpu_sched_debug] can't run %s: WS too big %.2fMB > %.2fMB",
rp->name, wss/MEGA, ram_left/MEGA
);
}
continue;
}

// We've decided to run this job
//
if (log_flags.cpu_sched_debug) {
msg_printf(rp->project, MSG_INFO,
"[cpu_sched_debug] scheduling %s%s",
rp->name,
rp->edf_scheduled?" (high priority)":""
rp->name, rp->edf_scheduled?" (high priority)":""
);
}

// We've decided to run this job; create an ACTIVE_TASK if needed.
// create an ACTIVE_TASK if needed.
//
if (!atp) {
atp = get_task(rp);
Expand All @@ -1343,11 +1353,10 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
continue;
}

#if 0
if (rp->avp->avg_ncpus > 1) {
scheduled_mt = true;
if (!rp->uses_coprocs() && rp->avp->avg_ncpus == 1) {
n_single_cpu_jobs--;
}
#endif

ncpus_used += rp->avp->avg_ncpus;
atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
ram_left -= wss;
Expand All @@ -1356,13 +1365,17 @@ bool CLIENT_STATE::enforce_run_list(vector<RESULT*>& run_list) {
}
}

if (log_flags.cpu_sched_debug && ncpus_used < n_usable_cpus) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
// if CPUs are starved, ask for more jobs
//
if (ncpus_used < n_usable_cpus) {
if (ncpus_used < n_usable_cpus) {
request_work_fetch("CPUs idle");
}
if (log_flags.cpu_sched_debug) {
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using only %.2f out of %d CPUs",
ncpus_used, n_usable_cpus
);
}
}

bool check_swap = (host_info.m_swap != 0);
Expand Down
13 changes: 13 additions & 0 deletions client/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ struct RESULT {
if (avp->dont_throttle) return true;
return false;
}
// Write a human-readable description of this result's resource usage
// (CPU count and, if present, GPU usage) into buf.
inline void rsc_string(char* buf, int len) {
    if (!avp->gpu_usage.rsc_type) {
        // CPU-only app version
        snprintf(buf, len, "%.2f CPU", avp->avg_ncpus);
        return;
    }
    snprintf(buf, len,
        "%.2f CPU + %.2f %s",
        avp->avg_ncpus, avp->gpu_usage.usage,
        rsc_name_long(avp->gpu_usage.rsc_type)
    );
}


// temporaries used in CLIENT_STATE::rr_simulation():
double rrsim_flops_left;
Expand Down
17 changes: 2 additions & 15 deletions client/rr_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,6 @@

using std::vector;

// Write a human-readable description of rp's resource usage
// (CPU count and, if present, GPU usage) into buf.
inline void rsc_string(RESULT* rp, char* buf, int len) {
    APP_VERSION* av = rp->avp;
    if (!av->gpu_usage.rsc_type) {
        // CPU-only app version
        snprintf(buf, len, "%.2f CPU", av->avg_ncpus);
        return;
    }
    snprintf(buf, len,
        "%.2f CPU + %.2f %s",
        av->avg_ncpus, av->gpu_usage.usage,
        rsc_name_long(av->gpu_usage.rsc_type)
    );
}

// set "nused" bits of the source bitmap in the dest bitmap
//
static inline void set_bits(
Expand Down Expand Up @@ -314,7 +301,7 @@ void RR_SIM::pick_jobs_to_run(double reltime) {
adjust_rec_sched(rp);
if (log_flags.rrsim_detail && !rp->already_selected) {
char buf[256];
rsc_string(rp, buf, sizeof(buf));
rp->rsc_string(buf, sizeof(buf));
msg_printf(rp->project, MSG_INFO,
"[rr_sim_detail] %.2f: starting %s (%s) (%.2fG/%.2fG)",
reltime, rp->name, buf, rp->rrsim_flops_left/1e9,
Expand Down Expand Up @@ -558,7 +545,7 @@ void RR_SIM::simulate() {
pbest = rpbest->project;
if (log_flags.rr_simulation) {
char buf[256];
rsc_string(rpbest, buf, sizeof(buf));
rpbest->rsc_string(buf, sizeof(buf));
msg_printf(pbest, MSG_INFO,
"[rr_sim] %.2f: %s finishes (%s) (%.2fG/%.2fG)",
sim_now + delta_t - gstate.now,
Expand Down
2 changes: 1 addition & 1 deletion client/sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,9 +1423,9 @@ void do_client_simulation() {

snprintf(buf, sizeof(buf), "%s%s", infile_prefix, CONFIG_FILE);
cc_config.defaults();
log_flags.init();
read_config_file(true, buf);

log_flags.init();
snprintf(buf, sizeof(buf), "%s%s", outfile_prefix, "log_flags.xml");
f = fopen(buf, "r");
if (f) {
Expand Down
2 changes: 1 addition & 1 deletion client/sim_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int ACTIVE_TASK::resume_or_start(bool first_time) {
);
}
set_task_state(PROCESS_EXECUTING, "start");
char buf[256];
char buf[1024];
snprintf(buf, sizeof(buf), "Starting %s<br>&nbsp;&nbsp;%s<br>&nbsp;&nbsp;deadline %s<br>",
result->name, result->project->get_project_name(),
sim_time_string(result->report_deadline)
Expand Down

0 comments on commit f0685c4

Please sign in to comment.