Skip to content

Commit

Permalink
Merge pull request #47 from lars-t-hansen/cleanup
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
bast committed Jun 21, 2023
2 parents af9f0ef + cea2298 commit 5e41077
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 77 deletions.
2 changes: 2 additions & 0 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::time::Duration;
use subprocess::{Exec, Redirection};

// the pipe is here as a workaround for https://github.com/rust-lang/rust/issues/45572
// see also https://doc.rust-lang.org/std/process/index.html
pub fn safe_command(command: &str, timeout_seconds: u64) -> Option<String> {
let mut p = Exec::shell(command)
.stdout(Redirection::Pipe)
Expand Down
2 changes: 1 addition & 1 deletion src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
use crate::process;

pub trait JobManager {
fn job_id_from_pid(&mut self, pid: String, processes: &[process::Process]) -> usize;
fn job_id_from_pid(&mut self, pid: usize, processes: &[process::Process]) -> usize;
}
78 changes: 41 additions & 37 deletions src/nvidia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::collections::HashMap;
#[derive(PartialEq)]
pub struct Process {
pub device: i32, // -1 for "unknown", otherwise 0..num_devices-1
pub pid: String, // Process ID
pub pid: usize, // Process ID
pub user: String, // User name, _zombie_PID for zombies
pub gpu_pct: f64, // Percent of GPU, 0.0 for zombies
pub mem_pct: f64, // Percent of memory, 0.0 for zombies
Expand All @@ -18,12 +18,11 @@ pub struct Process {
}

pub fn get_nvidia_information(
timeout_seconds: u64,
user_by_pid: &HashMap<String, String>,
user_by_pid: &HashMap<usize, String>,
) -> Vec<Process> {
if let Some(pmon_raw_text) = command::safe_command(NVIDIA_PMON_COMMAND, timeout_seconds) {
if let Some(pmon_raw_text) = command::safe_command(NVIDIA_PMON_COMMAND, TIMEOUT_SECONDS) {
let mut processes = parse_pmon_output(&pmon_raw_text, user_by_pid);
if let Some(query_raw_text) = command::safe_command(NVIDIA_QUERY_COMMAND, timeout_seconds) {
if let Some(query_raw_text) = command::safe_command(NVIDIA_QUERY_COMMAND, TIMEOUT_SECONDS) {
processes.append(&mut parse_query_output(&query_raw_text, user_by_pid));
}
processes
Expand All @@ -32,6 +31,8 @@ pub fn get_nvidia_information(
}
}

const TIMEOUT_SECONDS: u64 = 2; // For `nvidia-smi`

// For prototyping purposes (and maybe it's good enough for production?), parse the output of
// `nvidia-smi pmon`. This output has a couple of problems:
//
Expand All @@ -50,7 +51,7 @@ const NVIDIA_PMON_COMMAND: &str = "nvidia-smi pmon -c 1 -s mu";
// GPU devices used by that process. For a system with 8 cards, utilization
// can reach 800% and the memory size can reach the sum of the memories on the cards.

fn parse_pmon_output(raw_text: &str, user_by_pid: &HashMap<String, String>) -> Vec<Process> {
fn parse_pmon_output(raw_text: &str, user_by_pid: &HashMap<usize, String>) -> Vec<Process> {
raw_text
.lines()
.filter(|line| !line.starts_with('#'))
Expand All @@ -64,24 +65,26 @@ fn parse_pmon_output(raw_text: &str, user_by_pid: &HashMap<String, String>) -> V
// For nvidia-smi, we use the first word because the command produces blank-padded
// output. We can maybe do better by considering non-empty words.
let command = parts[8].to_string();
let user = match user_by_pid.get(pid) {
Some(name) => name.clone(),
None => "_zombie_".to_owned() + pid,
};
(pid, device, user, mem_size, gpu_pct, mem_pct, command)
(pid, device, mem_size, gpu_pct, mem_pct, command)
})
.filter(|(pid, ..)| *pid != "-")
.map(
|(pid, device, user, mem_size, gpu_pct, mem_pct, command)| Process {
|(pid_str, device, mem_size, gpu_pct, mem_pct, command)| {
let pid = pid_str.parse::<usize>().unwrap();
let user = match user_by_pid.get(&pid) {
Some(name) => name.clone(),
None => "_zombie_".to_owned() + pid_str,
};
Process {
device,
pid: pid.to_string(),
pid,
user,
gpu_pct,
mem_pct,
mem_size_kib: mem_size * 1024,
command,
},
)
}
})
.collect::<Vec<Process>>()
}

Expand All @@ -94,16 +97,17 @@ const NVIDIA_QUERY_COMMAND: &str =
// Same signature as parse_pmon_output(), q.v., but user is always "_zombie_<PID>" and command
// is always "_unknown_". Only pids not in user_by_pid are returned.

fn parse_query_output(raw_text: &str, user_by_pid: &HashMap<String, String>) -> Vec<Process> {
fn parse_query_output(raw_text: &str, user_by_pid: &HashMap<usize, String>) -> Vec<Process> {
raw_text
.lines()
.map(|line| {
let (_start_indices, parts) = util::chunks(line);
let pid = parts[0].strip_suffix(',').unwrap();
let pid_str = parts[0].strip_suffix(',').unwrap();
let pid = pid_str.parse::<usize>().unwrap();
let mem_usage = parts[1].parse::<usize>().unwrap();
let user = "_zombie_".to_owned() + pid;
let user = "_zombie_".to_owned() + pid_str;
let command = "_unknown_";
(pid.to_string(), user, command.to_string(), mem_usage * 1024)
(pid, user, command.to_string(), mem_usage * 1024)
})
.filter(|(pid, ..)| !user_by_pid.contains_key(pid))
.map(|(pid, user, command, mem_size_kib)| Process {
Expand All @@ -119,13 +123,13 @@ fn parse_query_output(raw_text: &str, user_by_pid: &HashMap<String, String>) ->
}

#[cfg(test)]
fn mkusers() -> HashMap<String, String> {
fn mkusers() -> HashMap<usize, String> {
map! {
"447153".to_string() => "bob".to_string(),
"447160".to_string() => "bob".to_string(),
"1864615".to_string() => "alice".to_string(),
"2233095".to_string() => "charlie".to_string(),
"2233469".to_string() => "charlie".to_string()
447153 => "bob".to_string(),
447160 => "bob".to_string(),
1864615 => "alice".to_string(),
2233095 => "charlie".to_string(),
2233469 => "charlie".to_string()
}
}

Expand Down Expand Up @@ -153,7 +157,7 @@ pub fn parsed_pmon_output() -> Vec<Process> {
macro_rules! proc(
{ $a:expr, $b:expr, $c:expr, $d:expr, $e: expr, $f:expr, $g:expr } => {
Process { device: $a,
pid: $b.to_string(),
pid: $b,
user: $c.to_string(),
gpu_pct: $d,
mem_pct: $e,
Expand All @@ -165,16 +169,16 @@ macro_rules! proc(
#[test]
fn test_parse_pmon_output() {
assert!(parsed_pmon_output().into_iter().eq(vec![
proc! { 0, "447153", "bob", 0.0, 0.0, 7669 * 1024, "python3.9" },
proc! { 0, "447160", "bob", 0.0, 0.0, 11057 * 1024, "python3.9" },
proc! { 0, "506826", "_zombie_506826", 0.0, 0.0, 11057 * 1024, "python3.9" },
proc! { 0, "1864615", "alice", 40.0, 0.0, 1635 * 1024, "python" },
proc! { 1, "1864615", "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 1, "2233095", "charlie", 84.0, 23.0, 24395 * 1024, "python3" },
proc! { 2, "1864615", "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 2, "1448150", "_zombie_1448150", 0.0, 0.0, 9383 * 1024, "python3"},
proc! { 3, "1864615", "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 3, "2233469", "charlie", 90.0, 23.0, 15771 * 1024, "python3" }
proc! { 0, 447153, "bob", 0.0, 0.0, 7669 * 1024, "python3.9" },
proc! { 0, 447160, "bob", 0.0, 0.0, 11057 * 1024, "python3.9" },
proc! { 0, 506826, "_zombie_506826", 0.0, 0.0, 11057 * 1024, "python3.9" },
proc! { 0, 1864615, "alice", 40.0, 0.0, 1635 * 1024, "python" },
proc! { 1, 1864615, "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 1, 2233095, "charlie", 84.0, 23.0, 24395 * 1024, "python3" },
proc! { 2, 1864615, "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 2, 1448150, "_zombie_1448150", 0.0, 0.0, 9383 * 1024, "python3"},
proc! { 3, 1864615, "alice", 0.0, 0.0, 535 * 1024, "python" },
proc! { 3, 2233469, "charlie", 90.0, 23.0, 15771 * 1024, "python3" }
]))
}

Expand All @@ -189,6 +193,6 @@ pub fn parsed_query_output() -> Vec<Process> {
#[test]
fn test_parse_query_output() {
assert!(parsed_query_output().into_iter().eq(vec![
proc! { !0, "3079002", "_zombie_3079002", 0.0, 0.0, 2350 * 1024, "_unknown_" }
proc! { !0, 3079002, "_zombie_3079002", 0.0, 0.0, 2350 * 1024, "_unknown_" }
]))
}
26 changes: 14 additions & 12 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@ use crate::util;

#[derive(PartialEq)]
pub struct Process {
pub pid: String,
pub pid: usize,
pub user: String,
pub cpu_pct: f64,
pub mem_pct: f64,
pub mem_size_kib: usize,
pub command: String,
}

pub fn get_process_information(timeout_seconds: u64) -> Vec<Process> {
if let Some(out) = command::safe_command(PS_COMMAND, timeout_seconds) {
pub fn get_process_information() -> Vec<Process> {
if let Some(out) = command::safe_command(PS_COMMAND, TIMEOUT_SECONDS) {
parse_ps_output(&out)
} else {
vec![]
}
}

const TIMEOUT_SECONDS: u64 = 2; // for `ps`

const PS_COMMAND: &str =
"ps -e --no-header -o pid,user:22,pcpu,pmem,size,comm | grep -v ' 0.0 0.0 '";

Expand All @@ -30,7 +32,7 @@ fn parse_ps_output(raw_text: &str) -> Vec<Process> {
.map(|line| {
let (start_indices, parts) = util::chunks(line);
Process {
pid: parts[0].to_string(),
pid: parts[0].parse::<usize>().unwrap(),
user: parts[1].to_string(),
cpu_pct: parts[2].parse::<f64>().unwrap(),
mem_pct: parts[3].parse::<f64>().unwrap(),
Expand Down Expand Up @@ -59,7 +61,7 @@ pub fn parsed_test_output() -> Vec<Process> {
fn test_parse_ps_output() {
macro_rules! proc(
{ $a:expr, $b:expr, $c:expr, $d:expr, $e: expr, $f:expr } => {
Process { pid: $a.to_string(),
Process { pid: $a,
user: $b.to_string(),
cpu_pct: $c,
mem_pct: $d,
Expand All @@ -69,12 +71,12 @@ fn test_parse_ps_output() {
});

assert!(parsed_test_output().into_iter().eq(vec![
proc! { "2022", "bob", 10.0, 20.0, 553348, "slack" },
proc! { "42178", "bob", 10.0, 15.0, 353348, "chromium" },
proc! { "42178", "bob", 10.0, 15.0, 5536, "chromium" },
proc! { "42189", "alice", 10.0, 5.0, 5528, "slack" },
proc! { "42191", "bob", 10.0, 5.0, 5552, "someapp" },
proc! { "42213", "alice", 10.0, 5.0, 348904, "some app" },
proc! { "42213", "alice", 10.0, 5.0, 135364, "some app" }
proc! { 2022, "bob", 10.0, 20.0, 553348, "slack" },
proc! { 42178, "bob", 10.0, 15.0, 353348, "chromium" },
proc! { 42178, "bob", 10.0, 15.0, 5536, "chromium" },
proc! { 42189, "alice", 10.0, 5.0, 5528, "slack" },
proc! { 42191, "bob", 10.0, 5.0, 5552, "someapp" },
proc! { 42213, "alice", 10.0, 5.0, 348904, "some app" },
proc! { 42213, "alice", 10.0, 5.0, 135364, "some app" }
]))
}
46 changes: 21 additions & 25 deletions src/ps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn add_job_info(

fn extract_ps_processes(
processes: &[process::Process],
) -> HashMap<(String, String, String), (f64, f64, usize)> {
) -> HashMap<(String, usize, String), (f64, f64, usize)> {
processes
.iter()
.map(
Expand All @@ -69,7 +69,7 @@ fn extract_ps_processes(
..
}| {
(
(user.clone(), pid.clone(), command.clone()),
(user.clone(), *pid, command.clone()),
(*cpu_pct, *mem_pct, *mem_size_kib),
)
},
Expand All @@ -94,18 +94,18 @@ fn test_extract_ps_processes() {
assert!(
processes
== map! {
("bob".to_string(), "2022".to_string(), "slack".to_string()) => (10.0, 20.0, 553348),
("bob".to_string(), "42178".to_string(), "chromium".to_string()) => (20.0, 30.0, 358884),
("alice".to_string(), "42189".to_string(), "slack".to_string()) => (10.0, 5.0, 5528),
("bob".to_string(), "42191".to_string(), "someapp".to_string()) => (10.0, 5.0, 5552),
("alice".to_string(), "42213".to_string(), "some app".to_string()) => (20.0, 10.0, 484268)
("bob".to_string(), 2022, "slack".to_string()) => (10.0, 20.0, 553348),
("bob".to_string(), 42178, "chromium".to_string()) => (20.0, 30.0, 358884),
("alice".to_string(), 42189, "slack".to_string()) => (10.0, 5.0, 5528),
("bob".to_string(), 42191, "someapp".to_string()) => (10.0, 5.0, 5552),
("alice".to_string(), 42213, "some app".to_string()) => (20.0, 10.0, 484268)
}
);
}

fn extract_nvidia_processes(
processes: &[nvidia::Process],
) -> HashMap<(String, String, String), (u32, f64, f64, usize)> {
) -> HashMap<(String, usize, String), (u32, f64, f64, usize)> {
processes
.iter()
.map(
Expand All @@ -119,7 +119,7 @@ fn extract_nvidia_processes(
command,
}| {
(
(user.clone(), pid.clone(), command.clone()),
(user.clone(), *pid, command.clone()),
(
if *device >= 0 { 1 << device } else { !0 },
*gpu_pct,
Expand Down Expand Up @@ -150,13 +150,13 @@ fn test_extract_nvidia_pmon_processes() {
assert!(
processes
== map! {
("bob".to_string(), "447153".to_string(), "python3.9".to_string()) => (0b1, 0.0, 0.0, 7669*1024),
("bob".to_string(), "447160".to_string(), "python3.9".to_string()) => (0b1, 0.0, 0.0, 11057*1024),
("_zombie_506826".to_string(), "506826".to_string(), "python3.9".to_string()) => (0b1, 0.0, 0.0, 11057*1024),
("alice".to_string(), "1864615".to_string(), "python".to_string()) => (0b1111, 40.0, 0.0, (1635+535+535+535)*1024),
("charlie".to_string(), "2233095".to_string(), "python3".to_string()) => (0b10, 84.0, 23.0, 24395*1024),
("_zombie_1448150".to_string(), "1448150".to_string(), "python3".to_string()) => (0b100, 0.0, 0.0, 9383*1024),
("charlie".to_string(), "2233469".to_string(), "python3".to_string()) => (0b1000, 90.0, 23.0, 15771*1024)
("bob".to_string(), 447153, "python3.9".to_string()) => (0b1, 0.0, 0.0, 7669*1024),
("bob".to_string(), 447160, "python3.9".to_string()) => (0b1, 0.0, 0.0, 11057*1024),
("_zombie_506826".to_string(), 506826, "python3.9".to_string()) => (0b1, 0.0, 0.0, 11057*1024),
("alice".to_string(), 1864615, "python".to_string()) => (0b1111, 40.0, 0.0, (1635+535+535+535)*1024),
("charlie".to_string(), 2233095, "python3".to_string()) => (0b10, 84.0, 23.0, 24395*1024),
("_zombie_1448150".to_string(), 1448150, "python3".to_string()) => (0b100, 0.0, 0.0, 9383*1024),
("charlie".to_string(), 2233469, "python3".to_string()) => (0b1000, 90.0, 23.0, 15771*1024)
}
);
}
Expand All @@ -169,7 +169,7 @@ fn test_extract_nvidia_query_processes() {
assert!(
processes
== map! {
("_zombie_3079002".to_string(), "3079002".to_string(), "_unknown_".to_string()) => (!0, 0.0, 0.0, 2350*1024)
("_zombie_3079002".to_string(), 3079002, "_unknown_".to_string()) => (!0, 0.0, 0.0, 2350*1024)
}
);
}
Expand All @@ -183,18 +183,14 @@ pub fn create_snapshot(
let hostname = hostname::get().unwrap().into_string().unwrap();
let num_cores = num_cpus::get();

// the pipe is here as a workaround for https://github.com/rust-lang/rust/issues/45572
// see also https://doc.rust-lang.org/std/process/index.html
let timeout_seconds = 2;

let mut processes_by_job_id: HashMap<(String, usize, String), JobInfo> = HashMap::new();
let mut user_by_pid: HashMap<String, String> = HashMap::new();
let mut user_by_pid: HashMap<usize, String> = HashMap::new();

let ps_output = process::get_process_information(timeout_seconds);
let ps_output = process::get_process_information();
for ((user, pid, command), (cpu_percentage, mem_percentage, mem_size)) in
extract_ps_processes(&ps_output)
{
user_by_pid.insert(pid.clone(), user.clone());
user_by_pid.insert(pid, user.clone());

if (cpu_percentage >= cpu_cutoff_percent) || (mem_percentage >= mem_cutoff_percent) {
add_job_info(
Expand All @@ -212,7 +208,7 @@ pub fn create_snapshot(
}
}

let nvidia_output = nvidia::get_nvidia_information(timeout_seconds, &user_by_pid);
let nvidia_output = nvidia::get_nvidia_information(&user_by_pid);
for ((user, pid, command), (gpu_mask, gpu_percentage, gpu_mem_percentage, gpu_mem_size)) in
extract_nvidia_processes(&nvidia_output)
{
Expand Down
4 changes: 2 additions & 2 deletions src/slurm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use crate::process;
pub struct SlurmJobManager {}

impl jobs::JobManager for SlurmJobManager {
fn job_id_from_pid(&mut self, pid: String, _processes: &[process::Process]) -> usize {
fn job_id_from_pid(&mut self, pid: usize, _processes: &[process::Process]) -> usize {
let slurm_job_id = get_slurm_job_id(pid).unwrap_or_default();
slurm_job_id.trim().parse::<usize>().unwrap_or_default()
}
}

fn get_slurm_job_id(pid: String) -> Option<String> {
fn get_slurm_job_id(pid: usize) -> Option<String> {
let path = format!("/proc/{}/cgroup", pid);

if !std::path::Path::new(&path).exists() {
Expand Down

0 comments on commit 5e41077

Please sign in to comment.