Skip to content

Commit

Permalink
Merge pull request #32 from firelink-data/merge_single_to_main
Browse files Browse the repository at this point in the history
First merge of Wilhelms branch. Mockery only atm
  • Loading branch information
Ignalina authored May 9, 2024
2 parents 1a216e8 + 7bffc34 commit 5848af3
Show file tree
Hide file tree
Showing 11 changed files with 1,127 additions and 259 deletions.
20 changes: 11 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
[package]
name = "evolution"
version = "0.2.2"
version = "0.3.5"
edition = "2021"
description = "🦖 Evolve your fixed length data files into Apache Arrow tables, fully parallelized!"
authors = [
"Wilhelm Ågren <wilhelmagren98@gmail.com>",
"Ted Hammarlund <TedHammarlund@gmail.com>",
"Rickard Lundin <rickard@x14.se>",
]

readme = "README.md"
Expand All @@ -28,19 +30,19 @@ arrow-schema = "51.0.0"
arrow-array = "51.0.0"
parquet = "51.0.0"
atoi_simd = "0.15.6"
chrono = "0.4.35"
clap = { version = "4.5.3", features = ["default", "derive"] }
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["default", "derive"] }
crossbeam = "0.8.4"
colored = "2.1.0"
env_logger = "0.11.3"
half = "2.4.0"
half = "2.4.1"
log = "0.4.21"
num_cpus = "1.16.0"
padder = "1.0.0"
rand = "0.8.5"
rayon = "1.9.0"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
padder = "1.1.0"
rand = { version = "0.8.5" }
rayon = { version = "1.10.0" }
serde = { version = "1.0.201", features = ["derive"] }
serde_json = "1.0.117"
threadpool = "1.8.1"
substring = "1.4.5"
tempfile = "3.10.1"
Expand Down
139 changes: 139 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* MIT License
*
* Copyright (c) 2024 Firelink Data
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* File created: 2024-05-07
* Last updated: 2024-05-08
*/

use arrow::array::{ArrayRef, BooleanBuilder};
use log::warn;

use std::fmt::Debug;
use std::sync::Arc;

use crate::parser::BooleanParser;

///
pub(crate) trait ColumnBuilder: Debug {
fn parse_and_push_bytes(&mut self, bytes: &[u8]);
fn runes(&self) -> usize;
fn finish(&mut self) -> (&str, ArrayRef);
}

///
#[derive(Debug)]
pub(crate) struct BooleanColumnBuilder {
inner: BooleanBuilder,
runes: usize,
name: String,
parser: BooleanParser,
}

///
impl BooleanColumnBuilder {
///
pub fn new(runes: usize, name: String, parser: BooleanParser) -> Self {
Self {
inner: BooleanBuilder::new(),
runes,
name,
parser,
}
}
}

///
impl ColumnBuilder for BooleanColumnBuilder {
///
fn parse_and_push_bytes(&mut self, bytes: &[u8]) {
match self.parser.parse(bytes) {
Ok(b) => self.inner.append_value(b),
Err(e) => {
warn!("Could not convert utf-8 text to bool: {:?}", e);
self.inner.append_null();
},
};
}

///
fn runes(&self) -> usize {
self.runes
}

///
fn finish(&mut self) -> (&str, ArrayRef) {
(
&self.name,
Arc::new(self.inner.finish()) as ArrayRef,
)
}
}

///
#[derive(Debug)]
pub(crate) struct Float16Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct Float32Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct Float64Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct Int16Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct Int32Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct Int64Builder {
pub runes: usize,
pub name: String,
}

///
#[derive(Debug)]
pub(crate) struct StringBuilder {
pub runes: usize,
pub name: String,
}
62 changes: 45 additions & 17 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ use crate::slicers::old_slicer::{OldSlicer, IN_MAX_CHUNKS};
use crate::slicers::Slicer;
use crate::slicers::{find_last_nl, line_break_len_cr, ChunkAndResidue};
use crate::{error, mocker, schema};

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused imports: `error`, `mocker`, `schema`

Check warning on line 40 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused imports: `error`, `mocker`, `schema`
use clap::{Parser, Subcommand};
use clap::{value_parser, ArgAction, Parser, Subcommand};

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused import: `value_parser`

Check warning on line 41 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused import: `value_parser`
use log::info;
use parquet::arrow::ArrowWriter;
use crate::mocker::Mocker;
use crate::error::Result;
use crate::threads::get_available_threads;

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

unused import: `crate::threads::get_available_threads`

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

unused import: `crate::threads::get_available_threads`

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

unused import: `crate::threads::get_available_threads`

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

unused import: `crate::threads::get_available_threads`

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

unused import: `crate::threads::get_available_threads`

Check warning on line 46 in src/cli.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

unused import: `crate::threads::get_available_threads`

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -77,13 +80,32 @@ enum Commands {
/// does testing things
Mock {
/// Sets schema file
#[arg(short, long, value_name = "SCHEMA")]
#[arg(short = 's', long="schema", value_name = "SCHEMA",required = true)]
schema: PathBuf,
/// Sets input file
#[arg(short, long, value_name = "FILE")]
file: Option<PathBuf>,
#[arg(short, long, value_name = "n-rows", default_value = "100")]
n_rows: Option<i64>,
#[arg(short='o', long="output-file", value_name = "OUTPUT-FILE",required = false)]
output_file: Option<PathBuf>,
#[arg(short='n', long="n-rows", value_name = "NUM-ROWS", default_value = "100",required = false)]
n_rows: Option<usize>,

/// Set the size of the buffer (number of rows).
#[arg(
long = "buffer-size",
value_name = "BUFFER-SIZE",
action = ArgAction::Set,
required = false,
)]
buffer_size: Option<usize>,

/// Set the capacity of the thread channel (number of messages).
#[arg(
long = "thread-channel-capacity",
value_name = "THREAD-CHANNEL-CAPACITY",
action = ArgAction::Set,
required = false,
)]
thread_channel_capacity: Option<usize>,

},
Convert {
/// Sets schema file
Expand Down Expand Up @@ -119,7 +141,7 @@ impl Cli {
pub fn run<'a>(
&self,
in_buffers: &mut [ChunkAndResidue; IN_MAX_CHUNKS],
) -> Result<(), error::ExecutionError> {
) -> Result<()> {
let n_logical_threads = num_cpus::get();
let mut n_threads: usize = self.n_threads as usize;

Expand All @@ -139,17 +161,23 @@ impl Cli {
match &self.command {
Some(Commands::Mock {
schema,
file,
output_file,
n_rows,
}) => {
print!("target file {:?}", file.as_ref());

mocker::Mocker::new(
schema::FixedSchema::from_path(schema.into()),
file.clone(),
n_threads,
)
.generate(n_rows.unwrap() as usize);
buffer_size,
thread_channel_capacity,
}) => {
print!("target file {:?}", output_file.as_ref());

Mocker::builder()
.schema(schema.to_owned())
.output_file(output_file.to_owned())
.num_rows(*n_rows)
.num_threads(n_threads)
.buffer_size(*buffer_size)
.thread_channel_capacity(*thread_channel_capacity)
.build()?
.generate();

Ok(())
}

Expand Down
52 changes: 50 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,56 @@
* SOFTWARE.
*
* File created: 2024-02-05
* Last updated: 2024-02-05
* Last updated: 2024-05-07
*/

use std::error;
use std::fmt;
use std::result;

/// Generic result type which allows for dynamic dispatch of our custom error variants.
pub(crate) type Result<T> = result::Result<T, Box<dyn error::Error>>;

/// Error type used during execution when something goes wrong.
#[derive(Debug)]
pub(crate) struct ExecutionError;

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, stable)

struct `ExecutionError` is never constructed

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, nightly)

struct `ExecutionError` is never constructed

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (ubuntu-latest, nightly)

struct `ExecutionError` is never constructed

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (macos-latest, stable)

struct `ExecutionError` is never constructed

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, nightly)

struct `ExecutionError` is never constructed

Check warning on line 37 in src/error.rs

View workflow job for this annotation

GitHub Actions / Cargo check (windows-latest, stable)

struct `ExecutionError` is never constructed

impl error::Error for ExecutionError {}
impl fmt::Display for ExecutionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Execution failed, please refer to any logged error messages or the stack-trace for debugging.")
}
}

/// Error type used during setup of either [`Mocker`] or [`Converter`] if any
/// required values are invalid or missing.
#[derive(Debug)]
pub struct ExecutionError {}
pub(crate) struct SetupError;

impl error::Error for SetupError {}
impl fmt::Display for SetupError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Setup failed, please refer to any logged error messages or the stack-trace for debugging.")
}
}

#[cfg(test)]
mod tests_error {
use super::*;

#[test]
fn test_execution_error() {
assert_eq!(
"Execution failed, please refer to any logged error messages or the stack-trace for debugging.",
ExecutionError.to_string(),
);
}

#[test]
fn test_setup_error() {
assert_eq!(
"Setup failed, please refer to any logged error messages or the stack-trace for debugging.",
SetupError.to_string(),
);
}
}
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ mod schema;
use crate::slicers::old_slicer::{IN_MAX_CHUNKS, SLICER_IN_CHUNK_SIZE};
use crate::slicers::ChunkAndResidue;
use cli::Cli;
mod threads;
mod writer;
mod builder;
mod parser;

mod converters;
mod dump;
mod slicers;
mod mocking;
///
fn main() {
let cli = Cli::parse();
Expand Down
Loading

0 comments on commit 5848af3

Please sign in to comment.