Skip to content

Commit

Permalink
Merge pull request #2178 from jqnatividad/2174-auto-batch-size
Browse files Browse the repository at this point in the history
Automatically determine optimal batch size
  • Loading branch information
jqnatividad authored Sep 29, 2024
2 parents 4d9c872 + 0c24803 commit f0ae7c7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 62 deletions.
26 changes: 9 additions & 17 deletions src/cmd/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ apply options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
Automatically determined for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch. Set to 1 to force batch optimization
even for files with less than 50000 rows.
[default: 50000]
Common options:
Expand Down Expand Up @@ -566,32 +568,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
26 changes: 9 additions & 17 deletions src/cmd/datefmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ datefmt options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
Automatically determined for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch. Set to 1 to force batch optimization
even for files with less than 50000 rows.
[default: 50000]
Common options:
Expand Down Expand Up @@ -252,12 +254,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

Expand Down Expand Up @@ -316,23 +317,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let is_output_utc = output_tz == chrono_tz::UTC;

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
30 changes: 12 additions & 18 deletions src/cmd/tojsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ Tojsonl options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel. Set to 0 to load all
rows in one batch. [default: 50000]
before running in parallel. Automatically determined
for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch.
Set to 1 to force batch optimization even for files with
less than 50000 rows.
[default: 50000]
Common options:
-h, --help Display this message
Expand Down Expand Up @@ -255,32 +259,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
record_count as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&conf, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(batch_record.clone());
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
20 changes: 10 additions & 10 deletions src/cmd/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ Validate options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel. Set to 0 to load all rows in one batch.
[default: 50000]
before running in parallel. Automatically determined
for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch.
Set to 1 to force batch optimization even for files with
less than 50000 rows. [default: 50000]
--timeout <seconds> Timeout for downloading json-schemas on URLs and for
'dynamicEnum' lookups on URLs. [default: 30]
Expand Down Expand Up @@ -647,21 +650,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

// amortize memory allocation by reusing record
let mut record = csv::ByteRecord::with_capacity(500, header_len);

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffer
let batch_size = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batch_size = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batch_size);
let mut validation_results = Vec::with_capacity(batch_size);
let mut valid_flags: Vec<bool> = Vec::with_capacity(batch_size);
let mut validation_error_messages: Vec<String> = Vec::with_capacity(50);
let flag_trim = args.flag_trim;

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// amortize buffer allocation
let mut buffer = itoa::Buffer::new();

Expand Down
27 changes: 27 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ macro_rules! regex_oncelock {
// leave at least 20% of the available memory free
const DEFAULT_FREEMEMORY_HEADROOM_PCT: u8 = 20;

// default number of rows per batch for the parallelized commands (the --batch
// option's [default: 50000]); also the threshold used by optimal_batch_size()
// to decide when to auto-tune the batch size
const DEFAULT_BATCH_SIZE: usize = 50_000;

static ROW_COUNT: OnceLock<Option<u64>> = OnceLock::new();

pub type ByteString = Vec<u8>;
Expand Down Expand Up @@ -2207,6 +2209,31 @@ pub fn csv_to_jsonl(
Ok(writer.flush()?)
}

/// Get the optimal batch size for parallel processing.
///
/// Behavior, driven by the requested `batch_size`:
/// * `0` — return the number of rows in the CSV, effectively disabling
///   batching (all rows are loaded in one batch).
/// * `1` — force the "optimal" size (`num_rows / num_jobs + 1`), even though
///   it's not recommended (the number of rows may be too small for parallel
///   processing to pay off).
/// * `DEFAULT_BATCH_SIZE` (the `--batch` default) — auto-tune: return the
///   optimal size when the CSV has more rows than the default, otherwise
///   keep the default.
/// * any other value below `DEFAULT_BATCH_SIZE` — clamp up to the default.
/// * otherwise — honor the explicitly requested `batch_size`.
///
/// NOTE: the sentinel values `0` and `1` are checked FIRST. A plain
/// `batch_size < DEFAULT_BATCH_SIZE` clamp would swallow them and make the
/// documented `0`/`1` behavior unreachable.
#[inline]
pub fn optimal_batch_size(rconfig: &Config, batch_size: usize, num_jobs: usize) -> usize {
    // Only consult the row count when one of the sentinel values needs it;
    // count_rows() may otherwise do unnecessary work.
    if batch_size == 0 || batch_size == 1 || batch_size == DEFAULT_BATCH_SIZE {
        // If the row count cannot be determined, fall back to the default
        // batch size so processing can still proceed.
        let num_rows = count_rows(rconfig).unwrap_or(DEFAULT_BATCH_SIZE as u64) as usize;
        if batch_size == 0 {
            // load everything in a single batch
            num_rows
        } else if batch_size == 1 || num_rows > DEFAULT_BATCH_SIZE {
            // spread the rows evenly across the parallel jobs;
            // guard against num_jobs == 0 to avoid a divide-by-zero panic
            (num_rows / num_jobs.max(1)) + 1
        } else {
            // default batch size requested and the file is small: keep default
            DEFAULT_BATCH_SIZE
        }
    } else if batch_size < DEFAULT_BATCH_SIZE {
        // explicit sizes below the default are clamped up to the default
        DEFAULT_BATCH_SIZE
    } else {
        // honor the explicitly requested batch size
        batch_size
    }
}

// comment out for now as this is still WIP
// pub fn create_json_record(
// no_headers: bool,
Expand Down

0 comments on commit f0ae7c7

Please sign in to comment.