Skip to content

Commit

Permalink
[feat] chunked as a module (#33)
Browse files Browse the repository at this point in the history
Co-authored-by: Rickard Lundin <rickard.lundin@enkla.com>
  • Loading branch information
Ignalina and Rickard Lundin authored May 13, 2024
1 parent 1bdf8eb commit 7b81d27
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 164 deletions.
113 changes: 111 additions & 2 deletions src/converters.rs → src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
* Last updated: 2023-11-21
*/

use crate::slicers::FnFindLastLineBreak;
use arrow::array::ArrayRef;
use arrow_array::array::ArrayRef;
use parquet::format;
use std::cmp::min;
use std::fs;
use crate::chunked::chunked_slicer::SLICER_IN_CHUNK_SIZE;


// pub(crate) mod arrow2_converter;
pub(crate) mod self_converter;

pub mod arrow_converter;
pub mod chunked_slicer;

pub(crate) trait Converter<'a> {
fn set_line_break_handler(&mut self, fn_line_break: FnFindLastLineBreak<'a>);
Expand Down Expand Up @@ -161,3 +164,109 @@ mod tests {
assert_eq!(column_length_num_rightaligned(b"123", 3), (0, 2));
}
}


pub(crate) struct ChunkAndResidue {
pub(crate) chunk: Box<[u8; SLICER_IN_CHUNK_SIZE]>,
}
pub(crate) trait Slicer<'a> {
fn slice_and_convert(
&mut self,
converter: Box<dyn 'a + Converter<'a>>,
infile: fs::File,
n_threads: usize,
) -> Result<crate::chunked::Stats, &str>;
}
pub(crate) struct Stats {
pub(crate) bytes_in: usize,
pub(crate) bytes_out: usize,

pub(crate) num_rows: i64,
}

pub(crate) type FnLineBreakLen = fn() -> usize;
#[allow(dead_code)]
pub(crate) fn line_break_len_cr() -> usize {
1 as usize
}
#[allow(dead_code)]
pub(crate) fn line_break_len_crlf() -> usize {
2 as usize
}

pub(crate) type FnFindLastLineBreak<'a> = fn(bytes: &'a [u8]) -> (bool, usize);
#[allow(dead_code)]
pub(crate) fn find_last_nlcr(bytes: &[u8]) -> (bool, usize) {
if bytes.is_empty() {
return (false, 0); // TODO should report err ...
}

let mut p2 = bytes.len() - 1;

if 0 == p2 {
return (false, 0); // hmm
}

loop {
if bytes[p2 - 1] == 0x0d && bytes[p2] == 0x0a {
return (true, p2 + 1);
}
if 0 == p2 {
return (false, 0); // indicate we didnt find nl
}

p2 -= 1;
}
}

#[allow(dead_code)]
pub(crate) fn find_last_nl(bytes: &[u8]) -> (bool, usize) {
if bytes.is_empty() {
return (false, 0); // Indicate we didnt found nl.
}

let mut p2 = bytes.len() - 1;

if 0 == p2 {
return (false, 0); // hmm
}

loop {
if bytes[p2] == 0x0a {
return (true, p2);
}
if 0 == p2 {
return (false, 0); // indicate we didnt find nl
}
p2 -= 1;
}
}
pub(crate) struct IterRevolver<'a, T> {
shards: *mut T,
next: usize,
len: usize,
phantom: std::marker::PhantomData<&'a mut [T]>,
}

impl<'a, T> From<&'a mut [T]> for crate::chunked::IterRevolver<'a, T> {
fn from(shards: &'a mut [T]) -> crate::chunked::IterRevolver<'a, T> {
IterRevolver {
next: 0,
len: shards.len(),
shards: shards.as_mut_ptr(),
phantom: std::marker::PhantomData,
}
}
}

impl<'a, T> Iterator for IterRevolver<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
if self.next < self.len {
self.next += 1;
} else {
self.next = 1;
}
unsafe { Some(&mut *self.shards.offset(self.next as isize - 1)) }
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use rayon::iter::IndexedParallelIterator;
use rayon::prelude::*;

use crate::datatype::DataType;
use crate::converters::{ColumnBuilder, Converter};
use crate::slicers::{FnFindLastLineBreak, FnLineBreakLen};
use crate::{converters, schema};
use crate::chunked::{ColumnBuilder, Converter};
use crate::chunked::{FnFindLastLineBreak, FnLineBreakLen};
use crate::{chunked, schema};
use debug_print::debug_println;

pub(crate) struct Slice2Arrow<'a> {
Expand Down Expand Up @@ -224,7 +224,7 @@ impl ColumnBuilder for HandlerInt32Builder {
Self: Sized,
{
let (start, stop) =
converters::column_length_num_rightaligned(data, self.runes_in_column as i16);
chunked::column_length_num_rightaligned(data, self.runes_in_column as i16);

match atoi_simd::parse(&data[start..stop]) {
Ok(n) => {
Expand Down Expand Up @@ -255,7 +255,7 @@ impl ColumnBuilder for HandlerInt64Builder {
Self: Sized,
{
let (start, stop) =
converters::column_length_num_rightaligned(data, self.runes_in_column as i16);
chunked::column_length_num_rightaligned(data, self.runes_in_column as i16);
match atoi_simd::parse(&data[start..stop]) {
Ok(n) => {
self.int64builder.append_value(n);
Expand Down Expand Up @@ -287,7 +287,7 @@ impl ColumnBuilder for HandlerStringBuilder {
where
Self: Sized,
{
let column_length: usize = converters::column_length(data, self.runes_in_column as i16);
let column_length: usize = chunked::column_length(data, self.runes_in_column as i16);
// Me dont like ... what is the cost ? Could it be done once for the whole chunk ?
let text: &str = unsafe { from_utf8_unchecked(&data[..column_length]) };

Expand Down Expand Up @@ -323,7 +323,7 @@ impl ColumnBuilder for HandlerBooleanBuilder {
Self: Sized,
{
let (start, stop) =
converters::column_length_char_rightaligned(data, self.runes_in_column as i16);
chunked::column_length_char_rightaligned(data, self.runes_in_column as i16);

let text: &str = unsafe { from_utf8_unchecked(&data[start..stop]) };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use std::{cmp, fs};

use log::info;

use crate::converters::Converter;
use crate::slicers::{ChunkAndResidue, FnFindLastLineBreak, Slicer};
use crate::slicers::{IterRevolver, Stats};
use crate::chunked::Converter;
use crate::chunked::{ChunkAndResidue, FnFindLastLineBreak, Slicer};
use crate::chunked::{IterRevolver, Stats};

/**
GOAL(s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* Last updated: 2023-12-14
*/

use crate::converters::Converter;
use crate::slicers::FnFindLastLineBreak;
use crate::chunked::Converter;
use crate::chunked::FnFindLastLineBreak;
use rayon::iter::IntoParallelRefIterator;
use rayon::prelude::*;
use std::fs::File;
Expand Down
14 changes: 7 additions & 7 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ use std::fs;
use std::fs::File;
use std::path::PathBuf;

// use crate::converters::arrow2_converter::{MasterBuilder, Slice2Arrow2};
use crate::converters::arrow_converter::{MasterBuilders, Slice2Arrow};
use crate::converters::self_converter::SampleSliceAggregator;
use crate::converters::Converter;
// use crate::chunked::arrow2_converter::{MasterBuilder, Slice2Arrow2};
use crate::chunked::arrow_converter::{MasterBuilders, Slice2Arrow};
use crate::chunked::self_converter::SampleSliceAggregator;
use crate::chunked::Converter;
use crate::dump::dump;
use crate::slicers::chunked_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
use crate::slicers::Slicer;
use crate::slicers::{find_last_nl, line_break_len_cr, ChunkAndResidue};
use crate::chunked::chunked_slicer::{OldSlicer, IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`

Check warning on line 37 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused imports: `IN_MAX_CHUNKS`, `SLICER_IN_CHUNK_SIZE`
use crate::chunked::Slicer;
use crate::chunked::{find_last_nl, line_break_len_cr, ChunkAndResidue};

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused import: `ChunkAndResidue`

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused import: `ChunkAndResidue`

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused import: `ChunkAndResidue`

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused import: `ChunkAndResidue`

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused import: `ChunkAndResidue`

Check warning on line 39 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused import: `ChunkAndResidue`
use crate::{converter, error, mocker, schema};

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused imports: `error`, `mocker`, `schema`
use clap::{value_parser, ArgAction, Parser, Subcommand};

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused import: `value_parser`
use log::info;
Expand Down
8 changes: 3 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Last updated: 2024-02-28
*/

use crate::cli::Cli;
use clap::Parser;
use log::{debug, error, info};

Expand All @@ -34,17 +35,14 @@ mod error;
mod logger;
mod mocker;
mod schema;
use crate::slicers::chunked_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
use crate::slicers::ChunkAndResidue;
use cli::Cli;

mod threads;
mod writer;
mod builder;
mod parser;

mod converters;
mod chunked;
mod dump;
mod slicers;
mod mocking;
mod converter;
mod slicer;
Expand Down
138 changes: 0 additions & 138 deletions src/slicers.rs

This file was deleted.

0 comments on commit 7b81d27

Please sign in to comment.