stats: micro-optimizations
- use simd_json when parsing stats JSON
- micro-optimize compute fn
jqnatividad committed Sep 4, 2024
1 parent e42f499 commit 0e8b734
Showing 3 changed files with 112 additions and 22 deletions.
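
For context before the per-file diffs, here is a minimal, self-contained sketch of the simd-json parsing pattern the first commit-message item adopts; the helper function and the field it reads are illustrative only and not part of the change. simd_json parses in place, so the JSON string must first become a mutable byte buffer, and the scalar accessors (as_str, as_u64, ...) come from the ValueAsScalar trait.

// Illustrative sketch (not from this commit): parse JSON with simd-json and
// read one scalar field, defaulting if it is missing, as the diff below does.
use simd_json::{prelude::ValueAsScalar, OwnedValue};

fn read_record_count(json: String) -> Result<u64, Box<dyn std::error::Error>> {
    // simd_json mutates the input while parsing, so take ownership of the bytes
    let mut buf = json.into_bytes();
    let value: OwnedValue = simd_json::to_owned_value(&mut buf)?;
    // missing or non-numeric fields fall back to the default
    Ok(value["record_count"].as_u64().unwrap_or_default())
}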
26 changes: 20 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -202,6 +202,7 @@ rfd = "0.14"
rust_decimal = { version = "1.36", default-features = false }
ryu = "1"
sanitize-filename = { version = "0.5", optional = true }
simd-json = "0.13"
self_update = { version = "0.41", features = [
"archive-zip",
"compression-zip-deflate",
107 changes: 91 additions & 16 deletions src/cmd/stats.rs
@@ -248,6 +248,7 @@ use std::{
use itertools::Itertools;
use qsv_dateparser::parse_with_preference;
use serde::{Deserialize, Serialize};
use simd_json::{prelude::ValueAsScalar, OwnedValue};
use simdutf8::basic::from_utf8;
use stats::{merge_all, Commute, MinMax, OnlineStats, Unsorted};
use tempfile::NamedTempFile;
@@ -319,6 +320,58 @@ struct StatsArgs {
qsv_version: String,
}

impl StatsArgs {
fn from_owned_value(value: &OwnedValue) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
arg_input: value["arg_input"].as_str().unwrap_or_default().to_string(),
flag_select: value["flag_select"]
.as_str()
.unwrap_or_default()
.to_string(),
flag_everything: value["flag_everything"].as_bool().unwrap_or_default(),
flag_typesonly: value["flag_typesonly"].as_bool().unwrap_or_default(),
flag_infer_boolean: value["flag_infer_boolean"].as_bool().unwrap_or_default(),
flag_mode: value["flag_mode"].as_bool().unwrap_or_default(),
flag_cardinality: value["flag_cardinality"].as_bool().unwrap_or_default(),
flag_median: value["flag_median"].as_bool().unwrap_or_default(),
flag_mad: value["flag_mad"].as_bool().unwrap_or_default(),
flag_quartiles: value["flag_quartiles"].as_bool().unwrap_or_default(),
flag_round: value["flag_round"].as_u64().unwrap_or_default() as u32,
flag_nulls: value["flag_nulls"].as_bool().unwrap_or_default(),
flag_infer_dates: value["flag_infer_dates"].as_bool().unwrap_or_default(),
flag_dates_whitelist: value["flag_dates_whitelist"]
.as_str()
.unwrap_or_default()
.to_string(),
flag_prefer_dmy: value["flag_prefer_dmy"].as_bool().unwrap_or_default(),
flag_no_headers: value["flag_no_headers"].as_bool().unwrap_or_default(),
flag_delimiter: value["flag_delimiter"]
.as_str()
.unwrap_or_default()
.to_string(),
flag_output_snappy: value["flag_output_snappy"].as_bool().unwrap_or_default(),
canonical_input_path: value["canonical_input_path"]
.as_str()
.unwrap_or_default()
.to_string(),
canonical_stats_path: value["canonical_stats_path"]
.as_str()
.unwrap_or_default()
.to_string(),
record_count: value["record_count"].as_u64().unwrap_or_default(),
date_generated: value["date_generated"]
.as_str()
.unwrap_or_default()
.to_string(),
compute_duration_ms: value["compute_duration_ms"].as_u64().unwrap_or_default(),
qsv_version: value["qsv_version"]
.as_str()
.unwrap_or_default()
.to_string(),
})
}
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct StatsData {
pub field: String,
@@ -570,30 +623,48 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let time_saved: u64;
// deserialize the existing stats args json
let existing_stats_args_json: StatsArgs =
match serde_json::from_str::<StatsArgs>(&existing_stats_args_json_str) {
Ok(mut stat_args) => {
// we init these fields to empty values because we don't want to compare
// them when checking if the args are the same
stat_args.canonical_input_path = String::new();
stat_args.canonical_stats_path = String::new();
stat_args.record_count = 0;
stat_args.date_generated = String::new();
time_saved = stat_args.compute_duration_ms;
stat_args.compute_duration_ms = 0;
stat_args
let existing_stats_args_json: StatsArgs = {
let mut json_buffer = existing_stats_args_json_str.into_bytes();
match simd_json::to_owned_value(&mut json_buffer) {
Ok(value) => {
// Convert OwnedValue to StatsArgs
match StatsArgs::from_owned_value(&value) {
Ok(mut stat_args) => {
// we init these fields to empty values because we don't want to
// compare them when checking if the
// args are the same
stat_args.canonical_input_path = String::new();
stat_args.canonical_stats_path = String::new();
stat_args.record_count = 0;
stat_args.date_generated = String::new();
time_saved = stat_args.compute_duration_ms;
stat_args.compute_duration_ms = 0;
stat_args
},
Err(e) => {
time_saved = 0;
log::warn!(
"Could not deserialize {path_file_stem}.stats.csv.json: \
{e:?}, recomputing..."
);
fs::remove_file(&stats_file)?;
fs::remove_file(&stats_args_json_file)?;
StatsArgs::default()
},
}
},
Err(e) => {
time_saved = 0;
log::warn!(
"Could not serialize {path_file_stem}.stats.csv.json: {e:?}, \
"Could not parse {path_file_stem}.stats.csv.json: {e:?}, \
recomputing..."
);
fs::remove_file(&stats_file)?;
fs::remove_file(&stats_args_json_file)?;
StatsArgs::default()
},
};
}
};

// check if the cached stats are current (ie the stats file is newer than the input
// file), use the same args or if the --everything flag was set, and
@@ -897,7 +968,8 @@ impl Args {
where
I: Iterator<Item = csv::Result<csv::ByteRecord>>,
{
let mut stats = self.new_stats(sel.len());
let sel_len = sel.len();
let mut stats = self.new_stats(sel_len);

// safety: we know INFER_DATE_FLAGS is Some because we called init_date_inference
let infer_date_flags = INFER_DATE_FLAGS.get().unwrap();
@@ -908,12 +980,15 @@ impl Args {
let prefer_dmy = self.flag_prefer_dmy;

let mut i;
#[allow(unused_assignments)]
let mut current_row = csv::ByteRecord::with_capacity(1024, sel_len);
for row in it {
i = 0;
// safety: because we're using iterators and INFER_DATE_FLAGS has the same size,
// we know we don't need to bounds check
unsafe {
for field in sel.select(&row.unwrap_unchecked()) {
current_row = row.unwrap_unchecked();
for field in sel.select(&current_row) {
stats.get_unchecked_mut(i).add(
field,
*infer_date_flags.get_unchecked(i),
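
For the second commit-message item, a hedged sketch of the same idea in isolation: hoist loop-invariant values (like the selection length) out of the hot loop and allocate the row buffer once before it. This sketch uses the csv crate's read_byte_record buffer-reuse API rather than the iterator-based loop in the diff above; the function and its parameters are illustrative only.

// Illustrative only: loop-invariant hoisting plus a reused row buffer.
use csv::{ByteRecord, ReaderBuilder};

fn count_selected_bytes(path: &str, selection: &[usize]) -> csv::Result<u64> {
    let mut rdr = ReaderBuilder::new().from_path(path)?;
    // invariant hoisted out of the loop, analogous to `let sel_len = sel.len()`
    let sel_len = selection.len();
    // one ByteRecord allocated up front and reused on every read
    let mut current_row = ByteRecord::new();
    let mut total = 0u64;
    while rdr.read_byte_record(&mut current_row)? {
        for i in 0..sel_len {
            if let Some(field) = current_row.get(selection[i]) {
                total += field.len() as u64;
            }
        }
    }
    Ok(total)
}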
