stats: smarter cardinality computation
Since `stats` knows the sort order of a column, it now uses that information to skip sorting before computing stats that require sorting, e.g. cardinality, modes/antimodes, quartiles, median, and MAD (median absolute deviation).

Also tweaked the qsv-stats cardinality computation to be more memory-efficient and faster, so qsv should be able to handle even larger datasets when computing advanced stats.
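
To illustrate the idea, here is a minimal sketch of sort-aware cardinality (an illustration only, not the actual qsv-stats code; the function name and signature are invented for this note). Cardinality only needs sorting so that equal values become adjacent, so a column already known to be sorted skips the O(n log n) sort and is counted in a single O(n) pass:

// hypothetical sketch: cardinality = number of runs of equal values
// in sorted order
fn cardinality<T: Ord>(values: &mut [T], already_sorted: bool) -> u64 {
    if values.is_empty() {
        return 0;
    }
    if !already_sorted {
        // sort in place; sort_unstable avoids the extra allocation
        // a stable sort would need
        values.sort_unstable();
    }
    // count the boundaries between runs of equal adjacent values
    1 + values.windows(2).filter(|w| w[0] != w[1]).count() as u64
}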
jqnatividad committed Sep 8, 2024
1 parent fd1a0e5 commit 4e63fec
Showing 3 changed files with 63 additions and 39 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -174,7 +174,7 @@ pyo3 = { version = "0.22", features = [
 ], optional = true }
 qsv-dateparser = "0.12"
 qsv_docopt = "1.7"
-qsv-stats = "0.20"
+qsv-stats = "0.21"
 qsv_currency = { version = "0.6", optional = true }
 qsv-sniffer = { version = "0.10", default-features = false, features = [
     "runtime-dispatch-simd",
96 changes: 60 additions & 36 deletions src/cmd/stats.rs
@@ -322,6 +322,7 @@ struct StatsArgs {
 }
 
 impl StatsArgs {
+    /// this is for deserializing the stats.csv.jsonl file
     fn from_owned_value(value: &OwnedValue) -> Result<Self, Box<dyn std::error::Error>> {
         Ok(Self {
             arg_input: value["arg_input"].as_str().unwrap_or_default().to_string(),
@@ -470,8 +471,10 @@ pub static STATSDATA_TYPES_ARRAY: [JsonTypes; MAX_STAT_COLUMNS] = [
 static INFER_DATE_FLAGS: OnceLock<Vec<bool>> = OnceLock::new();
 static RECORD_COUNT: OnceLock<u64> = OnceLock::new();
 
-pub const OVERFLOW_STRING: &str = "*OVERFLOW*";
-pub const UNDERFLOW_STRING: &str = "*UNDERFLOW*";
+// standard overflow and underflow strings
+// for sum, sum_length and avg_length
+const OVERFLOW_STRING: &str = "*OVERFLOW*";
+const UNDERFLOW_STRING: &str = "*UNDERFLOW*";
 
 // number of milliseconds per day
 const MS_IN_DAY: f64 = 86_400_000.0;
@@ -566,6 +569,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     };
     let stats_csv_tempfile_fname = format!(
         "{stem}.{prime_ext}{snappy_ext}",
+        // safety: we know the tempfile is a valid NamedTempFile, so we can use unwrap
         stem = stats_csv_tempfile.path().to_str().unwrap(),
         prime_ext = output_extension,
         snappy_ext = if snappy { ".sz" } else { "" }
@@ -608,6 +612,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     let write_stats_jsonl = args.flag_stats_jsonl;
 
     if let Some(path) = rconfig.path.clone() {
+        // safety: we know the path is a valid PathBuf, so we can use unwrap
         let path_file_stem = path.file_stem().unwrap().to_str().unwrap();
         let stats_file = stats_path(&path, false)?;
         // check if <FILESTEM>.stats.csv file already exists.
@@ -733,8 +738,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     }
 
     // we need to count the number of records in the file to calculate sparsity
+    // safety: we know util::count_rows() will not return an Err, so we can use unwrap
     let record_count = RECORD_COUNT.get_or_init(|| util::count_rows(&rconfig).unwrap());
-    log::info!("scanning {record_count} records...");
+    // log::info!("scanning {record_count} records...");
 
     let (headers, stats) = match rconfig.indexed()? {
         None => args.sequential_stats(&args.flag_dates_whitelist),
@@ -775,6 +781,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         {
             // if the stats run took longer than the cache threshold and the threshold > 0,
             // cache the stats so we don't have to recompute it next time
+            // safety: we know the path is a valid PathBuf, so we can use unwrap
             current_stats_args.canonical_input_path =
                 path.canonicalize()?.to_str().unwrap().to_string();
             current_stats_args.record_count = *record_count;
@@ -799,6 +806,7 @@
         stats_csv_tempfile_fname
     } else {
         // we didn't compute the stats, re-use the existing stats file
+        // safety: we know the path is a valid PathBuf, so we can use unwrap
         stats_path(rconfig.path.as_ref().unwrap(), false)?
             .to_str()
             .unwrap()
@@ -807,19 +815,21 @@
 
     if rconfig.is_stdin() {
         // if we read from stdin, copy the temp stats file to "stdin.stats.csv"
+        // safety: we know the path is a valid PathBuf, so we can use unwrap
         let mut stats_pathbuf = stats_path(rconfig.path.as_ref().unwrap(), true)?;
         fs::copy(currstats_filename.clone(), stats_pathbuf.clone())?;
 
         // save the stats args to "stdin.stats.csv.json"
         stats_pathbuf.set_extension("csv.json");
         std::fs::write(
             stats_pathbuf,
-            serde_json::to_string_pretty(&current_stats_args).unwrap(),
+            serde_json::to_string_pretty(&current_stats_args)?,
         )?;
     } else if let Some(path) = rconfig.path {
         // if we read from a file, copy the temp stats file to "<FILESTEM>.stats.csv"
         let mut stats_pathbuf = path.clone();
         stats_pathbuf.set_extension("stats.csv");
+        // safety: we know the path is a valid PathBuf, so we can use unwrap
         if currstats_filename != stats_pathbuf.to_str().unwrap() {
             // if the stats file is not the same as the input file, copy it
             fs::copy(currstats_filename.clone(), stats_pathbuf.clone())?;
@@ -856,6 +866,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         stats_pathbuf.set_extension("csv.json");
         // write empty file first so we can canonicalize it
         std::fs::File::create(stats_pathbuf.clone())?;
+        // safety: we know the path is a valid PathBuf, so we can use unwrap
         current_stats_args.canonical_stats_path = stats_pathbuf
             .clone()
             .canonicalize()?
@@ -864,8 +875,7 @@
             .to_string();
         std::fs::write(
             stats_pathbuf.clone(),
-            // safety: we know that current_stats_args is JSON serializable
-            serde_json::to_string_pretty(&current_stats_args).unwrap(),
+            serde_json::to_string_pretty(&current_stats_args)?,
         )?;
 
         // save the stats data to "<FILESTEM>.stats.csv.data.jsonl"
@@ -1056,6 +1066,9 @@ impl Args {
 
         // with --everything, we have MAX_STAT_COLUMNS columns at most
         let mut fields = Vec::with_capacity(MAX_STAT_COLUMNS);
+
+        // these are the standard stats columns that are always output,
+        // the "streaming" stats
         fields.extend_from_slice(&[
             "field",
             "type",
@@ -1078,6 +1091,8 @@
             "max_precision",
             "sparsity",
         ]);
+
+        // these are the stats columns that are only output if the user requested them
         let all = self.flag_everything;
         if self.flag_median && !self.flag_quartiles && !all {
             fields.push("median");
@@ -1163,6 +1178,7 @@ fn init_date_inference(
     headers
         .iter()
         .map(|header| {
+            // safety: we know the header is a valid String, so we can use unwrap
            util::to_lowercase_into(&from_bytes::<String>(header).unwrap(), &mut header_str);
            date_found = whitelist
                .iter()
@@ -1324,6 +1340,7 @@ impl Stats {
                 };
             }
         } else {
+            // safety: we know the sample is a valid f64, so we can use unwrap
            let n = from_bytes::<f64>(sample).unwrap();
            if let Some(v) = self.median.as_mut() {
                v.add(n);
@@ -1398,10 +1415,37 @@ impl Stats {
 
         let empty = String::new;
 
-        // mode/modes & cardinality
-        // we do this first because we need to know the cardinality to --infer-boolean
-        // should that be enabled
-        let mut cardinality = 0_usize;
+        // min/max/range/sort_order
+        // we do this first as we want to get the sort_order, so we can skip sorting if not
+        // required. We also need to do this before --infer-boolean because we need to know
+        // the min/max values to determine if the range is equal to the supported boolean
+        // ranges (0/1, f/t, n/y)
+        let mut minmax_range_sortorder_pieces = Vec::with_capacity(4);
+        let mut minval_lower: char = '\0';
+        let mut maxval_lower: char = '\0';
+        let mut column_sorted = false;
+        if let Some(mm) = self
+            .minmax
+            .as_ref()
+            .and_then(|mm| mm.show(typ, round_places))
+        {
+            // get first character of min/max values
+            minval_lower = mm.0.chars().next().unwrap_or_default().to_ascii_lowercase();
+            maxval_lower = mm.1.chars().next().unwrap_or_default().to_ascii_lowercase();
+            if mm.3.starts_with("Ascending") {
+                column_sorted = true;
+            }
+            minmax_range_sortorder_pieces.extend_from_slice(&[mm.0, mm.1, mm.2, mm.3]);
+        } else {
+            minmax_range_sortorder_pieces.extend_from_slice(&[empty(), empty(), empty(), empty()]);
+        }
+
+        // modes/antimodes & cardinality
+        // we do this second because we can use the sort order with cardinality, to skip
+        // sorting if it's not required. This not only makes cardinality computation faster,
+        // it also makes modes/antimodes computation faster.
+        // We also need to know the cardinality to --infer-boolean should that be enabled
+        let mut cardinality = 0;
         let mut mc_pieces = Vec::with_capacity(7);
         match self.modes.as_mut() {
             None => {
@@ -1414,7 +1458,7 @@
             },
             Some(ref mut v) => {
                 if self.which.cardinality {
-                    cardinality = v.cardinality();
+                    cardinality = v.cardinality(column_sorted);
                     let mut buffer = itoa::Buffer::new();
                     mc_pieces.push(buffer.format(cardinality).to_owned());
                 }
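
The hunk above threads the detected sort order through to qsv-stats. A hedged sketch of why sorted input also speeds up modes/antimodes (mode_of_sorted is an invented name, not a qsv-stats API): on sorted data, per-value frequencies fall out of a single pass over runs of equal values, with no hashing or re-sorting:

// illustrative only: the mode of an already-sorted slice in one pass
fn mode_of_sorted<T: Clone + PartialEq>(sorted: &[T]) -> Option<T> {
    let mut best: Option<(T, usize)> = None;
    let mut run_len = 0;
    for i in 0..sorted.len() {
        run_len += 1;
        // a run ends at the last element or where the next value differs
        if i + 1 == sorted.len() || sorted[i + 1] != sorted[i] {
            if best.as_ref().map_or(true, |&(_, n)| run_len > n) {
                best = Some((sorted[i].clone(), run_len));
            }
            run_len = 0;
        }
    }
    best.map(|(v, _)| v)
}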
@@ -1476,25 +1520,6 @@ impl Stats {
             },
         }
 
-        // min/max/range/sort_order
-        // we also do this before --infer-boolean because we need to know the min/max values
-        // to determine if the range is equal to the supported boolean ranges (0/1, f/t, n/y)
-        let mut minmax_range_sortorder_pieces = Vec::with_capacity(4);
-        let mut minval_lower: char = '\0';
-        let mut maxval_lower: char = '\0';
-        if let Some(mm) = self
-            .minmax
-            .as_ref()
-            .and_then(|mm| mm.show(typ, round_places))
-        {
-            // get first character of min/max values
-            minval_lower = mm.0.chars().next().unwrap_or_default().to_ascii_lowercase();
-            maxval_lower = mm.1.chars().next().unwrap_or_default().to_ascii_lowercase();
-            minmax_range_sortorder_pieces.extend_from_slice(&[mm.0, mm.1, mm.2, mm.3]);
-        } else {
-            minmax_range_sortorder_pieces.extend_from_slice(&[empty(), empty(), empty(), empty()]);
-        }
-
         // type
         if cardinality == 2 && infer_boolean {
             // if cardinality is 2, it's a boolean if its values' first characters are 0/1, f/t, n/y
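
The --infer-boolean rule described in the comment above, restated as a standalone sketch (illustrative; not the actual qsv code):

// a column "looks boolean" when it has exactly two distinct values
// whose lowercased first characters pair up as 0/1, f/t, or n/y
fn looks_boolean(cardinality: usize, minval_lower: char, maxval_lower: char) -> bool {
    cardinality == 2
        && matches!(
            (minval_lower, maxval_lower),
            ('0', '1') | ('f', 't') | ('n', 'y')
        )
}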
@@ -1551,17 +1576,19 @@ impl Stats {
             pieces.extend_from_slice(&[empty(), empty(), empty(), empty()]);
         } else if let Some(mm) = self.minmax.as_ref().and_then(TypedMinMax::len_range) {
             pieces.extend_from_slice(&[mm.0, mm.1]);
+            // we have a sum_length
             if stotlen > 0 {
-                // if we saturated the sum, it means we had an overflow
-                // so we return empty for sum and avg length
                 if stotlen < u64::MAX {
+                    // so we can compute avg_length
                     let mut buffer = itoa::Buffer::new();
                     pieces.push(buffer.format(stotlen).to_owned());
                     pieces.push(util::round_num(
                         stotlen as f64 / *RECORD_COUNT.get().unwrap_or(&1) as f64,
                         4,
                     ));
                 } else {
+                    // however, if we saturated the sum, it means we had an overflow,
+                    // so we return OVERFLOW_STRING for sum and avg length
                     pieces.extend_from_slice(&[
                         OVERFLOW_STRING.to_string(),
                         OVERFLOW_STRING.to_string(),
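
The overflow convention in this hunk, as a hedged sketch (the helper name and return shape are invented for illustration): lengths are accumulated with saturating_add, so a stotlen of u64::MAX is read as an overflow sentinel rather than a real total:

fn sum_and_avg_length(stotlen: u64, record_count: u64) -> (String, String) {
    const OVERFLOW_STRING: &str = "*OVERFLOW*";
    if stotlen == u64::MAX {
        // the saturating sum hit the ceiling; the true total is unknown
        (OVERFLOW_STRING.to_string(), OVERFLOW_STRING.to_string())
    } else {
        // guard against a zero record count, mirroring unwrap_or(&1) above
        let avg = stotlen as f64 / record_count.max(1) as f64;
        (stotlen.to_string(), format!("{avg:.4}"))
    }
}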
@@ -1627,10 +1654,6 @@ impl Stats {
         }
 
         // sparsity
-        // stats is also called by the `schema` and `tojsonl` commands to infer a schema,
-        // sparsity is not required by those cmds and we don't necessarily have a valid
-        // RECORD_COUNT when called by those cmds, so just set sparsity to nullcount
-        // (i.e. divide by 1) so we don't panic.
         #[allow(clippy::cast_precision_loss)]
         let sparsity: f64 = self.nullcount as f64 / *RECORD_COUNT.get().unwrap_or(&1) as f64;
         pieces.push(util::round_num(sparsity, round_places));
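
Worked example of the sparsity formula above: 25 nulls across 1,000 records gives 25/1000 = 0.025; if RECORD_COUNT was never initialized, the unwrap_or(&1) fallback divides by 1 and simply reports the raw null count.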
@@ -1929,6 +1952,7 @@ impl TypedSum {
             return;
         }
         #[allow(clippy::cast_precision_loss)]
+        // safety: we know the samples are valid for their respective types, so we can use unwrap
         match typ {
             TFloat => {
                 self.stotlen = self.stotlen.saturating_add(sample.len() as u64);