From cfdfe8c97882ccd4b3a97ef6e3422333d2520c65 Mon Sep 17 00:00:00 2001 From: Rickard Lundin Date: Tue, 21 May 2024 12:17:41 +0200 Subject: [PATCH 1/3] faster --- Cargo.toml | 2 +- src/chunked/arrow_converter.rs | 70 +++++++++++++++++----------------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index da7a6c0..fab667f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ libc = "0.2.154" arrow = "51.0.0" parquet = "51.0.0" padder = { version = "1.2.0", features = ["serde"] } - +sorted-list = "0.2.0" [dev-dependencies] glob = "0.3.1" diff --git a/src/chunked/arrow_converter.rs b/src/chunked/arrow_converter.rs index 34244c8..4c97e43 100644 --- a/src/chunked/arrow_converter.rs +++ b/src/chunked/arrow_converter.rs @@ -24,6 +24,7 @@ // File created: 2023-12-11 // Last updated: 2024-05-15 // +use sorted_list::SortedList; use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder}; use arrow::record_batch::RecordBatch; @@ -56,6 +57,7 @@ use std::thread; use std::thread::JoinHandle; use std::time::{Duration, Instant}; use thread::spawn; +use libc::bsearch; //use crossbeam::channel::{Receiver, Sender}; static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; @@ -70,7 +72,12 @@ pub(crate) struct Slice2Arrow<'a> { pub(crate) struct MasterBuilders { builders: Vec>>, outfile: PathBuf, - sender: Option>, + sender: Option>, +} + +pub(crate) struct OrderedRecordBatch { + record_batch : RecordBatch, + batch_nr: i32 } unsafe impl Send for MasterBuilders {} @@ -207,6 +214,19 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { n, (self.fn_line_break_len)(), ); + let mut br: Vec<(&str, ArrayRef)> = vec![]; + + for bb in n.iter_mut() { + br.push(bb.finish()); + } + let record_batch = OrderedRecordBatch { record_batch: RecordBatch::try_from_iter(br).unwrap(), batch_nr: i as i32 }; + let _ = self + .masterbuilders + .sender + .clone() + .unwrap() + .send(record_batch); + } } }); @@ -214,34 +234,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { for ii in slices.iter() { bytes_in += ii.len(); } - parse_duration = start_parse.elapsed(); - - let mut rb: Vec = Vec::new(); - - for b in self.masterbuilders.builders.iter_mut() { - let mut br: Vec<(&str, ArrayRef)> = vec![]; - - for bb in b.iter_mut() { - br.push(bb.finish()); - } - - let start_builder_write = Instant::now(); - let record_batch = RecordBatch::try_from_iter(br).unwrap(); - // rb.push(RecordBatch::try_from_iter(br).unwrap()); - - let _ = self - .masterbuilders - .sender - .clone() - .unwrap() - .send(record_batch); - - builder_write_duration += start_builder_write.elapsed(); - } - // let writer: ArrowWriter = - - debug!("Batch write: accumulated bytes_written {}", bytes_out); + parse_duration = start_parse.elapsed(); (bytes_in, bytes_out, parse_duration, builder_write_duration) } @@ -252,27 +246,33 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { &_outfile, schema.clone(), ); - let (sender, receiver) = sync_channel::(1); + let (sender, receiver) = sync_channel::(100); + + let t: JoinHandle> = thread::spawn(move || { 'outer: loop { let message = receiver.recv(); + match message { Ok(rb) => { - writer.write(&rb).expect("Error Writing batch"); - if (rb.num_rows() == 0) { - break 'outer; + break 'outer; + writer.write(&rb.record_batch).expect("Error Writing batch"); + if (rb.record_batch.num_rows() == 0) { + break 'outer; } } Err(e) => { info!("got RecvError in channel , break to outer"); - break 'outer; } + + + } + } info!("closing the writer for parquet"); - writer.finish() }); self.masterbuilders.sender = Some(sender.clone()); @@ -289,7 +289,7 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { false, )]); - let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)); + let emptyrb = OrderedRecordBatch { record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)), batch_nr: 0 }; let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb); } From 7b97a9a958760745159642f1d03a0529fdb46d48 Mon Sep 17 00:00:00 2001 From: Rickard Lundin Date: Tue, 21 May 2024 12:18:28 +0200 Subject: [PATCH 2/3] faster again --- src/chunked/arrow_converter.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/chunked/arrow_converter.rs b/src/chunked/arrow_converter.rs index 4c97e43..dd341ac 100644 --- a/src/chunked/arrow_converter.rs +++ b/src/chunked/arrow_converter.rs @@ -49,6 +49,7 @@ use crate::trimmer::{trimmer_factory, ColumnTrimmer}; use crate::{chunked, trimmer}; use arrow::datatypes::{Field, Schema, SchemaRef}; use crossbeam::atomic::AtomicConsume; +use libc::bsearch; use parquet::errors::{ParquetError, Result}; use parquet::file::metadata::FileMetaData; use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; @@ -57,7 +58,6 @@ use std::thread; use std::thread::JoinHandle; use std::time::{Duration, Instant}; use thread::spawn; -use libc::bsearch; //use crossbeam::channel::{Receiver, Sender}; static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; @@ -76,8 +76,8 @@ pub(crate) struct MasterBuilders { } pub(crate) struct OrderedRecordBatch { - record_batch : RecordBatch, - batch_nr: i32 + record_batch: RecordBatch, + batch_nr: i32, } unsafe impl Send for MasterBuilders {} @@ -219,14 +219,16 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { for bb in n.iter_mut() { br.push(bb.finish()); } - let record_batch = OrderedRecordBatch { record_batch: RecordBatch::try_from_iter(br).unwrap(), batch_nr: i as i32 }; + let record_batch = OrderedRecordBatch { + record_batch: RecordBatch::try_from_iter(br).unwrap(), + batch_nr: i as i32, + }; let _ = self .masterbuilders .sender .clone() .unwrap() .send(record_batch); - } } }); @@ -248,8 +250,6 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { ); let (sender, receiver) = sync_channel::(100); - - let t: JoinHandle> = thread::spawn(move || { 'outer: loop { let message = receiver.recv(); @@ -259,18 +259,14 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { break 'outer; writer.write(&rb.record_batch).expect("Error Writing batch"); if (rb.record_batch.num_rows() == 0) { - break 'outer; + break 'outer; } } Err(e) => { info!("got RecvError in channel , break to outer"); break 'outer; } - - - } - } info!("closing the writer for parquet"); writer.finish() @@ -289,7 +285,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { false, )]); - let emptyrb = OrderedRecordBatch { record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)), batch_nr: 0 }; + let emptyrb = OrderedRecordBatch { + record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)), + batch_nr: 0, + }; let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb); } From 814491aebe4898dad61194ade46dec7e538d0d30 Mon Sep 17 00:00:00 2001 From: Rickard Lundin Date: Tue, 21 May 2024 12:24:17 +0200 Subject: [PATCH 3/3] clippy --- src/chunked/arrow_converter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/chunked/arrow_converter.rs b/src/chunked/arrow_converter.rs index dd341ac..47f00bb 100644 --- a/src/chunked/arrow_converter.rs +++ b/src/chunked/arrow_converter.rs @@ -256,7 +256,6 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { match message { Ok(rb) => { - break 'outer; writer.write(&rb.record_batch).expect("Error Writing batch"); if (rb.record_batch.num_rows() == 0) { break 'outer;