Skip to content

Commit

Permalink
Merge pull request #42 from firelink-data/feature/threaded_revolver
Browse files Browse the repository at this point in the history
Feature/threaded revolver
  • Loading branch information
Ignalina authored May 21, 2024
2 parents e990510 + 814491a commit 8d8c860
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ libc = "0.2.154"
arrow = "51.0.0"
parquet = "51.0.0"
padder = { version = "1.2.0", features = ["serde"] }

sorted-list = "0.2.0"
[dev-dependencies]
glob = "0.3.1"

Expand Down
66 changes: 32 additions & 34 deletions src/chunked/arrow_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// File created: 2023-12-11
// Last updated: 2024-05-15
//
use sorted_list::SortedList;

use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, Int64Builder, StringBuilder};
use arrow::record_batch::RecordBatch;
Expand All @@ -48,6 +49,7 @@ use crate::trimmer::{trimmer_factory, ColumnTrimmer};
use crate::{chunked, trimmer};
use arrow::datatypes::{Field, Schema, SchemaRef};
use crossbeam::atomic::AtomicConsume;
use libc::bsearch;
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::FileMetaData;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
Expand All @@ -70,7 +72,12 @@ pub(crate) struct Slice2Arrow<'a> {
/// Shared state for the conversion: the column builders, the output path and
/// the channel endpoint used to hand finished batches to the writer thread.
pub(crate) struct MasterBuilders {
// Nested vector of column builders; each inner `Vec` is finished into one
// `RecordBatch` elsewhere in this file.
// NOTE(review): presumably one inner `Vec` per worker/slice — confirm.
builders: Vec<Vec<Box<dyn Sync + Send + ColumnBuilder>>>,
// Path of the output file.
outfile: PathBuf,
// NOTE(review): `sender` is declared on this line and the next; this looks like
// the pre-change declaration from a diff rendering — confirm which one is live.
sender: Option<SyncSender<RecordBatch>>,
// Sending half of the sync channel; callers clone and unwrap it to send
// `OrderedRecordBatch` values. `Option` because it is assigned after
// construction, once the channel has been created.
sender: Option<SyncSender<OrderedRecordBatch>>,
}

/// A `RecordBatch` paired with a batch number, sent over the channel to the
/// writer thread.
///
/// A value whose `record_batch` has zero rows makes the writer loop in this
/// file stop receiving.
pub(crate) struct OrderedRecordBatch {
// The Arrow batch to be written.
record_batch: RecordBatch,
// Batch number: producers in this file set it from an index (`i as i32`), and
// the empty terminating batch uses 0.
// NOTE(review): presumably intended for restoring batch order on the
// receiving side; the receiver loop visible here does not read it — confirm.
batch_nr: i32,
}

unsafe impl Send for MasterBuilders {}
Expand Down Expand Up @@ -207,41 +214,30 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
n,
(self.fn_line_break_len)(),
);
let mut br: Vec<(&str, ArrayRef)> = vec![];

for bb in n.iter_mut() {
br.push(bb.finish());
}
let record_batch = OrderedRecordBatch {
record_batch: RecordBatch::try_from_iter(br).unwrap(),
batch_nr: i as i32,
};
let _ = self
.masterbuilders
.sender
.clone()
.unwrap()
.send(record_batch);
}
}
});

for ii in slices.iter() {
bytes_in += ii.len();
}
parse_duration = start_parse.elapsed();

let mut rb: Vec<RecordBatch> = Vec::new();

for b in self.masterbuilders.builders.iter_mut() {
let mut br: Vec<(&str, ArrayRef)> = vec![];

for bb in b.iter_mut() {
br.push(bb.finish());
}

let start_builder_write = Instant::now();
let record_batch = RecordBatch::try_from_iter(br).unwrap();
// rb.push(RecordBatch::try_from_iter(br).unwrap());

let _ = self
.masterbuilders
.sender
.clone()
.unwrap()
.send(record_batch);

builder_write_duration += start_builder_write.elapsed();
}
// let writer: ArrowWriter<File> =

debug!("Batch write: accumulated bytes_written {}", bytes_out);

parse_duration = start_parse.elapsed();
(bytes_in, bytes_out, parse_duration, builder_write_duration)
}

Expand All @@ -252,27 +248,26 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
&_outfile,
schema.clone(),
);
let (sender, receiver) = sync_channel::<RecordBatch>(1);
let (sender, receiver) = sync_channel::<OrderedRecordBatch>(100);

let t: JoinHandle<Result<format::FileMetaData>> = thread::spawn(move || {
'outer: loop {
let message = receiver.recv();

match message {
Ok(rb) => {
writer.write(&rb).expect("Error Writing batch");
if (rb.num_rows() == 0) {
writer.write(&rb.record_batch).expect("Error Writing batch");
if (rb.record_batch.num_rows() == 0) {
break 'outer;
}
}
Err(e) => {
info!("got RecvError in channel , break to outer");

break 'outer;
}
}
}
info!("closing the writer for parquet");

writer.finish()
});
self.masterbuilders.sender = Some(sender.clone());
Expand All @@ -289,7 +284,10 @@ impl<'a> Converter<'a> for Slice2Arrow<'a> {
false,
)]);

let emptyrb = arrow::record_batch::RecordBatch::new_empty(Arc::new(schema));
let emptyrb = OrderedRecordBatch {
record_batch: arrow::record_batch::RecordBatch::new_empty(Arc::new(schema)),
batch_nr: 0,
};

let _ = &self.masterbuilders.sender.clone().unwrap().send(emptyrb);
}
Expand Down

0 comments on commit 8d8c860

Please sign in to comment.