feat: efficient chunked buffer slicing for multithreading (#74)
* build: add bytesize deps for printing file size

* feat: chunked buffer slicing for threads

* build: bump crate version 1.2.0 → 1.3.0

* fix: cargo fmt --all -- --check

* ci/cd: update deploy workflow to only publish evolution full example
wilhelmagren authored Oct 14, 2024
1 parent 3936321 commit 79097cc
Showing 5 changed files with 88 additions and 73 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/cd.yml
@@ -25,6 +25,8 @@ jobs:
with:
command: check
- name: Cargo publish
run: cargo publish --token ${CRATES_TOKEN}
run: |
cd examples/full
cargo publish --token ${CRATES_TOKEN}
env:
CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }}
25 changes: 14 additions & 11 deletions Cargo.toml
@@ -15,7 +15,7 @@ authors = [
"Wilhelm Ågren <wilhelmagren98@gmail.com>",
]
license = "MIT"
version = "1.2.0"
version = "1.3.0"
readme = "README.md"
categories = [
"science",
@@ -26,16 +26,19 @@ categories = [
keywords = [
"arrow",
"parquet",
"concurrency",
"data-engineering",
"ETL",
]
description = "Efficiently evolve your old fixed-length data files into modern file formats. "
description = "Efficiently evolve your old fixed-length data files into modern file formats."

[workspace.dependencies]
evolution-builder = { path = "crates/evolution-builder", version = "1.2.0" }
evolution-common = { path = "crates/evolution-common", version = "1.2.0" }
evolution-converter = { path = "crates/evolution-converter", version = "1.2.0" }
evolution-mocker = { path = "crates/evolution-mocker", version = "1.2.0" }
evolution-parser = { path = "crates/evolution-parser", version = "1.2.0" }
evolution-schema = { path = "crates/evolution-schema", version = "1.2.0" }
evolution-slicer = { path = "crates/evolution-slicer", version = "1.2.0" }
evolution-target = { path = "crates/evolution-target", version = "1.2.0" }
evolution-writer = { path = "crates/evolution-writer", version = "1.2.0" }
evolution-builder = { path = "crates/evolution-builder", version = "1.3.0" }
evolution-common = { path = "crates/evolution-common", version = "1.3.0" }
evolution-converter = { path = "crates/evolution-converter", version = "1.3.0" }
evolution-mocker = { path = "crates/evolution-mocker", version = "1.3.0" }
evolution-parser = { path = "crates/evolution-parser", version = "1.3.0" }
evolution-schema = { path = "crates/evolution-schema", version = "1.3.0" }
evolution-slicer = { path = "crates/evolution-slicer", version = "1.3.0" }
evolution-target = { path = "crates/evolution-target", version = "1.3.0" }
evolution-writer = { path = "crates/evolution-writer", version = "1.3.0" }
1 change: 1 addition & 0 deletions crates/evolution-converter/Cargo.toml
@@ -16,6 +16,7 @@ bench = false

[dependencies]
arrow = "51.0.0"
bytesize = "1.3.0"
crossbeam = "0.8.4"
evolution-builder = { workspace = true }
evolution-common = { workspace = true }
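
The new bytesize dependency backs the human-readable size logging used in converter.rs below. A minimal sketch of how ByteSize formats raw byte counts, assuming the crate's 1.x API; file_size_label is an illustrative helper, not part of this commit:

use bytesize::ByteSize;

// Illustrative helper (not from this commit): format a raw byte count
// for log output.
fn file_size_label(n_bytes: u64) -> String {
    // Constructed from raw bytes, ByteSize picks a readable unit on Display.
    let exact = ByteSize::b(n_bytes).to_string(); // e.g. "4.2 GB"
    // Or pin the unit up front, as the converter does with ByteSize::gb.
    let rounded = ByteSize::gb(n_bytes / 1_000_000_000);
    format!("{} (~{})", exact, rounded)
}

fn main() {
    println!("{}", file_size_label(4_200_000_000));
}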
77 changes: 17 additions & 60 deletions crates/evolution-converter/src/converter.rs
Expand Up @@ -22,10 +22,11 @@
// SOFTWARE.
//
// File created: 2024-02-17
// Last updated: 2024-10-11
// Last updated: 2024-10-14
//

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use bytesize::ByteSize;
use crossbeam::channel;
use crossbeam::thread::scope;
use crossbeam::thread::ScopedJoinHandle;
@@ -106,11 +107,12 @@ impl ParquetConverter {
pub fn try_convert_multithreaded(&mut self) -> Result<()> {
let mut buffer_capacity = self.read_buffer_size;
let n_worker_threads: usize = self.n_threads - 1;
let mut ratio_processed: f32;

info!("Converting flf to parquet in multithreaded mode.");
info!(
"The file to convert is {} bytes in total.",
self.slicer.bytes_to_read(),
"The file to convert is ~{} in total.",
ByteSize::gb((self.slicer.bytes_to_read() / 1_000_000_000) as u64),
);

let mut line_break_indices: Vec<usize> = Vec::with_capacity(buffer_capacity);
@@ -132,29 +134,27 @@ impl ParquetConverter {
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
self.slicer.try_read_to_buffer(&mut buffer)?;
self.slicer
.try_find_line_breaks(&buffer, &mut line_break_indices, true)?;

let byte_idx_last_line_break: usize = self.slicer.try_find_last_line_break(&buffer)?;
let n_bytes_left_after_last_line_break: usize =
buffer_capacity - byte_idx_last_line_break - NUM_BYTES_FOR_NEWLINE;

self.distribute_worker_thread_workloads(&line_break_indices, &mut thread_workloads);

.try_distribute_buffer_chunks_on_workers(&buffer, &mut thread_workloads)?;
self.spawn_converter_threads(&buffer, &thread_workloads)?;

line_break_indices.clear();
thread_workloads.clear();

let n_bytes_left_after_last_line_break: usize =
buffer_capacity - thread_workloads[n_worker_threads - 1].1 - NUM_BYTES_FOR_NEWLINE;
self.slicer
.try_seek_relative(-(n_bytes_left_after_last_line_break as i64))?;

bytes_processed += buffer_capacity - n_bytes_left_after_last_line_break;
bytes_overlapped += n_bytes_left_after_last_line_break;
remaining_bytes -= buffer_capacity - n_bytes_left_after_last_line_break;
ratio_processed = 100.0 * bytes_processed as f32 / self.slicer.bytes_to_read() as f32;

self.slicer.set_remaining_bytes(remaining_bytes);
self.slicer.set_bytes_processed(bytes_processed);
self.slicer.set_bytes_overlapped(bytes_overlapped);

line_break_indices.clear();
thread_workloads.clear();

info!("Estimated progress: {:.2}%", ratio_processed);
}

#[cfg(debug_assertions)]
@@ -208,6 +208,8 @@ impl ParquetConverter {
// requires that the buffer contains some values, it can't be empty
// but with large enough capacity, really dumb. Look into smarter
// way to do this, because it really bothers me!!!!!
//
// But maybe this is not too bad, the allocations don't take that long compared to the I/O...
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
self.slicer.try_read_to_buffer(&mut buffer)?;

@@ -244,33 +246,6 @@ impl ParquetConverter {
Ok(())
}

/// Divide the current buffer into chunks for each worker thread based on the line breaks.
///
/// # Note
/// The workload will attempt to be uniform on each worker, however, the worker with the last index might get some
/// extra lines to process due to number of rows not being divisible by the estimated number of rows per thread.
fn distribute_worker_thread_workloads(
&self,
line_break_indices: &[usize],
thread_workloads: &mut Vec<(usize, usize)>,
) {
let n_line_break_indices: usize = line_break_indices.len();
let n_rows_per_thread: usize = n_line_break_indices / thread_workloads.capacity(); // usize division will floor the result

let mut prev_line_break_byte_idx: usize = 0;
for worker_idx in 1..(thread_workloads.capacity()) {
let next_line_break_byte_idx: usize =
line_break_indices[n_rows_per_thread * worker_idx];
thread_workloads.push((prev_line_break_byte_idx, next_line_break_byte_idx));
prev_line_break_byte_idx = next_line_break_byte_idx + NUM_BYTES_FOR_NEWLINE;
}

thread_workloads.push((
prev_line_break_byte_idx,
line_break_indices[n_line_break_indices - 1],
));
}

/// Spawn the threads which perform the conversion, specifically, n-1 threads will work on converting and 1
/// thread will collect and buffer the converted results and write those to parquet file, where n was the
/// specified number of threads for the program.
@@ -297,16 +272,9 @@ impl ParquetConverter {
let arc_buffer: Arc<&Vec<u8>> = Arc::new(buffer);

let thread_result: thread::Result<()> = scope(|s| {
#[cfg(debug_assertions)]
debug!(
"Starting {} worker threads for conversion.",
thread_workloads.len()
);

let threads = thread_workloads
.iter()
.enumerate()
.map(|(t_idx, (from, to))| {
.map(|(from, to)| {
let t_sender: channel::Sender<ParquetBuilder> = sender.clone();
// Can we do this in another way? So we don't have to allocate a bunch of stuff in our loop...
// TODO: pull this out and create them as part of the ParquetConverter struct?..
@@ -316,30 +284,19 @@ impl ParquetConverter {
let t_buffer_slice: &[u8] = &t_buffer[*from..*to];

s.spawn(move |_| {
#[cfg(debug_assertions)]
debug!("Thread {} working...", t_idx + 1);

t_builder.try_build_from_slice(t_buffer_slice).unwrap();
t_sender.send(t_builder).unwrap();
drop(t_sender);
#[cfg(debug_assertions)]
debug!("Thread {} done!", t_idx + 1);
})
})
.collect::<Vec<ScopedJoinHandle<()>>>();

drop(sender);

#[cfg(debug_assertions)]
debug!("Thread 0 waiting for batches to write to buffer...");
for mut builder in receiver {
self.writer.try_write_from_builder(&mut builder).unwrap();
drop(builder);
}

#[cfg(debug_assertions)]
debug!("Writer thread done!");

for handle in threads {
handle.join().expect("Could not join worker thread handle!");
}
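
In the scoped block above, n - 1 worker threads each build a ParquetBuilder from their own slice of the shared buffer and send it over a crossbeam channel to the spawning thread, which drains the channel and plays the single-writer role. A self-contained sketch of that fan-out/fan-in shape on crossbeam 0.8's scoped threads, with a simple line count standing in for the real builder work (convert_in_parallel and its internals are illustrative, not the commit's API):

use crossbeam::channel;
use crossbeam::thread;

fn convert_in_parallel(buffer: &[u8], workloads: &[(usize, usize)]) {
    let (sender, receiver) = channel::unbounded::<usize>();

    thread::scope(|s| {
        for (from, to) in workloads {
            let t_sender = sender.clone();
            let slice = &buffer[*from..*to];
            s.spawn(move |_| {
                // Stand-in for ParquetBuilder::try_build_from_slice.
                let n_rows = slice.iter().filter(|b| **b == b'\n').count();
                t_sender.send(n_rows).unwrap();
            });
        }
        // Drop the original sender so the receive loop below ends once
        // every worker's clone has been dropped.
        drop(sender);

        // The spawning thread drains the channel as results arrive,
        // mirroring the writer loop in try_convert_multithreaded.
        for n_rows in receiver {
            println!("worker finished a chunk of {} rows", n_rows);
        }
    })
    .expect("a worker thread panicked");
}

fn main() {
    let buffer = b"a\nb\nc\nd\n";
    convert_in_parallel(buffer, &[(0, 4), (4, 8)]);
}

Note also the bookkeeping after each pass: the converter seeks back by the bytes that followed the last line break in the buffer, so the next read starts on a row boundary, and those bytes are counted as overlapped rather than processed.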
54 changes: 53 additions & 1 deletion crates/evolution-slicer/src/slicer.rs
@@ -22,10 +22,11 @@
// SOFTWARE.
//
// File created: 2023-12-11
// Last updated: 2024-05-31
// Last updated: 2024-10-13
//

use evolution_common::error::{ExecutionError, Result};
use evolution_common::NUM_BYTES_FOR_NEWLINE;
use log::warn;

use std::fs::{File, OpenOptions};
@@ -142,6 +143,57 @@ impl FileSlicer {
}
}

/// Try and evenly distribute the buffer into uniformly sized chunks for each worker thread.
/// This function expects a [`Vec`] of usize tuples, representing the start and end byte
/// indices for each worker threads chunk.
///
/// # Note
/// This function is optimized to spend as little time as possible looking for valid chunks, i.e.,
/// where there are line breaks, and will not look through the entire buffer. This can have an
/// effect on the CPU cache hit-rate, however, this depends on the size of the buffer.
///
/// # Errors
/// This function might return an error for the following reasons:
/// * If the buffer was empty.
/// * If there were no line breaks in the buffer.
pub fn try_distribute_buffer_chunks_on_workers(
&self,
buffer: &[u8],
thread_workloads: &mut Vec<(usize, usize)>,
) -> Result<()> {
let n_bytes_total: usize = buffer.len();
let n_worker_threads: usize = thread_workloads.capacity();

let n_bytes_per_thread: usize = n_bytes_total / n_worker_threads;
let n_bytes_remaining: usize = n_bytes_total - n_bytes_per_thread * n_worker_threads;

let mut prev_byte_idx: usize = 0;
for _ in 0..(n_worker_threads - 1) {
let next_byte_idx: usize = n_bytes_per_thread + prev_byte_idx;
thread_workloads.push((prev_byte_idx, next_byte_idx));
prev_byte_idx = next_byte_idx;
}

thread_workloads.push((
prev_byte_idx,
prev_byte_idx + n_bytes_per_thread + n_bytes_remaining,
));

let mut n_bytes_to_offset_start: usize = 0;
for t_idx in 0..n_worker_threads {
let (mut start_byte_idx, mut end_byte_idx) = thread_workloads[t_idx];
start_byte_idx -= n_bytes_to_offset_start;
let n_bytes_to_offset_end: usize = (end_byte_idx - start_byte_idx)
- self.try_find_last_line_break(&buffer[start_byte_idx..end_byte_idx])?;
end_byte_idx -= n_bytes_to_offset_end;
thread_workloads[t_idx].0 = start_byte_idx;
thread_workloads[t_idx].1 = end_byte_idx;
n_bytes_to_offset_start = n_bytes_to_offset_end - NUM_BYTES_FOR_NEWLINE;
}

Ok(())
}

/// Read from the buffered reader into the provided buffer. This function reads
/// enough bytes to fill the buffer, hence, it is up to the caller to ensure that
/// that buffer has the correct and/or wanted capacity.
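
The core idea of try_distribute_buffer_chunks_on_workers: cut the buffer into roughly equal chunks, then snap each chunk's end back to its last line break and start the next chunk just past it, so no row is ever split across two workers. A simplified standalone sketch of that idea, assuming \n line endings and NUM_BYTES_FOR_NEWLINE == 1 (chunk_on_line_breaks is an illustrative name, not the commit's API):

fn chunk_on_line_breaks(buffer: &[u8], n_workers: usize) -> Option<Vec<(usize, usize)>> {
    let n_bytes_per_worker = buffer.len() / n_workers;
    let mut chunks: Vec<(usize, usize)> = Vec::with_capacity(n_workers);

    let mut start: usize = 0;
    for worker in 0..n_workers {
        // The last worker absorbs the remainder of the integer division.
        let tentative_end = if worker == n_workers - 1 {
            buffer.len()
        } else {
            start + n_bytes_per_worker
        };
        // Snap the end back to the last line break in this range; None if
        // a chunk contains no line break at all.
        let last_newline = buffer[start..tentative_end]
            .iter()
            .rposition(|b| *b == b'\n')?;
        let end = start + last_newline;
        chunks.push((start, end));
        start = end + 1; // continue just past the newline
    }
    Some(chunks)
}

fn main() {
    let buffer = b"row-1\nrow-2\nrow-3\nrow-4\n";
    // Two workers, whole rows each: Some([(0, 11), (12, 23)]).
    println!("{:?}", chunk_on_line_breaks(buffer, 2));
}

The committed version reaches the same layout in two passes, first pushing the raw equal-division bounds and then correcting them, carrying each chunk's leftover tail bytes into the start of the next chunk; either way, every chunk begins at the start of a row and ends on a line break.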
