Skip to content

Commit

Permalink
fix(mock): add channel bound to avoid io bottleneck (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
wilhelmagren authored Feb 17, 2024
1 parent 7753348 commit 1d743d5
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ use std::sync::Arc;
use std::thread::JoinHandle;
use std::{thread, usize};

pub(crate) static MINIMUM_N_ROWS_FOR_MULTITHREADING: usize = 1000;
// This default value should depend on the memory capacity of the system
// running the program. Because the workers produce buffers faster than
// the master can write them to disk we need to bound the worker/master
// channel. If you have a lot of system memory, you can increase this value,
// but if you increase it too much the program will run out of system memory.
pub(crate) static DEFAULT_THREAD_CHANNEL_CAPACITY: usize = 128;
pub(crate) static DEFAULT_MIN_N_ROWS_FOR_MULTITHREADING: usize = 1000;
pub(crate) static DEFAULT_MOCKED_FILENAME_LEN: usize = 8;
pub(crate) static DEFAULT_ROW_BUFFER_LEN: usize = 1024 * 1024;
pub(crate) static DEFAULT_ROW_BUFFER_LEN: usize = 1048 * 1024;

///
pub struct Mocker {
Expand Down Expand Up @@ -67,7 +73,7 @@ impl Mocker {

///
pub fn generate(&self, n_rows: usize) {
if self.multithreaded && n_rows > MINIMUM_N_ROWS_FOR_MULTITHREADING {
if self.multithreaded && n_rows > DEFAULT_MIN_N_ROWS_FOR_MULTITHREADING {
self.generate_multithreaded(n_rows);
} else {
if self.multithreaded {
Expand Down Expand Up @@ -221,7 +227,7 @@ pub fn spawn_workers(
info!("Remaining rows to handle: {}", remaining_rows);

let arc_schema = Arc::new(schema);
let (sender, receiver) = channel::unbounded();
let (sender, receiver) = channel::bounded(DEFAULT_THREAD_CHANNEL_CAPACITY);

let threads: Vec<JoinHandle<()>> = (0..n_threads)
.map(|t| {
Expand Down

0 comments on commit 1d743d5

Please sign in to comment.