From 79097ccf8fb54145fc39bcaf2fcd1441a114f81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wilhelm=20=C3=85gren?= <36638274+wilhelmagren@users.noreply.github.com> Date: Mon, 14 Oct 2024 06:13:52 +0200 Subject: [PATCH] feat: efficient chunked buffer slicing for multithreading (#74) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * build: add bytesize deps for printing file size * feat: chunked buffer slicing for threads * build: bump crate version 1.2.0 → 1.3.0 * fix: cargo fmt --all -- --check * ci/cd: update deploy workflow to only publish evolution full example --- .github/workflows/cd.yml | 4 +- Cargo.toml | 25 ++++--- crates/evolution-converter/Cargo.toml | 1 + crates/evolution-converter/src/converter.rs | 77 +++++---------------- crates/evolution-slicer/src/slicer.rs | 54 ++++++++++++++- 5 files changed, 88 insertions(+), 73 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 833eb8e..d2eba85 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -25,6 +25,8 @@ jobs: with: command: check - name: Cargo publish - run: cargo publish --token ${CRATES_TOKEN} + run: | + cd examples/full + cargo publish --token ${CRATES_TOKEN} env: CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 560f65a..3cacc7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ authors = [ "Wilhelm Ågren ", ] license = "MIT" -version = "1.2.0" +version = "1.3.0" readme = "README.md" categories = [ "science", @@ -26,16 +26,19 @@ categories = [ keywords = [ "arrow", "parquet", + "concurrency", + "data-engineering", + "ETL", ] -description = "Efficiently evolve your old fixed-length data files into modern file formats. " +description = "Efficiently evolve your old fixed-length data files into modern file formats." 
[workspace.dependencies] -evolution-builder = { path = "crates/evolution-builder", version = "1.2.0" } -evolution-common = { path = "crates/evolution-common", version = "1.2.0" } -evolution-converter = { path = "crates/evolution-converter", version = "1.2.0" } -evolution-mocker = { path = "crates/evolution-mocker", version = "1.2.0" } -evolution-parser = { path = "crates/evolution-parser", version = "1.2.0" } -evolution-schema = { path = "crates/evolution-schema", version = "1.2.0" } -evolution-slicer = { path = "crates/evolution-slicer", version = "1.2.0" } -evolution-target = { path = "crates/evolution-target", version = "1.2.0" } -evolution-writer = { path = "crates/evolution-writer", version = "1.2.0" } +evolution-builder = { path = "crates/evolution-builder", version = "1.3.0" } +evolution-common = { path = "crates/evolution-common", version = "1.3.0" } +evolution-converter = { path = "crates/evolution-converter", version = "1.3.0" } +evolution-mocker = { path = "crates/evolution-mocker", version = "1.3.0" } +evolution-parser = { path = "crates/evolution-parser", version = "1.3.0" } +evolution-schema = { path = "crates/evolution-schema", version = "1.3.0" } +evolution-slicer = { path = "crates/evolution-slicer", version = "1.3.0" } +evolution-target = { path = "crates/evolution-target", version = "1.3.0" } +evolution-writer = { path = "crates/evolution-writer", version = "1.3.0" } diff --git a/crates/evolution-converter/Cargo.toml b/crates/evolution-converter/Cargo.toml index 1ca03eb..b5a6b54 100644 --- a/crates/evolution-converter/Cargo.toml +++ b/crates/evolution-converter/Cargo.toml @@ -16,6 +16,7 @@ bench = false [dependencies] arrow = "51.0.0" +bytesize = "1.3.0" crossbeam = "0.8.4" evolution-builder = { workspace = true } evolution-common = { workspace = true } diff --git a/crates/evolution-converter/src/converter.rs b/crates/evolution-converter/src/converter.rs index 759f6c5..29fa05c 100644 --- a/crates/evolution-converter/src/converter.rs +++ b/crates/evolution-converter/src/converter.rs @@ -22,10 +22,11 @@ // SOFTWARE. 
 //
 // File created: 2024-02-17
-// Last updated: 2024-10-11
+// Last updated: 2024-10-14
 //
 
 use arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use bytesize::ByteSize;
 use crossbeam::channel;
 use crossbeam::thread::scope;
 use crossbeam::thread::ScopedJoinHandle;
@@ -106,11 +107,12 @@ impl ParquetConverter {
     pub fn try_convert_multithreaded(&mut self) -> Result<()> {
         let mut buffer_capacity = self.read_buffer_size;
         let n_worker_threads: usize = self.n_threads - 1;
+        let mut ratio_processed: f32;
 
         info!("Converting flf to parquet in multithreaded mode.");
         info!(
-            "The file to convert is {} bytes in total.",
-            self.slicer.bytes_to_read(),
+            "The file to convert is ~{} in total.",
+            ByteSize::b(self.slicer.bytes_to_read() as u64),
         );
 
         let mut line_break_indices: Vec<usize> = Vec::with_capacity(buffer_capacity);
@@ -132,29 +134,27 @@ impl ParquetConverter {
             let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
             self.slicer.try_read_to_buffer(&mut buffer)?;
             self.slicer
-                .try_find_line_breaks(&buffer, &mut line_break_indices, true)?;
-
-            let byte_idx_last_line_break: usize = self.slicer.try_find_last_line_break(&buffer)?;
-            let n_bytes_left_after_last_line_break: usize =
-                buffer_capacity - byte_idx_last_line_break - NUM_BYTES_FOR_NEWLINE;
-
-            self.distribute_worker_thread_workloads(&line_break_indices, &mut thread_workloads);
-
+                .try_distribute_buffer_chunks_on_workers(&buffer, &mut thread_workloads)?;
             self.spawn_converter_threads(&buffer, &thread_workloads)?;
 
-            line_break_indices.clear();
-            thread_workloads.clear();
-
+            let n_bytes_left_after_last_line_break: usize =
+                buffer_capacity - thread_workloads[n_worker_threads - 1].1 - NUM_BYTES_FOR_NEWLINE;
             self.slicer
                 .try_seek_relative(-(n_bytes_left_after_last_line_break as i64))?;
 
             bytes_processed += buffer_capacity - n_bytes_left_after_last_line_break;
             bytes_overlapped += n_bytes_left_after_last_line_break;
             remaining_bytes -= buffer_capacity - n_bytes_left_after_last_line_break;
+            ratio_processed = 100.0 * bytes_processed as f32 / self.slicer.bytes_to_read() as f32;
 
             self.slicer.set_remaining_bytes(remaining_bytes);
             self.slicer.set_bytes_processed(bytes_processed);
             self.slicer.set_bytes_overlapped(bytes_overlapped);
+
+            line_break_indices.clear();
+            thread_workloads.clear();
+
+            info!("Estimated progress: {:.2}%", ratio_processed);
         }
 
         #[cfg(debug_assertions)]
@@ -208,6 +208,8 @@ impl ParquetConverter {
         // requires that the buffer contains some values, it can't be empty
        // but with large enough capacity, really dumb. Look into smarter
        // way to do this, because it really bothers me!!!!!
+        //
+        // But maybe this is not too bad; the allocations don't take that long compared to the I/O...
         let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
         self.slicer.try_read_to_buffer(&mut buffer)?;
 
@@ -244,33 +246,6 @@ impl ParquetConverter {
         Ok(())
     }
 
-    /// Divide the current buffer into chunks for each worker thread based on the line breaks.
-    ///
-    /// # Note
-    /// The workload will attempt to be uniform on each worker, however, the worker with the last index might get some
-    /// extra lines to process due to number of rows not being divisible by the estimated number of rows per thread.
-    fn distribute_worker_thread_workloads(
-        &self,
-        line_break_indices: &[usize],
-        thread_workloads: &mut Vec<(usize, usize)>,
-    ) {
-        let n_line_break_indices: usize = line_break_indices.len();
-        let n_rows_per_thread: usize = n_line_break_indices / thread_workloads.capacity(); // usize division will floor the result
-
-        let mut prev_line_break_byte_idx: usize = 0;
-        for worker_idx in 1..(thread_workloads.capacity()) {
-            let next_line_break_byte_idx: usize =
-                line_break_indices[n_rows_per_thread * worker_idx];
-            thread_workloads.push((prev_line_break_byte_idx, next_line_break_byte_idx));
-            prev_line_break_byte_idx = next_line_break_byte_idx + NUM_BYTES_FOR_NEWLINE;
-        }
-
-        thread_workloads.push((
-            prev_line_break_byte_idx,
-            line_break_indices[n_line_break_indices - 1],
-        ));
-    }
-
     /// Spawn the threads which perform the conversion, specifically, n-1 threads will work on converting and 1
     /// thread will collect and buffer the converted results and write those to parquet file, where n was the
     /// specified number of threads for the program.
@@ -297,16 +272,9 @@ impl ParquetConverter {
         let arc_buffer: Arc<&Vec<u8>> = Arc::new(buffer);
 
         let thread_result: thread::Result<()> = scope(|s| {
-            #[cfg(debug_assertions)]
-            debug!(
-                "Starting {} worker threads for conversion.",
-                thread_workloads.len()
-            );
-
             let threads = thread_workloads
                 .iter()
-                .enumerate()
-                .map(|(t_idx, (from, to))| {
+                .map(|(from, to)| {
                     let t_sender: channel::Sender<ParquetBuilder> = sender.clone();
                     // Can we do this in another way? So we don't have to allocate a bunch of stuff in our loop...
                     // TODO: pull this out and create them as part of the ParquetConverter struct?..
                     let t_buffer_slice: &[u8] = &t_buffer[*from..*to];
 
                     s.spawn(move |_| {
-                        #[cfg(debug_assertions)]
-                        debug!("Thread {} working...", t_idx + 1);
-
                         t_builder.try_build_from_slice(t_buffer_slice).unwrap();
                         t_sender.send(t_builder).unwrap();
                         drop(t_sender);
-                        #[cfg(debug_assertions)]
-                        debug!("Thread {} done!", t_idx + 1);
                     })
                 })
                 .collect::<Vec<ScopedJoinHandle<()>>>();
 
             drop(sender);
-
-            #[cfg(debug_assertions)]
-            debug!("Thread 0 waiting for batches to write to buffer...");
 
             for mut builder in receiver {
                 self.writer.try_write_from_builder(&mut builder).unwrap();
                 drop(builder);
             }
 
-            #[cfg(debug_assertions)]
-            debug!("Writer thread done!");
-
             for handle in threads {
                 handle.join().expect("Could not join worker thread handle!");
             }
diff --git a/crates/evolution-slicer/src/slicer.rs b/crates/evolution-slicer/src/slicer.rs
index 9f84ace..f8c0366 100644
--- a/crates/evolution-slicer/src/slicer.rs
+++ b/crates/evolution-slicer/src/slicer.rs
@@ -22,10 +22,11 @@
 // SOFTWARE.
 //
 // File created: 2023-12-11
-// Last updated: 2024-05-31
+// Last updated: 2024-10-13
 //
 
 use evolution_common::error::{ExecutionError, Result};
+use evolution_common::NUM_BYTES_FOR_NEWLINE;
 use log::warn;
 
 use std::fs::{File, OpenOptions};
@@ -142,6 +143,57 @@ impl FileSlicer {
         }
     }
 
+    /// Try to evenly distribute the buffer into uniformly sized chunks for each worker thread.
+    /// This function expects a [`Vec`] of usize tuples representing the start and end byte
+    /// indices of each worker thread's chunk.
+    ///
+    /// # Note
+    /// This function is optimized to spend as little time as possible looking for valid chunks, i.e.,
+    /// where there are line breaks, and will not look through the entire buffer. This can have an
+    /// effect on the CPU cache hit rate; however, this depends on the size of the buffer.
+    ///
+    /// # Errors
+    /// This function might return an error for the following reasons:
+    /// * The buffer is empty.
+    /// * There are no line breaks in the buffer.
+    pub fn try_distribute_buffer_chunks_on_workers(
+        &self,
+        buffer: &[u8],
+        thread_workloads: &mut Vec<(usize, usize)>,
+    ) -> Result<()> {
+        let n_bytes_total: usize = buffer.len();
+        let n_worker_threads: usize = thread_workloads.capacity();
+
+        let n_bytes_per_thread: usize = n_bytes_total / n_worker_threads;
+        let n_bytes_remaining: usize = n_bytes_total - n_bytes_per_thread * n_worker_threads;
+
+        let mut prev_byte_idx: usize = 0;
+        for _ in 0..(n_worker_threads - 1) {
+            let next_byte_idx: usize = n_bytes_per_thread + prev_byte_idx;
+            thread_workloads.push((prev_byte_idx, next_byte_idx));
+            prev_byte_idx = next_byte_idx;
+        }
+
+        thread_workloads.push((
+            prev_byte_idx,
+            prev_byte_idx + n_bytes_per_thread + n_bytes_remaining,
+        ));
+
+        let mut n_bytes_to_offset_start: usize = 0;
+        for t_idx in 0..n_worker_threads {
+            let (mut start_byte_idx, mut end_byte_idx) = thread_workloads[t_idx];
+            start_byte_idx -= n_bytes_to_offset_start;
+            let n_bytes_to_offset_end: usize = (end_byte_idx - start_byte_idx)
+                - self.try_find_last_line_break(&buffer[start_byte_idx..end_byte_idx])?;
+            end_byte_idx -= n_bytes_to_offset_end;
+            thread_workloads[t_idx].0 = start_byte_idx;
+            thread_workloads[t_idx].1 = end_byte_idx;
+            n_bytes_to_offset_start = n_bytes_to_offset_end - NUM_BYTES_FOR_NEWLINE;
+        }
+
+        Ok(())
+    }
+
     /// Read from the buffered reader into the provided buffer. This function reads
     /// enough bytes to fill the buffer; hence, it is up to the caller to ensure that
     /// the buffer has the correct and/or wanted capacity.
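
For reference, a minimal standalone sketch of the read-loop bookkeeping in try_convert_multithreaded: fill a fixed-size buffer, seek the cursor back past the partial trailing line so the next read starts on a line boundary, and report progress. It uses a std::io::BufReader in place of the crate's FileSlicer; the function name and log format here are illustrative, not the crate's API, and a one-byte newline is assumed.

use std::fs::File;
use std::io::{BufReader, Read};

/// Read a fixed-length-field file in fixed-size chunks, seeking the cursor
/// back past any partial trailing line so every chunk starts on a boundary.
fn process_in_line_aligned_chunks(path: &str, buffer_capacity: usize) -> std::io::Result<()> {
    let file = File::open(path)?;
    let total_bytes = file.metadata()?.len() as usize;
    let mut reader = BufReader::new(file);
    let mut bytes_processed: usize = 0;

    while total_bytes - bytes_processed > buffer_capacity {
        let mut buffer = vec![0u8; buffer_capacity];
        reader.read_exact(&mut buffer)?;

        // Bytes after the last newline belong to a line the fixed-size read
        // cut off; seek back so the next read re-covers them.
        let last_newline = buffer
            .iter()
            .rposition(|&b| b == b'\n')
            .expect("the read buffer must contain at least one complete line");
        let leftover = buffer_capacity - last_newline - 1;
        reader.seek_relative(-(leftover as i64))?;

        bytes_processed += buffer_capacity - leftover;
        println!(
            "Estimated progress: {:.2}%",
            100.0 * bytes_processed as f32 / total_bytes as f32
        );
        // ... hand &buffer[..=last_newline] to the worker threads here ...
    }
    // A final, smaller read would handle the remaining tail of the file.
    Ok(())
}

The re-read bytes are exactly the bytes_overlapped counter the converter tracks: at most one partial line per buffer fill.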
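
Likewise, a self-contained sketch of the chunk-snapping idea behind try_distribute_buffer_chunks_on_workers: split the buffer into roughly equal byte ranges, then pull each range's end back to the last line break inside it so no row is ever split across two workers. This assumes a one-byte line terminator (NUM_BYTES_FOR_NEWLINE == 1); chunk_on_line_breaks is a hypothetical free function, not the crate's method, which performs the same boundary arithmetic in place on thread_workloads.

/// Split `buffer` into `n_workers` byte ranges of near-equal size, then snap
/// each range's end back to the last newline inside it so that no line is
/// ever split across two workers. Returns None if a range holds no newline.
fn chunk_on_line_breaks(buffer: &[u8], n_workers: usize) -> Option<Vec<(usize, usize)>> {
    let n_bytes_per_worker = buffer.len() / n_workers;
    let mut workloads = Vec::with_capacity(n_workers);
    let mut start: usize = 0;

    for worker in 0..n_workers {
        // The last worker also takes the remainder of the integer division.
        let tentative_end = if worker == n_workers - 1 {
            buffer.len()
        } else {
            start + n_bytes_per_worker
        };
        // Snap the end back to the last line break inside the range.
        let last_newline = buffer[start..tentative_end]
            .iter()
            .rposition(|&b| b == b'\n')?;
        let end = start + last_newline; // exclusive: the newline byte is dropped
        workloads.push((start, end));
        start = end + 1; // the next chunk starts right after the newline
    }
    Some(workloads)
}

fn main() {
    let buffer = b"aaaa\nbbbb\ncccc\ndddd\neeee\n";
    for (from, to) in chunk_on_line_breaks(buffer, 2).unwrap() {
        println!("{:?}", std::str::from_utf8(&buffer[from..to]).unwrap());
    }
}

Running this prints "aaaa\nbbbb" and "cccc\ndddd\neeee": each worker receives whole rows only, and any partial line at the very end of the buffer is left for the caller, which the converter handles by seeking the file cursor backwards before the next read.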