diff --git a/Cargo.toml b/Cargo.toml index da7a6c0..fab667f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ libc = "0.2.154" arrow = "51.0.0" parquet = "51.0.0" padder = { version = "1.2.0", features = ["serde"] } - +sorted-list = "0.2.0" [dev-dependencies] glob = "0.3.1" diff --git a/src/chunked/arrow_converter.rs b/src/chunked/arrow_converter.rs index 34244c8..47f00bb 100644 --- a/src/chunked/arrow_converter.rs +++ b/src/chunked/arrow_converter.rs @@ -24,6 +24,7 @@ // File created: 2023-12-11 // Last updated: 2024-05-15 // +use sorted_list::SortedList; use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder}; use arrow::record_batch::RecordBatch; @@ -48,6 +49,7 @@ use crate::trimmer::{trimmer_factory, ColumnTrimmer}; use crate::{chunked, trimmer}; use arrow::datatypes::{Field, Schema, SchemaRef}; use crossbeam::atomic::AtomicConsume; +use libc::bsearch; use parquet::errors::{ParquetError, Result}; use parquet::file::metadata::FileMetaData; use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; @@ -70,7 +72,12 @@ pub(crate) struct Slice2Arrow<'a> { pub(crate) struct MasterBuilders { builders: Vec>>, outfile: PathBuf, - sender: Option>, + sender: Option>, +} + +pub(crate) struct OrderedRecordBatch { + record_batch: RecordBatch, + batch_nr: i32, } unsafe impl Send for MasterBuilders {} @@ -207,6 +214,21 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { n, (self.fn_line_break_len)(), ); + let mut br: Vec<(&str, ArrayRef)> = vec![]; + + for bb in n.iter_mut() { + br.push(bb.finish()); + } + let record_batch = OrderedRecordBatch { + record_batch: RecordBatch::try_from_iter(br).unwrap(), + batch_nr: i as i32, + }; + let _ = self + .masterbuilders + .sender + .clone() + .unwrap() + .send(record_batch); } } }); @@ -214,34 +236,8 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { for ii in slices.iter() { bytes_in += ii.len(); } - parse_duration = start_parse.elapsed(); - - let mut rb: Vec = Vec::new(); - - for b in self.masterbuilders.builders.iter_mut() { - let mut br: Vec<(&str, ArrayRef)> = vec![]; - - for bb in b.iter_mut() { - br.push(bb.finish()); - } - - let start_builder_write = Instant::now(); - let record_batch = RecordBatch::try_from_iter(br).unwrap(); - // rb.push(RecordBatch::try_from_iter(br).unwrap()); - - let _ = self - .masterbuilders - .sender - .clone() - .unwrap() - .send(record_batch); - - builder_write_duration += start_builder_write.elapsed(); - } - // let writer: ArrowWriter = - - debug!("Batch write: accumulated bytes_written {}", bytes_out); + parse_duration = start_parse.elapsed(); (bytes_in, bytes_out, parse_duration, builder_write_duration) } @@ -252,27 +248,26 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { &_outfile, schema.clone(), ); - let (sender, receiver) = sync_channel::(1); + let (sender, receiver) = sync_channel::(100); let t: JoinHandle> = thread::spawn(move || { 'outer: loop { let message = receiver.recv(); + match message { Ok(rb) => { - writer.write(&rb).expect("Error Writing batch"); - if (rb.num_rows() == 0) { + writer.write(&rb.record_batch).expect("Error Writing batch"); + if (rb.record_batch.num_rows() == 0) { break 'outer; } } Err(e) => { info!("got RecvError in channel , break to outer"); - break 'outer; } } } info!("closing the writer for parquet"); - writer.finish() }); self.masterbuilders.sender = Some(sender.clone()); @@ -289,7 +284,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> { false, )]); - let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)); + let emptyrb = OrderedRecordBatch { + record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)), + batch_nr: 0, + }; let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb); }