From fcb4bcc9be3149e3f698f2d231975a6ffea4f1ea Mon Sep 17 00:00:00 2001 From: Lars T Hansen Date: Fri, 10 Jan 2025 16:29:37 +0100 Subject: [PATCH] Fix #228 - Abstract the output format and clean up --- src/main.rs | 89 ++++++++++-- src/output.rs | 354 +++++++++++++++++++++++++++++++++++++++++++++++ src/ps.rs | 350 +++++++++++++++++++--------------------------- src/slurmjobs.rs | 34 +++-- src/sysinfo.rs | 95 ++++++------- 5 files changed, 640 insertions(+), 282 deletions(-) create mode 100644 src/output.rs diff --git a/src/main.rs b/src/main.rs index 5bdd379..2146d34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod interrupt; mod jobs; mod log; mod nvidia; +mod output; mod procfs; mod procfsapi; mod ps; @@ -17,6 +18,8 @@ mod time; mod users; mod util; +use std::io; + const TIMEOUT_SECONDS: u64 = 5; // For subprocesses const USAGE_ERROR: i32 = 2; // clap, Python, Go @@ -57,9 +60,15 @@ enum Commands { /// One output record per Sonar invocation will contain a load= field with an encoding of /// the per-cpu usage since boot. load: bool, + + /// Output JSON, not CSV + json: bool, }, /// Extract system information - Sysinfo {}, + Sysinfo { + /// Output CSV, not JSON + csv: bool, + }, /// Extract slurm job information Slurmjobs { /// Set the sacct start time to now-`window` and the end time to now, and dump records that @@ -70,6 +79,9 @@ enum Commands { /// From-to dates on the form yyyy-mm-dd,yyyy-mm-dd (with the comma); from is inclusive, /// to is exclusive. Precludes -window. span: Option, + + /// Output json, not CSV + json: bool, }, Version {}, } @@ -83,6 +95,7 @@ fn main() { log::init(); + let mut writer = io::stdout(); match &command_line() { Commands::PS { rollup, @@ -95,6 +108,7 @@ fn main() { exclude_commands, lockdir, load, + json, } => { let opts = ps::PsOptions { rollup: *rollup, @@ -115,30 +129,33 @@ fn main() { vec![] }, lockdir: lockdir.clone(), + json: *json, }; if *batchless { let mut jm = batchless::BatchlessJobManager::new(); - ps::create_snapshot(&mut jm, &opts, ×tamp); + ps::create_snapshot(&mut writer, &mut jm, &opts, ×tamp); } else { let mut jm = slurm::SlurmJobManager {}; - ps::create_snapshot(&mut jm, &opts, ×tamp); + ps::create_snapshot(&mut writer, &mut jm, &opts, ×tamp); } } - Commands::Sysinfo {} => { - sysinfo::show_system(×tamp); + Commands::Sysinfo { csv } => { + sysinfo::show_system(&mut writer, ×tamp, *csv); } - Commands::Slurmjobs { window, span } => { - slurmjobs::show_slurm_jobs(window, span); + Commands::Slurmjobs { window, span, json } => { + slurmjobs::show_slurm_jobs(&mut writer, window, span, *json); } Commands::Version {} => { - show_version(&mut std::io::stdout()); + show_version(&mut writer); } } + //let _ = writer.flush(); } // For the sake of simplicity: // - allow repeated options to overwrite earlier values // - all error reporting is via a generic "usage" message, without specificity as to what was wrong +// - both --json and --csv are accepted to all commands fn command_line() -> Commands { let args = std::env::args().collect::>(); @@ -158,6 +175,8 @@ fn command_line() -> Commands { let mut exclude_commands = None; let mut lockdir = None; let mut load = false; + let mut json = false; + let mut csv = false; while next < args.len() { let arg = args[next].as_ref(); next += 1; @@ -167,6 +186,10 @@ fn command_line() -> Commands { (next, rollup) = (new_next, true); } else if let Some(new_next) = bool_arg(arg, &args, next, "--load") { (next, load) = (new_next, true); + } else if let Some(new_next) = bool_arg(arg, &args, next, "--json") { + (next, json) = (new_next, true); + } else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") { + (next, csv) = (new_next, true); } else if let Some(new_next) = bool_arg(arg, &args, next, "--exclude-system-jobs") { @@ -210,6 +233,10 @@ fn command_line() -> Commands { eprintln!("--rollup and --batchless are incompatible"); std::process::exit(USAGE_ERROR); } + if json && csv { + eprintln!("--csv and --json are incompatible"); + std::process::exit(USAGE_ERROR); + } Commands::PS { batchless, @@ -222,12 +249,34 @@ fn command_line() -> Commands { exclude_commands, lockdir, load, + json, + } + } + "sysinfo" => { + let mut json = false; + let mut csv = false; + while next < args.len() { + let arg = args[next].as_ref(); + next += 1; + if let Some(new_next) = bool_arg(arg, &args, next, "--json") { + (next, json) = (new_next, true); + } else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") { + (next, csv) = (new_next, true); + } else { + usage(true); + } } + if json && csv { + eprintln!("--csv and --json are incompatible"); + std::process::exit(USAGE_ERROR); + } + Commands::Sysinfo { csv } } - "sysinfo" => Commands::Sysinfo {}, "slurm" => { let mut window = None; let mut span = None; + let mut json = false; + let mut csv = false; while next < args.len() { let arg = args[next].as_ref(); next += 1; @@ -237,6 +286,10 @@ fn command_line() -> Commands { (next, window) = (new_next, Some(value)); } else if let Some((new_next, value)) = string_arg(arg, &args, next, "--span") { (next, span) = (new_next, Some(value)); + } else if let Some(new_next) = bool_arg(arg, &args, next, "--json") { + (next, json) = (new_next, true); + } else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") { + (next, csv) = (new_next, true); } else { usage(true); } @@ -244,7 +297,11 @@ fn command_line() -> Commands { if window.is_some() && span.is_some() { usage(true); } - Commands::Slurmjobs { window, span } + if json && csv { + eprintln!("--csv and --json are incompatible"); + std::process::exit(USAGE_ERROR); + } + Commands::Slurmjobs { window, span, json } } "version" => Commands::Version {}, "help" => { @@ -316,9 +373,9 @@ fn usage(is_error: bool) -> ! { Usage: sonar Commands: - ps Take a snapshot of the currently running processes - sysinfo Extract system information - slurm Extract slurm job information for a [start,end) time interval + ps Print process and load information + sysinfo Print system information + slurm Print slurm job information for a [start,end) time interval help Print this message Options for `ps`: @@ -345,6 +402,10 @@ Options for `ps`: --lockdir directory Create a per-host lockfile in this directory and exit early if the file exists on startup [default: none] + --load + Print per-cpu and per-gpu load data + --json + Format output as JSON, not CSV Options for `slurm`: --window minutes @@ -353,6 +414,8 @@ Options for `slurm`: --span start,end Both `start` and `end` are on the form yyyy-mm-dd. Mostly useful for seeding a database with older data. Precludes --window + --json + Format output as JSON, not CSV ", ); let _ = out.flush(); diff --git a/src/output.rs b/src/output.rs new file mode 100644 index 0000000..9c98772 --- /dev/null +++ b/src/output.rs @@ -0,0 +1,354 @@ +// Define a nested data structure of arrays, objects, and scalar values that can subsequently be +// serialized, currently as CSV and JSON, following conventions that are backward compatible with +// the older ad-hoc Sonar formatting code. +// +// Adding eg a compact binary serialization form would be very simple. + +use crate::util; + +use std::io; + +pub enum Value { + A(Array), + O(Object), + S(String), + U(u64), + I(i64), + F(f64), + E(), // Empty array element only, never a field or toplevel value +} + +struct Field { + tag: String, + value: Value, +} + +pub struct Object { + fields: Vec, +} + +#[allow(dead_code)] +impl Object { + pub fn new() -> Object { + Object { fields: vec![] } + } + + pub fn push(&mut self, tag: &str, value: Value) { + self.fields.push(Field { + tag: tag.to_string(), + value, + }) + } + + pub fn prepend(&mut self, tag: &str, value: Value) { + self.fields.insert( + 0, + Field { + tag: tag.to_string(), + value, + }, + ) + } + + pub fn push_o(&mut self, tag: &str, o: Object) { + self.push(tag, Value::O(o)); + } + + pub fn push_a(&mut self, tag: &str, a: Array) { + self.push(tag, Value::A(a)); + } + + pub fn push_s(&mut self, tag: &str, s: String) { + self.push(tag, Value::S(s)); + } + + pub fn prepend_s(&mut self, tag: &str, s: String) { + self.prepend(tag, Value::S(s)); + } + + pub fn push_u(&mut self, tag: &str, u: u64) { + self.push(tag, Value::U(u)); + } + + pub fn push_i(&mut self, tag: &str, i: i64) { + self.push(tag, Value::I(i)); + } + + pub fn push_f(&mut self, tag: &str, f: f64) { + self.push(tag, Value::F(f)); + } +} + +pub struct Array { + elements: Vec, + nonempty_base45: bool, + sep: String, +} + +#[allow(dead_code)] +impl Array { + pub fn new() -> Array { + Array { + elements: vec![], + nonempty_base45: false, + sep: ",".to_string(), + } + } + + pub fn from_vec(elements: Vec) -> Array { + Array { + elements, + nonempty_base45: false, + sep: ",".to_string(), + } + } + + pub fn push(&mut self, value: Value) { + self.elements.push(value) + } + + pub fn len(&self) -> usize { + self.elements.len() + } + + pub fn push_o(&mut self, o: Object) { + self.push(Value::O(o)); + } + + pub fn push_s(&mut self, s: String) { + self.push(Value::S(s)); + } + + pub fn push_u(&mut self, u: u64) { + self.push(Value::U(u)); + } + + pub fn push_i(&mut self, i: i64) { + self.push(Value::I(i)); + } + + pub fn push_f(&mut self, f: f64) { + self.push(Value::F(f)); + } + + // This creates a constraint that: + // + // - there must be at least one element + // - all elements must be Value::U + // - the array is encoded as an offsetted little-endian base45 string (below). + // + // This is an efficient and CSV-friendly encoding of a typical array of cpu-second data. + pub fn set_encode_nonempty_base45(&mut self) { + self.nonempty_base45 = true; + } + + // Use sep as a CSV array separator instead of the default ",". + pub fn set_csv_separator(&mut self, sep: String) { + self.sep = sep; + } +} + +// JSON output follows the standard. + +pub fn write_json(writer: &mut dyn io::Write, v: &Value) { + write_json_int(writer, v); + let _ = writer.write(&[b'\n']); +} + +fn write_json_int(writer: &mut dyn io::Write, v: &Value) { + match v { + Value::A(a) => write_json_array(writer, a), + Value::O(o) => write_json_object(writer, o), + Value::S(s) => write_json_string(writer, s), + Value::U(u) => write_chars(writer, &format!("{u}")), + Value::I(i) => write_chars(writer, &format!("{i}")), + Value::F(f) => write_chars(writer, &format!("{f}")), + Value::E() => {} + } +} + +fn write_json_array(writer: &mut dyn io::Write, a: &Array) { + if a.nonempty_base45 { + let us = a + .elements + .iter() + .map(|x| { + if let Value::U(u) = x { + *u + } else { + panic!("Not a Value::U") + } + }) + .collect::>(); + write_chars(writer, &encode_cpu_secs_base45el(&us)); + return; + } + + let _ = writer.write(&[b'[']); + let mut first = true; + for elt in &a.elements { + if !first { + let _ = writer.write(&[b',']); + } + write_json_int(writer, elt); + first = false; + } + let _ = writer.write(&[b']']); +} + +fn write_json_object(writer: &mut dyn io::Write, o: &Object) { + let _ = writer.write(&[b'{']); + let mut first = true; + for fld in &o.fields { + if !first { + let _ = writer.write(&[b',']); + } + write_json_string(writer, &fld.tag); + let _ = writer.write(&[b':']); + write_json_int(writer, &fld.value); + first = false; + } + let _ = writer.write(&[b'}']); +} + +fn write_json_string(writer: &mut dyn io::Write, s: &String) { + let _ = writer.write(&[b'"']); + write_chars(writer, &util::json_quote(&s)); + let _ = writer.write(&[b'"']); +} + +fn write_chars(writer: &mut dyn io::Write, s: &str) { + let _ = writer.write(s.as_bytes()); +} + +// CSV: +// +// - an object is a comma-separated list of FIELDs +// - an array is an X-separated list of VALUEs (where X is comma by default but can be changed) +// - a TAG is an unquoted string +// - each FIELD is {TAG}={VALUE} +// - a VALUE is the string representation of the value +// - if the FIELD of an object or the VALUE of an array contains ',' or '"', then the FIELD or VALUE +// is prefixed and suffixed by '"' and any '"' in the original string is doubled. +// +// Note that the bare representation of a value of any kind is just the string representation of the +// value itself (unquoted), it's the inclusion in an object or array that forces the quoting. +// +// The format allows nesting but the number of " grows exponentially with the nesting level if array +// separators are not managed carefully. Also, custom array element separators are not handled +// specially by the quoting mechanism, effectively requiring each nesting level to have its own +// custom quoting mechanism and to avoid quoting chars used at outer levels. For data nested more +// than one level, and especially when those data include arbitrary strings, use JSON. + +pub fn write_csv(writer: &mut dyn io::Write, v: &Value) { + write_chars(writer, &format_csv_value(v)); + let _ = writer.write(&[b'\n']); +} + +pub fn format_csv_value(v: &Value) -> String { + match v { + Value::A(a) => format_csv_array(a), + Value::O(o) => format_csv_object(o), + Value::S(s) => s.clone(), + Value::U(u) => format!("{u}"), + Value::I(i) => format!("{i}"), + Value::F(f) => format!("{f}"), + Value::E() => "".to_string(), + } +} + +fn format_csv_object(o: &Object) -> String { + let mut first = true; + let mut s = "".to_string(); + for fld in &o.fields { + if !first { + s += "," + } + let mut tmp = fld.tag.clone(); + tmp += "="; + tmp += &format_csv_value(&fld.value); + s += &util::csv_quote(&tmp); + first = false; + } + return s; +} + +fn format_csv_array(a: &Array) -> String { + if a.nonempty_base45 { + let us = a + .elements + .iter() + .map(|x| { + if let Value::U(u) = x { + *u + } else { + panic!("Not a Value::U") + } + }) + .collect::>(); + return encode_cpu_secs_base45el(&us); + } + let mut first = true; + let mut s = "".to_string(); + for elt in &a.elements { + if !first { + s += &a.sep; + } + s += &util::csv_quote(&format_csv_value(elt)); + first = false; + } + return s; +} + +// Encode a nonempty u64 array compactly. +// +// The output must be ASCII text (32 <= c < 128), ideally without ',' or '"' or '\' or ' ' to not +// make it difficult for the various output formats we use. Also avoid DEL, because it is a weird +// control character. +// +// We have many encodings to choose from, see https://github.com/NordicHPC/sonar/issues/178. +// +// The values to be represented are always cpu seconds of active time since boot, one item per cpu, +// and it is assumed that they are roughly in the vicinity of each other (the largest is rarely more +// than 4x the smallest, say). The assumption does not affect correctness, only compactness. +// +// The encoding first finds the minimum input value and subtracts that from all entries. The +// minimum value, and all the entries, are then emitted as unsigned little-endian base-45 with the +// initial digit chosen from a different character set to indicate that it is initial. + +fn encode_cpu_secs_base45el(cpu_secs: &[u64]) -> String { + let base = *cpu_secs + .iter() + .reduce(std::cmp::min) + .expect("Must have a non-empty array"); + let mut s = encode_u64_base45el(base); + for x in cpu_secs { + s += encode_u64_base45el(*x - base).as_str(); + } + s +} + +// The only character unused by the encoding, other than the ones we're not allowed to use, is '='. +const BASE: u64 = 45; +const INITIAL: &[u8] = "(){}[]<>+-abcdefghijklmnopqrstuvwxyz!@#$%^&*_".as_bytes(); +const SUBSEQUENT: &[u8] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ~|';:.?/`".as_bytes(); + +fn encode_u64_base45el(mut x: u64) -> String { + let mut s = String::from(INITIAL[(x % BASE) as usize] as char); + x /= BASE; + while x > 0 { + s.push(SUBSEQUENT[(x % BASE) as usize] as char); + x /= BASE; + } + s +} + +#[test] +pub fn test_encoding() { + assert!(INITIAL.len() == BASE as usize); + assert!(SUBSEQUENT.len() == BASE as usize); + // This should be *1, *0, *29, *43, 1, *11 with * denoting an INITIAL char. + let v = vec![1, 30, 89, 12]; + println!("{}", encode_cpu_secs_base45el(&v)); + assert!(encode_cpu_secs_base45el(&v) == ")(t*1b"); +} diff --git a/src/ps.rs b/src/ps.rs index 800bc4c..a328822 100644 --- a/src/ps.rs +++ b/src/ps.rs @@ -6,12 +6,13 @@ use crate::hostname; use crate::interrupt; use crate::jobs; use crate::log; +use crate::output; use crate::procfs; use crate::procfsapi; -use crate::util::{csv_quote, three_places}; +use crate::util::three_places; use std::collections::{HashMap, HashSet}; -use std::io::{self, Result, Write}; +use std::io::{self, Write}; use std::path::PathBuf; // The GpuSet has three states: @@ -175,9 +176,15 @@ pub struct PsOptions<'a> { pub exclude_commands: Vec<&'a str>, pub lockdir: Option, pub load: bool, + pub json: bool, } -pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timestamp: &str) { +pub fn create_snapshot( + writer: &mut dyn io::Write, + jobs: &mut dyn jobs::JobManager, + opts: &PsOptions, + timestamp: &str, +) { // If a lock file was requested, create one before the operation, exit early if it already // exists, and if we performed the operation, remove the file afterwards. Otherwise, just // perform the operation. @@ -234,7 +241,7 @@ pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timest } if !failed && !skip { - do_create_snapshot(jobs, opts, timestamp); + do_create_snapshot(writer, jobs, opts, timestamp); // Testing code: If we got the lockfile and produced a report, wait 10s after producing // it while holding onto the lockfile. It is then possible to run sonar in that window @@ -262,11 +269,16 @@ pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timest log::error("Unable to properly manage or delete lockfile"); } } else { - do_create_snapshot(jobs, opts, timestamp); + do_create_snapshot(writer, jobs, opts, timestamp); } } -fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timestamp: &str) { +fn do_create_snapshot( + writer: &mut dyn io::Write, + jobs: &mut dyn jobs::JobManager, + opts: &PsOptions, + timestamp: &str, +) { let no_gpus = empty_gpuset(); let mut proc_by_pid = ProcTable::new(); @@ -340,7 +352,7 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta let mut gpu_status = GpuStatus::Ok; let gpu_utilization: Vec; - let mut gpu_info: String = "".to_string(); + let mut gpu_info: Option = None; match gpu::probe() { None => {} Some(mut gpu) => { @@ -349,18 +361,20 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta gpu_status = GpuStatus::UnknownFailure; } Ok(ref cards) => { - let mut s = "".to_string(); + let mut s = output::Object::new(); s = add_key(s, "fan%", cards, |c: &gpu::CardState| { nonzero(c.fan_speed_pct as i64) }); s = add_key(s, "mode", cards, |c: &gpu::CardState| { if c.compute_mode == "Default" { - "".to_string() + output::Value::E() } else { - c.compute_mode.clone() + output::Value::S(c.compute_mode.clone()) } }); - s = add_key(s, "perf", cards, |c: &gpu::CardState| c.perf_state.clone()); + s = add_key(s, "perf", cards, |c: &gpu::CardState| { + output::Value::S(c.perf_state.clone()) + }); // Reserved memory is really not interesting, it's possible it would have been // interesting as part of the card configuration. //s = add_key(s, "mreskib", cards, |c: &gpu::CardState| nonzero(c.mem_reserved_kib)); @@ -388,7 +402,7 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta s = add_key(s, "memz", cards, |c: &gpu::CardState| { nonzero(c.mem_clock_mhz.into()) }); - gpu_info = s; + gpu_info = Some(s); } } match gpu.get_process_utilization(&user_by_pid) { @@ -449,8 +463,6 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta // Once we start printing we'll print everything and not check the interrupted flag any more. - let mut writer = io::stdout(); - let hostname = hostname::get(); const VERSION: &str = env!("CARGO_PKG_VERSION"); let print_params = PrintParameters { @@ -531,106 +543,118 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta .filter(|proc_info| filter_proc(proc_info, &print_params)) .collect::>(); - let mut did_print = false; + // Here JSON and CSV will diverge a little. The trick is this: + // + // - first generate all output lines w/o version, timestamp, hostname, per_cpu_secs, and gpu_info + // - then: + // - for json, create an object that has those common fields and an array of per-process data, + // this object is also the heartbeat / synthetic record + // - for csv: + // - if there are no records and must_print is true, synthesize one + // - if we want them, insert per_cpu_secs and gpu_info fields in the first record (there will be one) + // - attach version, timestamp and hostname to all records (ideally we would prepend...) + // + // Then print. + + let mut records: Vec = vec![]; for c in candidates { - match print_record( - &mut writer, - &print_params, - &c, - if !did_print { - Some(&per_cpu_secs) + records.push(generate_candidate(&c)); + } + + if !opts.json { + let have_load_data = { + if print_params.opts.load { + !per_cpu_secs.is_empty() || gpu_info.is_some() } else { - None - }, - if !did_print { Some(&gpu_info) } else { None }, - ) { - Ok(did_print_one) => did_print = did_print_one || did_print, - Err(_) => { - // Discard the error: there's nothing very sensible we can do at this point if the - // write failed, and it will fail if we cut off a pipe, for example, see #132. I - // guess one can argue whether we should try the next record, but it seems sensible - // to just bail out and hope for the best. - break; + false } - } - } + }; - if !did_print && must_print { - // Print a synthetic record - let synth = ProcInfo { - user: "_sonar_", - _uid: 0, - command: "_heartbeat_", - pid: 0, - ppid: 0, - rolledup: 0, - is_system_job: true, - has_children: false, - job_id: 0, - cpu_percentage: 0.0, - cputime_sec: 0, - mem_percentage: 0.0, - mem_size_kib: 0, - rssanon_kib: 0, - gpu_cards: empty_gpuset(), - gpu_percentage: 0.0, - gpu_mem_percentage: 0.0, - gpu_mem_size_kib: 0, - gpu_status: GpuStatus::Ok, + if (must_print || have_load_data) && records.len() == 0 { + let mut fields = output::Object::new(); + fields.push_s("user", "_sonar_".to_string()); + fields.push_s("cmd", "_heartbeat_".to_string()); + records.push(fields); }; - // Discard the error, see above. - let _ = print_record( - &mut writer, - &print_params, - &synth, - if !did_print { - Some(&per_cpu_secs) - } else { - None - }, - if !did_print { Some(&gpu_info) } else { None }, - ); + + if print_params.opts.load { + if !per_cpu_secs.is_empty() { + let mut a = output::Array::from_vec( + per_cpu_secs + .iter() + .map(|x| output::Value::U(*x)) + .collect::>(), + ); + a.set_encode_nonempty_base45(); + records[0].push_a("load", a); + } + if let Some(info) = gpu_info { + records[0].push_o("gpuinfo", info); + } + } + + // Historically, these three fields were always first, and we have test cases that depend on + // "v=" being the very first field. + for r in &mut records { + r.prepend_s("host", print_params.hostname.to_string()); + r.prepend_s("time", print_params.timestamp.to_string()); + r.prepend_s("v", print_params.version.to_string()); + } + + for v in records { + output::write_csv(writer, &output::Value::O(v)); + } + } else { + let mut datum = output::Object::new(); + datum.push_s("host", print_params.hostname.to_string()); + datum.push_s("time", print_params.timestamp.to_string()); + datum.push_s("v", print_params.version.to_string()); + if print_params.opts.load { + if !per_cpu_secs.is_empty() { + let a = output::Array::from_vec( + per_cpu_secs + .iter() + .map(|x| output::Value::U(*x)) + .collect::>(), + ); + datum.push_a("load", a); + } + if let Some(info) = gpu_info { + datum.push_o("gpuinfo", info); + } + } + let mut samples = output::Array::new(); + for o in records { + samples.push_o(o); + } + datum.push_a("samples", samples); + output::write_json(writer, &output::Value::O(datum)); } // Discard the error code, see above. let _ = writer.flush(); } -fn add_key( - mut s: String, +fn add_key<'a>( + mut s: output::Object, key: &str, cards: &[gpu::CardState], - extract: fn(&gpu::CardState) -> String, -) -> String { - let mut vs = "".to_string(); - let mut any = false; - let mut first = true; + extract: fn(&gpu::CardState) -> output::Value, +) -> output::Object { + let mut vs = output::Array::new(); + vs.set_csv_separator("|".to_string()); for c in cards { - let v = extract(c); - if !first { - vs += "|"; - } - if !v.is_empty() { - any = true; - vs = vs + &v; - } - first = false; - } - if any { - if !s.is_empty() { - s += ","; - } - s + key + "=" + &vs - } else { - s + vs.push(extract(c)); } + s.push(key, output::Value::A(vs)); + s } -fn nonzero(x: i64) -> String { +fn nonzero(x: i64) -> output::Value { if x == 0 { - "".to_string() + output::Value::E() } else { - format!("{:?}", x) + output::Value::I(x) } } @@ -700,160 +724,70 @@ struct PrintParameters<'a> { opts: &'a PsOptions<'a>, } -fn print_record( - writer: &mut dyn io::Write, - params: &PrintParameters, - proc_info: &ProcInfo, - per_cpu_secs: Option<&[u64]>, - gpu_info: Option<&str>, -) -> Result { - // Mandatory fields. - - let mut fields = vec![ - format!("v={}", params.version), - format!("time={}", params.timestamp), - format!("host={}", params.hostname), - format!("user={}", proc_info.user), - format!("cmd={}", proc_info.command), - ]; +fn generate_candidate(proc_info: &ProcInfo) -> output::Object { + let mut fields = output::Object::new(); + + fields.push_s("user", proc_info.user.to_string()); + fields.push_s("cmd", proc_info.command.to_string()); // Only print optional fields whose values are not their defaults. The defaults are defined in // README.md. The values there must agree with those used by Jobanalyzer's parser. if proc_info.job_id != 0 { - fields.push(format!("job={}", proc_info.job_id)); + fields.push_u("job", proc_info.job_id as u64); } if proc_info.rolledup == 0 && proc_info.pid != 0 { // pid must be 0 for rolledup > 0 as there is no guarantee that there is any fixed // representative pid for a rolled-up set of processes: the set can change from run to run, // and sonar has no history. - fields.push(format!("pid={}", proc_info.pid)); + fields.push_u("pid", proc_info.pid as u64); } if proc_info.ppid != 0 { - fields.push(format!("ppid={}", proc_info.ppid)); + fields.push_u("ppid", proc_info.ppid as u64); } if proc_info.cpu_percentage != 0.0 { - fields.push(format!("cpu%={}", three_places(proc_info.cpu_percentage))); + fields.push_f("cpu%", three_places(proc_info.cpu_percentage)); } if proc_info.mem_size_kib != 0 { - fields.push(format!("cpukib={}", proc_info.mem_size_kib)); + fields.push_u("cpukib", proc_info.mem_size_kib as u64); } if proc_info.rssanon_kib != 0 { - fields.push(format!("rssanonkib={}", proc_info.rssanon_kib)); + fields.push_u("rssanonkib", proc_info.rssanon_kib as u64); } if let Some(ref cards) = proc_info.gpu_cards { if cards.is_empty() { // Nothing } else { - fields.push(format!( - "gpus={}", + fields.push_s( + "gpus", cards .iter() .map(|&num| num.to_string()) .collect::>() - .join(",") - )) + .join(","), + ); } } else { - fields.push("gpus=unknown".to_string()); + fields.push_s("gpus", "unknown".to_string()); } if proc_info.gpu_percentage != 0.0 { - fields.push(format!("gpu%={}", three_places(proc_info.gpu_percentage))); + fields.push_f("gpu%", three_places(proc_info.gpu_percentage)); } if proc_info.gpu_mem_percentage != 0.0 { - fields.push(format!( - "gpumem%={}", - three_places(proc_info.gpu_mem_percentage) - )); + fields.push_f("gpumem%", three_places(proc_info.gpu_mem_percentage)); } if proc_info.gpu_mem_size_kib != 0 { - fields.push(format!("gpukib={}", proc_info.gpu_mem_size_kib)); + fields.push_u("gpukib", proc_info.gpu_mem_size_kib as u64); } if proc_info.cputime_sec != 0 { - fields.push(format!("cputime_sec={}", proc_info.cputime_sec)); + fields.push_u("cputime_sec", proc_info.cputime_sec as u64); } if proc_info.gpu_status != GpuStatus::Ok { - fields.push(format!("gpufail={}", proc_info.gpu_status as i32)); + fields.push_u("gpufail", proc_info.gpu_status as u64); } if proc_info.rolledup > 0 { - fields.push(format!("rolledup={}", proc_info.rolledup)); - } - if params.opts.load { - if let Some(cpu_secs) = per_cpu_secs { - if !cpu_secs.is_empty() { - fields.push(format!("load={}", encode_cpu_secs_base45el(cpu_secs))); - } - } - if let Some(gpu_info) = gpu_info { - if !gpu_info.is_empty() { - fields.push(format!("gpuinfo={gpu_info}")); - } - } - } - - let mut s = "".to_string(); - for f in fields { - if !s.is_empty() { - s += "," - } - s += &csv_quote(&f); - } - s += "\n"; - - let _ = writer.write(s.as_bytes())?; - - Ok(true) -} - -// Encode a nonempty u64 array compactly. -// -// The output must be ASCII text (32 <= c < 128), ideally without ',' or '"' or '\' or ' ' to not -// make it difficult for the various output formats we use. Also avoid DEL, because it is a weird -// control character. -// -// We have many encodings to choose from, see https://github.com/NordicHPC/sonar/issues/178. -// -// The values to be represented are always cpu seconds of active time since boot, one item per cpu, -// and it is assumed that they are roughly in the vicinity of each other (the largest is rarely more -// than 4x the smallest, say). The assumption does not affect correctness, only compactness. -// -// The encoding first finds the minimum input value and subtracts that from all entries. The -// minimum value, and all the entries, are then emitted as unsigned little-endian base-45 with the -// initial digit chosen from a different character set to indicate that it is initial. - -fn encode_cpu_secs_base45el(cpu_secs: &[u64]) -> String { - let base = *cpu_secs - .iter() - .reduce(std::cmp::min) - .expect("Must have a non-empty array"); - let mut s = encode_u64_base45el(base); - for x in cpu_secs { - s += encode_u64_base45el(*x - base).as_str(); + fields.push_u("rolledup", proc_info.rolledup as u64); } - s -} - -// The only character unused by the encoding, other than the ones we're not allowed to use, is '='. -const BASE: u64 = 45; -const INITIAL: &[u8] = "(){}[]<>+-abcdefghijklmnopqrstuvwxyz!@#$%^&*_".as_bytes(); -const SUBSEQUENT: &[u8] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ~|';:.?/`".as_bytes(); - -fn encode_u64_base45el(mut x: u64) -> String { - let mut s = String::from(INITIAL[(x % BASE) as usize] as char); - x /= BASE; - while x > 0 { - s.push(SUBSEQUENT[(x % BASE) as usize] as char); - x /= BASE; - } - s -} -#[test] -pub fn test_encoding() { - assert!(INITIAL.len() == BASE as usize); - assert!(SUBSEQUENT.len() == BASE as usize); - // This should be *1, *0, *29, *43, 1, *11 with * denoting an INITIAL char. - let v = vec![1, 30, 89, 12]; - println!("{}", encode_cpu_secs_base45el(&v)); - assert!(encode_cpu_secs_base45el(&v) == ")(t*1b"); + fields } diff --git a/src/slurmjobs.rs b/src/slurmjobs.rs index 8200a48..16f7127 100644 --- a/src/slurmjobs.rs +++ b/src/slurmjobs.rs @@ -2,8 +2,8 @@ use crate::command; use crate::log; +use crate::output; use crate::time; -use crate::util; #[cfg(test)] use std::cmp::min; @@ -19,7 +19,12 @@ const TIMEOUT_S: u64 = 180; // Same output format as sacctd, which uses this version number. const VERSION: &str = "0.1.0"; -pub fn show_slurm_jobs(window: &Option, span: &Option) { +pub fn show_slurm_jobs( + writer: &mut dyn io::Write, + window: &Option, + span: &Option, + json: bool, +) { let (job_states, field_names) = parameters(); // Parse the options to compute the time range to pass to sacct. @@ -59,9 +64,8 @@ pub fn show_slurm_jobs(window: &Option, span: &Option) { log::error(&format!("sacct failed: {:?}", e)); } Ok(sacct_output) => { - let mut writer = io::stdout(); let local = time::now_local(); - format_jobs(&mut writer, &sacct_output, &field_names, &local); + format_jobs(writer, &sacct_output, &field_names, &local, json); } } } @@ -139,6 +143,7 @@ fn format_jobs( sacct_output: &str, field_names: &[&str], local: &libc::tm, + json: bool, ) { // Fields that are dates that may be reinterpreted before transmission. let date_fields = HashSet::from(["Start", "End", "Submit"]); @@ -160,7 +165,8 @@ fn format_jobs( field_store[field_names.len() - 1] = &jobname; let fields = &field_store[..field_names.len()]; - let mut output_line = "v=".to_string() + VERSION; + let mut output_line = output::Object::new(); + output_line.push_s("v", VERSION.to_string()); for (i, name) in field_names.iter().enumerate() { let mut val = fields[i].to_string(); let is_zero = val.is_empty() @@ -179,12 +185,14 @@ fn format_jobs( val = time::format_iso8601(&t).to_string() } } - output_line += ","; - output_line += &util::csv_quote(&(name.to_string() + "=" + &val)); + output_line.push_s(name, val); } } - output_line += "\n"; - let _ = writer.write(output_line.as_bytes()); + if json { + output::write_json(writer, &output::Value::O(output_line)); + } else { + output::write_csv(writer, &output::Value::O(output_line)); + } } } @@ -204,7 +212,7 @@ pub fn test_format_jobs() { // The output below depends on us being in UTC+01:00 and not in dst so mock that. local.tm_gmtoff = 3600; local.tm_isdst = 0; - format_jobs(&mut output, sacct_output, &field_names, &local); + format_jobs(&mut output, sacct_output, &field_names, &local, false); if output != expected.as_bytes() { let xs = &output; let ys = expected.as_bytes(); @@ -224,6 +232,12 @@ pub fn test_format_jobs() { break; } } + println!("{} {}", xs.len(), ys.len()); + if xs.len() > ys.len() { + println!("{}", String::from_utf8_lossy(&xs[ys.len()..])); + } else { + println!("{}", String::from_utf8_lossy(&ys[xs.len()..])); + } assert!(false); } } diff --git a/src/sysinfo.rs b/src/sysinfo.rs index b244587..e2ef908 100644 --- a/src/sysinfo.rs +++ b/src/sysinfo.rs @@ -1,16 +1,15 @@ use crate::gpu; use crate::hostname; use crate::log; +use crate::output; use crate::procfs; use crate::procfsapi; -use crate::util; use std::io; -pub fn show_system(timestamp: &str) { +pub fn show_system(writer: &mut dyn io::Write, timestamp: &str, csv: bool) { let fs = procfsapi::RealFS::new(); - let mut writer = io::stdout(); - match do_show_system(&mut writer, &fs, timestamp) { + match do_show_system(writer, &fs, timestamp, csv) { Ok(_) => {} Err(e) => { log::error(&format!("sysinfo failed: {e}")); @@ -24,6 +23,7 @@ fn do_show_system( writer: &mut dyn io::Write, fs: &dyn procfsapi::ProcfsAPI, timestamp: &str, + csv: bool, ) -> Result<(), String> { let (model, sockets, cores_per_socket, threads_per_core) = procfs::get_cpu_info(fs)?; let mem_by = procfs::get_memtotal_kib(fs)? * 1024; @@ -41,7 +41,9 @@ fn do_show_system( } else { "" }; - let (gpu_desc, gpu_cards, gpumem_gb, gpu_info) = if !cards.is_empty() { + + let mut gpu_info = output::Array::new(); + let (gpu_desc, gpu_cards, gpumem_gb) = if !cards.is_empty() { // Sort cards cards.sort_by(|a: &gpu::Card, b: &gpu::Card| { if a.model == b.model { @@ -78,11 +80,7 @@ fn do_show_system( } // Compute the info blobs - let mut gpu_info = "".to_string(); for c in &cards { - if !gpu_info.is_empty() { - gpu_info += "," - } let gpu::Card { bus_addr, index, @@ -98,56 +96,51 @@ fn do_show_system( max_ce_clock_mhz, max_mem_clock_mhz, } = c; - let manufacturer = util::json_quote(&manufacturer); - let bus_addr = util::json_quote(bus_addr); - let model = util::json_quote(model); - let arch = util::json_quote(arch); - let driver = util::json_quote(driver); - let firmware = util::json_quote(firmware); - gpu_info += &format!( - r###" - {{"bus_addr":"{bus_addr}", "index":{index}, "uuid":"{uuid}", - "manufacturer":"{manufacturer}", "model":"{model}", "arch":"{arch}", "driver":"{driver}", "firmware":"{firmware}", - "mem_size_kib":{mem_size_kib}, - "power_limit_watt":{power_limit_watt}, "max_power_limit_watt":{max_power_limit_watt}, "min_power_limit_watt":{min_power_limit_watt}, - "max_ce_clock_mhz":{max_ce_clock_mhz}, "max_mem_clock_mhz":{max_mem_clock_mhz}}}"### - ); + let mut gpu = output::Object::new(); + gpu.push_s("bus_addr", bus_addr.to_string()); + gpu.push_i("index", *index as i64); + gpu.push_s("uuid", uuid.to_string()); + gpu.push_s("manufacturer", manufacturer.clone()); + gpu.push_s("model", model.to_string()); + gpu.push_s("arch", arch.to_string()); + gpu.push_s("driver", driver.to_string()); + gpu.push_s("firmware", firmware.to_string()); + gpu.push_i("mem_size_kib", *mem_size_kib); + gpu.push_i("power_limit_watt", *power_limit_watt as i64); + gpu.push_i("max_power_limit_watt", *max_power_limit_watt as i64); + gpu.push_i("min_power_limit_watt", *min_power_limit_watt as i64); + gpu.push_i("max_ce_clock_mhz", *max_ce_clock_mhz as i64); + gpu.push_i("max_mem_clock_mhz", *max_mem_clock_mhz as i64); + gpu_info.push_o(gpu); } - (gpu_desc, gpu_cards, total_mem_by / GIB as i64, gpu_info) + (gpu_desc, gpu_cards, total_mem_by / GIB as i64) } else { - ("".to_string(), 0, 0, "".to_string()) + ("".to_string(), 0, 0) }; - let timestamp = util::json_quote(timestamp); - let hostname = util::json_quote(&hostname); - let description = util::json_quote(&format!( - "{sockets}x{cores_per_socket}{ht} {model}, {mem_gib} GiB{gpu_desc}" - )); let cpu_cores = sockets * cores_per_socket * threads_per_core; - // Note the field names here are used by decoders that are developed separately, and they should - // be considered set in stone. - - let version = util::json_quote(env!("CARGO_PKG_VERSION")); - let s = format!( - r#"{{ - "version": "{version}", - "timestamp": "{timestamp}", - "hostname": "{hostname}", - "description": "{description}", - "cpu_cores": {cpu_cores}, - "mem_gb": {mem_gib}, - "gpu_cards": {gpu_cards}, - "gpumem_gb": {gpumem_gb}, - "gpu_info": [{gpu_info}] -}} -"# + let mut sysinfo = output::Object::new(); + sysinfo.push_s("version", env!("CARGO_PKG_VERSION").to_string()); + sysinfo.push_s("timestamp", timestamp.to_string()); + sysinfo.push_s("hostname", hostname); + sysinfo.push_s( + "description", + format!("{sockets}x{cores_per_socket}{ht} {model}, {mem_gib} GiB{gpu_desc}"), ); + sysinfo.push_i("cpu_cores", cpu_cores as i64); + sysinfo.push_i("mem_gb", mem_gib); + sysinfo.push_i("gpu_cards", gpu_cards as i64); + sysinfo.push_i("gpumem_gb", gpumem_gb); + if gpu_info.len() > 0 { + sysinfo.push_a("gpu_info", gpu_info); + } - // Ignore I/O errors. - - let _ = writer.write(s.as_bytes()); - let _ = writer.flush(); + if csv { + output::write_csv(writer, &output::Value::O(sysinfo)); + } else { + output::write_json(writer, &output::Value::O(sysinfo)); + } Ok(()) }