
Commit

Merge pull request #1613 from jqnatividad/1609-split-by-size
`split`: add `--kb-size` option
jqnatividad authored Feb 20, 2024
2 parents 8d5471d + 1870869 commit 43e7e3f
Showing 3 changed files with 148 additions and 10 deletions.
README.md (2 changes: 1 addition & 1 deletion)
@@ -74,7 +74,7 @@
| [sniff](/src/cmd/sniff.rs#L2)<br>🌐 ![CKAN](docs/images/ckan.png) | Quickly sniff & infer CSV metadata (delimiter, header row, preamble rows, quote character, flexible, is_utf8, average record length, number of records, content length & estimated number of records if sniffing a CSV on a URL, number of fields, field names & data types). It is also a general mime type detector. |
| [sort](/src/cmd/sort.rs#L2)<br>🚀🤯 | Sorts CSV data in alphabetical (with case-insensitive option), numerical, reverse, unique or random (with optional seed) order (See also `extsort` & `sortcheck` commands). |
| [sortcheck](/src/cmd/sortcheck.rs#L2)<br>📇 | Check if a CSV is sorted. With the --json options, also retrieve record count, sort breaks & duplicate count. |
-| [split](/src/cmd/split.rs#L2)<br>📇🏎️ | Split one CSV file into many CSV files of N chunks. Uses multithreading to go faster if an index is present. |
+| [split](/src/cmd/split.rs#L2)<br>📇🏎️ | Split one CSV file into many CSV files. It can split by number of rows, number of chunks or file size. Uses multithreading to go faster if an index is present when splitting by rows or chunks. |
| [sqlp](/src/cmd/sqlp.rs#L2)<br>✨🚀🐻‍❄️🗄️ | Run [Polars](https://pola.rs) SQL queries against several CSVs - converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions, processing larger than memory CSV files. |
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️ | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, min/max length, mean, stddev, variance, nullcount, sparsity, quartiles, IQR, lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV.<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 7.3 seconds). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view CSV files, qsv pairs well with [csvlens](https://github.com/YS-L/csvlens#csvlens). |
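The updated README row summarizes three modes; the USAGE text in src/cmd/split.rs below spells out how the rowcount and chunk-count modes interact with the record count. As a quick sanity check, here is a minimal sketch of one plausible reading of that arithmetic (the record counts are made up; this is not qsv's actual implementation):

fn main() {
    let num_records = 1_000_usize; // made-up record count

    // --size: a fixed number of rows per chunk; the last chunk may be smaller
    let rows_per_chunk = 300_usize;
    let last_chunk_rows = num_records % rows_per_chunk; // 100

    // --chunks: a fixed number of chunks; rows per chunk derived from the
    // record count (ceiling division, so the last chunk has fewer records)
    let num_chunks = 3_usize;
    let derived_rows = (num_records + num_chunks - 1) / num_chunks; // 334

    println!("--size 300: last chunk has {last_chunk_rows} rows");
    println!("--chunks 3: ~{derived_rows} rows per chunk");
}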
src/cmd/split.rs (106 changes: 97 additions & 9 deletions)
@@ -1,17 +1,44 @@
static USAGE: &str = r#"
-Splits the given CSV data into chunks.
+Splits the given CSV data into chunks. It has three modes: by rowcount, by number of chunks
+and by size.
-Uses multithreading to go faster if the CSV has an index.

When splitting by rowcount, the CSV data is split into chunks of the given number of
rows. The last chunk may have fewer rows if the number of records is not evenly
divisible by the given rowcount.

When splitting by number of chunks, the CSV data is split into the given number of
chunks. The number of rows in each chunk is determined by the number of records in
the CSV data and the number of desired chunks. If the number of records is not evenly
divisible by the number of chunks, the last chunk will have fewer records.

When splitting by size, the CSV data is split into chunks of the given size in kilobytes.
The number of rows in each chunk may vary, but the size of each chunk will be close to the
desired size.

+Uses multithreading to go faster if the CSV has an index when splitting by rowcount or
+by number of chunks. Splitting by size is always done sequentially with a single thread.

The default is to split by rowcount with a chunk size of 500.

The files are written to the directory given with the name '{start}.csv',
where {start} is the index of the first record of the chunk (starting at 0).

Examples:

qsv split outdir --size 100 --filename chunk_{}.csv input.csv
# This will create files with names like chunk_0.csv, chunk_100.csv, etc.
# in the directory 'outdir', creating the directory if it does not exist.

-qsv split outputdir -s 100 --filename chunk_{}.csv --pad 5 input.csv
+qsv split outdir/subdir -s 100 --filename chunk_{}.csv --pad 5 input.csv
# This will create files with names like chunk_00000.csv, chunk_00100.csv, etc.
# in the directory 'outdir/subdir', creating the directories if they do not exist.

qsv split . -s 100 input.csv
# This will create files like 0.csv, 100.csv, etc. in the current directory.

qsv split outdir --kb-size 1000 input.csv
# This will create files with names like 0.csv, 994.csv, etc. in the directory
# 'outdir', creating the directory if it does not exist. Each file will be close
# to 1000KB in size.

cat in.csv | qsv split mysplitoutput -s 1000
@@ -22,7 +49,7 @@ Examples:
For more examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_split.rs.
Usage:
-qsv split [options] (--size <arg> | --chunks <arg>) <outdir> [<input>]
+qsv split [options] (--size <arg> | --chunks <arg> | --kb-size <arg>) <outdir> [<input>]
qsv split --help
split arguments:
@@ -41,6 +68,13 @@ split options:
of desired chunks. If the number of records is not evenly
divisible by the number of chunks, the last chunk will
have fewer records.
-k, --kb-size <arg> The size of each chunk in kilobytes. The number of rows
in each chunk may vary, but the size of each chunk will
be close to the desired size.
This option is mutually exclusive with --size and --chunks.
--sep-factor <arg> The factor to use when estimating the size of the
separators (delimiters, quotes & spaces) in the CSV data
when splitting by size. [default: 1.5]
-j, --jobs <arg> The number of splitting jobs to run in parallel.
This only works when the given CSV data has
@@ -50,8 +84,8 @@ split options:
number of CPUs detected.
--filename <filename> A filename template to use when constructing
the names of the output files. The string '{}'
-will be replaced by a value based on the value
-of the field, but sanitized for shell safety.
+will be replaced by the zero-based row number
+of the first row in the chunk.
[default: {}.csv]
--pad <arg> The zero padding width that is used in the
generated filename.
@@ -85,6 +119,8 @@ struct Args {
    arg_outdir: String,
    flag_size: usize,
    flag_chunks: Option<usize>,
    flag_kb_size: Option<usize>,
    flag_sep_factor: f32,
    flag_jobs: Option<usize>,
    flag_filename: FilenameTemplate,
    flag_pad: usize,
@@ -106,13 +142,65 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

    fs::create_dir_all(&args.arg_outdir)?;

-    match args.rconfig().indexed()? {
-        Some(idx) => args.parallel_split(&idx),
-        None => args.sequential_split(),
-    }
+    if args.flag_kb_size.is_some() {
+        args.split_by_kb_size()
+    } else {
+        // we're splitting by rowcount or by number of chunks
+        match args.rconfig().indexed()? {
+            Some(idx) => args.parallel_split(&idx),
+            None => args.sequential_split(),
+        }
+    }
 }

impl Args {
    fn split_by_kb_size(&self) -> CliResult<()> {
        let rconfig = self.rconfig();
        let mut rdr = rconfig.reader()?;
        let headers = rdr.byte_headers()?.clone();
        let num_fields = headers.len();

        // estimate the size of the separators
        // the sep_factor is to account for delimiters, quotes and spaces
        #[allow(clippy::cast_precision_loss)]
        let separators_byte_size =
            ((num_fields as f32 - 1.0) * self.flag_sep_factor).ceil() as usize;

        let header_byte_size = headers.as_slice().len() + separators_byte_size;

        let chunk_size = self.flag_kb_size.unwrap();
        let mut wtr = self.new_writer(&headers, 0, self.flag_pad)?;
        let mut i = 0;
        let mut num_chunks = 0;
        let mut row = csv::ByteRecord::new();
        let chunk_size_bytes = chunk_size * 1024;
        let mut chunk_size_bytes_left = chunk_size_bytes - header_byte_size;
        while rdr.read_byte_record(&mut row)? {
            let row_size_bytes = row.as_slice().len() + separators_byte_size;
            if row_size_bytes >= chunk_size_bytes_left {
                wtr.flush()?;
                wtr = self.new_writer(&headers, i, self.flag_pad)?;
                chunk_size_bytes_left = chunk_size_bytes;
                num_chunks += 1;
            }
            wtr.write_byte_record(&row)?;
            chunk_size_bytes_left -= row_size_bytes;
            i += 1;
        }
        wtr.flush()?;

        if !self.flag_quiet {
            eprintln!(
                "Wrote chunk/s to '{}'. Size/chunk: ~{}KB Num chunks: {}",
                Path::new(&self.arg_outdir).canonicalize()?.display(),
                chunk_size,
                num_chunks + 1
            );
        }

        Ok(())
    }

    fn sequential_split(&self) -> CliResult<()> {
        let rconfig = self.rconfig();
        let mut rdr = rconfig.reader()?;
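To make the size-splitting heuristic above concrete, here is a minimal standalone sketch of the same chunk-boundary walk. The field count, header length and row lengths are hypothetical values chosen for illustration; only the logic (a separator overhead of ceil((num_fields - 1) * sep_factor) bytes per row, and a new file named after the zero-based index of its first row) mirrors split_by_kb_size above.

fn main() {
    // hypothetical inputs: a 5-field CSV, the default --sep-factor of 1.5,
    // and --kb-size 1 (1 KB chunks)
    let num_fields = 5_usize;
    let sep_factor = 1.5_f32;
    let chunk_size_bytes = 1024_usize;

    // same separator estimate as split_by_kb_size above: ~sep_factor bytes
    // of delimiter/quote/space overhead per field boundary
    let separators_byte_size = ((num_fields as f32 - 1.0) * sep_factor).ceil() as usize; // 6

    // hypothetical raw byte lengths of the header row and of each data row
    let header_len = 40_usize;
    let row_lens = [120_usize, 480, 300, 250, 90, 400];

    // the first chunk's budget is reduced by the header, as in the commit
    let mut chunk_size_bytes_left = chunk_size_bytes - (header_len + separators_byte_size);
    let mut first_row_of_chunk = 0_usize;

    for (i, len) in row_lens.iter().enumerate() {
        let row_size_bytes = len + separators_byte_size;
        if row_size_bytes >= chunk_size_bytes_left {
            // this row would overflow the current chunk: close it and start a
            // new file named after the zero-based index of its first row
            println!("rows {first_row_of_chunk}..{i} -> {first_row_of_chunk}.csv; next file: {i}.csv");
            first_row_of_chunk = i;
            chunk_size_bytes_left = chunk_size_bytes;
        }
        chunk_size_bytes_left -= row_size_bytes;
    }
    // prints: rows 0..3 -> 0.csv; next file: 3.csv
}

Note that, as in the code above, the header's size is only subtracted from the first chunk's budget, so subsequent chunks can come out slightly larger than the requested size.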
tests/test_split.rs (50 changes: 50 additions & 0 deletions)
@@ -778,3 +778,53 @@ fn split_nooutdir() {
    let expected = "usage error: <outdir> is not specified or is a file.\n";
    assert_eq!(got, expected);
}

#[test]
fn split_kbsize_boston_2k() {
    let wrk = Workdir::new("split_kbsize_boston_2k");
    let test_file = wrk.load_test_file("boston311-100.csv");

    let mut cmd = wrk.command("split");
    cmd.args(["--kb-size", "5"])
        .arg(&wrk.path("."))
        .arg(test_file);
    wrk.run(&mut cmd);

    assert!(wrk.path("0.csv").exists());
    assert!(wrk.path("11.csv").exists());
    assert!(wrk.path("20.csv").exists());
    assert!(wrk.path("29.csv").exists());
    assert!(wrk.path("39.csv").exists());
    assert!(wrk.path("48.csv").exists());
    assert!(wrk.path("57.csv").exists());
    assert!(wrk.path("68.csv").exists());
    assert!(wrk.path("78.csv").exists());
    assert!(wrk.path("88.csv").exists());
    assert!(wrk.path("98.csv").exists());
}

#[test]
fn split_kbsize_boston_2k_padded() {
    let wrk = Workdir::new("split_kbsize_boston_2k_padded");
    let test_file = wrk.load_test_file("boston311-100.csv");

    let mut cmd = wrk.command("split");
    cmd.args(["--kb-size", "5"])
        .arg(&wrk.path("."))
        .args(["--filename", "testme-{}.csv"])
        .args(["--pad", "3"])
        .arg(test_file);
    wrk.run(&mut cmd);

    assert!(wrk.path("testme-000.csv").exists());
    assert!(wrk.path("testme-011.csv").exists());
    assert!(wrk.path("testme-020.csv").exists());
    assert!(wrk.path("testme-029.csv").exists());
    assert!(wrk.path("testme-039.csv").exists());
    assert!(wrk.path("testme-048.csv").exists());
    assert!(wrk.path("testme-057.csv").exists());
    assert!(wrk.path("testme-068.csv").exists());
    assert!(wrk.path("testme-078.csv").exists());
    assert!(wrk.path("testme-088.csv").exists());
    assert!(wrk.path("testme-098.csv").exists());
}
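The expected filenames in both tests are the zero-based row indices at which each ~5KB chunk begins (rows 0, 11, 20, ... of boston311-100.csv), zero-padded in the second test via --pad. Here is a hypothetical sketch of how such a padded name can be derived from the --filename template; qsv's actual FilenameTemplate implementation is not part of this diff:

// hypothetical illustration only; not the FilenameTemplate code from qsv
fn chunk_filename(template: &str, start_row: usize, pad: usize) -> String {
    template.replace("{}", &format!("{start_row:0pad$}"))
}

fn main() {
    assert_eq!(chunk_filename("{}.csv", 11, 0), "11.csv");
    assert_eq!(chunk_filename("testme-{}.csv", 11, 3), "testme-011.csv");
}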
