From d075b6e99d6f57571dd6d2f9897b56e0da40cd81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rickard=20Ernst=20Bj=C3=B6rn=20Lundin?= <33436048+Ignalina@users.noreply.github.com> Date: Fri, 29 Dec 2023 13:43:02 +0100 Subject: [PATCH] [feat] clap subcommand changes, implement parsing (#17) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * initial coding to show the team * add subocmands ..not done * cleaned redundat code * CLAP subcommands mock,slice,parse * lint * fmt --------- Co-authored-by: Rickard Lundin Co-authored-by: Wilhelm Ågren <36638274+wilhelmagren@users.noreply.github.com> --- Cargo.toml | 4 +- src/builder.rs | 51 ++++++++++- src/builder_datatypes.rs | 116 +++++++++++++++++++++++++ src/main.rs | 180 ++++++++++++++++++++++----------------- src/mock.rs | 6 +- src/slicer.rs | 8 +- 6 files changed, 278 insertions(+), 87 deletions(-) create mode 100644 src/builder_datatypes.rs diff --git a/Cargo.toml b/Cargo.toml index 5f76b7c..b21984a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "evolution" -version = "0.1.0" +version = "0.2.2" edition = "2021" description = "🦖 Evolve your fixed length data files into Apache Arrow tables, fully parallelized!" authors = [ @@ -20,7 +20,7 @@ default-run = "evolution" [dependencies] arrow2 = { version = "0.18.0", features = ["io_ipc"] } chrono = "0.4.31" -clap = { version = "4.4.8", features = ["derive"] } +clap = { version = "4.4.8", features = ["default", "derive"] } crossbeam = "0.8.2" colored = "2.0.4" env_logger = "0.10.1" diff --git a/src/builder.rs b/src/builder.rs index 032baff..4606ade 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -22,12 +22,18 @@ * SOFTWARE. * * File created: 2023-11-21 -* Last updated: 2023-11-21 +* Last updated: 2023-12-24 */ +use std::path::PathBuf; + +use arrow2::array::MutablePrimitiveArray; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::io::ipc::write::Record; +use crate::builder_datatypes::ColumnBuilderType; +use crate::schema; + /// #[allow(dead_code)] struct FixedField { @@ -99,12 +105,49 @@ struct FixedTable<'a> { /// pub trait ColumnBuilder { /// - fn parse_value(&self, name: String) -> bool; + fn parse_value(&mut self, name: &str); /// - fn finish_column(&self) -> bool; + fn finish_column(&mut self); /// I think this function won't be necessary. /// `[arrow2]` supports bitmap nulling out-of-the-box. - fn nullify(&self); + fn nullify(&mut self); +} + +pub(crate) fn parse_from_schema( + schema_path: PathBuf, + _in_file_path: PathBuf, + _out_file_path: PathBuf, + _n_threads: i16, +) { + let mut builders: Vec> = Vec::new(); + for val in schema::FixedSchema::from_path(schema_path.into()).iter() { + match val.dtype().as_str() { + "i32" => builders.push(Box::new(ColumnBuilderType:: { + rows: MutablePrimitiveArray::new(), + })), + "i64" => builders.push(Box::new(ColumnBuilderType:: { + rows: MutablePrimitiveArray::new(), + })), + + &_ => {} + }; + } } +/* + + "bool" => Ok(DataType::Boolean), + "boolean" => Ok(DataType::Boolean), + "i16" => Ok(DataType::Int16), + "i32" => Ok(DataType::Int32), + "i64" => Ok(DataType::Int64), + "f16" => Ok(DataType::Float16), + "f32" => Ok(DataType::Float32), + "f64" => Ok(DataType::Float64), + "utf8" => Ok(DataType::Utf8), + "string" => Ok(DataType::Utf8), + "lutf8" => Ok(DataType::LargeUtf8), + "lstring" => Ok(DataType::LargeUtf8), + +*/ diff --git a/src/builder_datatypes.rs b/src/builder_datatypes.rs new file mode 100644 index 0000000..25ec560 --- /dev/null +++ b/src/builder_datatypes.rs @@ -0,0 +1,116 @@ +/* +* MIT License +* +* Copyright (c) 2023 Firelink Data +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. +* +* File created: 2023-11-21 +* Last updated: 2023-11-21 +*/ + +use arrow2::array::MutablePrimitiveArray; +use arrow2::types::NativeType; + +use crate::builder::ColumnBuilder; + +/* + + "bool" => Ok(DataType::Boolean), + "boolean" => Ok(DataType::Boolean), + "i16" => Ok(DataType::Int16), + "i32" => Ok(DataType::Int32), + "i64" => Ok(DataType::Int64), + "f16" => Ok(DataType::Float16), + "f32" => Ok(DataType::Float32), + "f64" => Ok(DataType::Float64), + "utf8" => Ok(DataType::Utf8), + "string" => Ok(DataType::Utf8), + "lutf8" => Ok(DataType::LargeUtf8), + "lstring" => Ok(DataType::LargeUtf8), + +*/ + +pub(crate) struct ColumnBuilderType { + pub rows: MutablePrimitiveArray, +} + +impl ColumnBuilder for ColumnBuilderType { + fn parse_value(&mut self, name: &str) + where + Self: Sized, + { + match name.parse::() { + Ok(n) => { + self.rows.push(Some(n)); + n + } + Err(_e) => { + self.nullify(); + 0 + } + }; + } + + fn finish_column(&mut self) + where + Self: Sized, + { + todo!() + } + + fn nullify(&mut self) + where + Self: Sized, + { + self.rows.push(None); + } +} + +impl ColumnBuilder for ColumnBuilderType { + fn parse_value(&mut self, name: &str) + where + Self: Sized, + { + match name.parse::() { + Ok(n) => { + self.rows.push(Some(n)); + n + } + Err(_e) => { + self.nullify(); + 0 + } + }; + } + + fn finish_column(&mut self) + where + Self: Sized, + { + todo!() + } + + fn nullify(&mut self) + where + Self: Sized, + { + self.rows.push(None); + } +} diff --git a/src/main.rs b/src/main.rs index deb3d7b..4c270fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,73 +25,81 @@ * Last updated: 2023-12-14 */ -use crate::slicer::{find_last_nl, SampleSliceAggregator}; -use clap::{value_parser, Arg, ArgAction, Command}; -use log::{info, SetLoggerError}; use std::fs; +use std::path::PathBuf; + +use clap::{Parser, Subcommand}; +use log::{info, SetLoggerError}; + +use crate::builder::parse_from_schema; +use crate::slicer::{find_last_nl, SampleSliceAggregator}; mod builder; +mod builder_datatypes; mod logging; mod mock; mod schema; mod slicer; +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Optional name to operate on + name: Option, + + /// Threads + #[arg(short, long, action = clap::ArgAction::Count)] + n_threads: u8, + // value_parser(value_parser!(usize)) + /// Turn debugging information on + #[arg(short, long, action = clap::ArgAction::Count)] + debug: u8, + + #[command(subcommand)] + command: Option, +} + +#[derive(Subcommand)] +enum Commands { + /// does testing things + Mock { + /// Sets schema file + #[arg(short, long, value_name = "SCHEMA")] + schema: Option, + /// Sets input file + #[arg(short, long, value_name = "FILE")] + file: Option, + #[arg(short, long, value_name = "n-rows")] + n_rows: Option, + }, + Slice { + /// Sets input file + #[arg(short, long, value_name = "FILE")] + /// Sets amount of rows to generate. + file: Option, + }, + Convert { + /// Sets schema file + #[arg(short, long, value_name = "SCHEMA")] + schema: Option, + + /// Sets input file + #[arg(short, long, value_name = "IN-FILE")] + in_file: Option, + + /// Sets input file + #[arg(short, long, value_name = "OUT-FILE")] + out_file: Option, + }, +} + /// fn main() -> Result<(), SetLoggerError> { logging::setup_log()?; - - let mut matches = Command::new("evolution") - .author("Wilhelm Ågren ") - .version("0.2.1") - .about( - "🦖 Evolve your fixed length data files into Apache Arrow tables, fully parallelized!", - ) - .arg( - Arg::new("schema") - .short('s') - .long("schema") - .action(ArgAction::Set), - ) - .arg( - Arg::new("file") - .short('f') - .long("file") - .requires("slice") - .action(ArgAction::Set), - ) - .arg( - Arg::new("mock") - .short('m') - .long("mock") - .requires("schema") - .action(ArgAction::SetTrue), - ) - .arg( - Arg::new("slice") - .long("slice") - .requires("schema") - .requires("file") - .action(ArgAction::SetTrue), - ) - .arg( - Arg::new("n-rows") - .long("n-rows") - .requires("mock") - .action(ArgAction::Set) - .default_value("1000") - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("n-threads") - .long("n-threads") - .action(ArgAction::Set) - .default_value("1") - .value_parser(value_parser!(usize)), - ) - .get_matches(); + let cli = Cli::parse(); let n_logical_threads = num_cpus::get(); - let mut n_threads = matches.remove_one::("n-threads").unwrap(); + let mut n_threads: usize = cli.n_threads as usize; if n_threads > n_logical_threads { info!( @@ -106,32 +114,52 @@ fn main() -> Result<(), SetLoggerError> { info!("multithreading enabled ({} logical threads)", n_threads); } - if matches.get_flag("mock") { - mock::mock_from_schema( - matches.remove_one::("schema").unwrap(), - matches.remove_one::("n-rows").unwrap(), - ); - } - - if matches.get_flag("slice") { - let file_name = matches.remove_one::("file").unwrap(); + match &cli.command { + Some(Commands::Mock { + schema, + file: _, + n_rows, + }) => { + mock::mock_from_schema( + schema.as_ref().expect("REASON").to_path_buf(), + n_rows.unwrap() as usize, + ); + } + Some(Commands::Slice { file }) => { + let file_name = file.as_ref().expect("REASON").to_path_buf(); + let file = std::fs::File::open(&file_name).expect("bbb"); + let mut out_file_name = file_name.clone().to_owned(); + out_file_name.push("SLICED"); - let file = std::fs::File::open(&file_name).expect("bbb"); - let mut out_file_name = file_name.clone().to_owned(); - out_file_name.push_str("SLICED"); + let file_out = fs::OpenOptions::new() + .create(true) + .append(true) + .open(out_file_name) + .expect("aaa"); - let file_out = fs::OpenOptions::new() - .create(true) - .append(true) - .open(out_file_name) - .expect("aaa"); + let saa: Box = Box::new(slicer::SampleSliceAggregator { + file_out, + fn_line_break: find_last_nl, + }); + slicer::slice_and_process(saa, file, n_threads); + } - let saa: Box = Box::new(slicer::SampleSliceAggregator { - file_out, - fn_line_break: find_last_nl, - }); + Some(Commands::Convert { + schema, + in_file, + out_file: _, + }) => { + parse_from_schema( + schema.as_ref().expect("REASON").to_path_buf(), + in_file.as_ref().expect("REASON").to_path_buf(), + in_file.as_ref().expect("REASON").to_path_buf(), + 0, + ); + } - slicer::slice_and_process(saa, file, n_threads); + None => {} + #[allow(unreachable_patterns)] + _ => {} } Ok(()) diff --git a/src/mock.rs b/src/mock.rs index 08a3d50..2dd976b 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -136,8 +136,10 @@ pub(crate) fn mock_string<'a>(_len: usize) -> &'a str { } /// -pub(crate) fn mock_from_schema(schema_path: String, n_rows: usize) { - let schema = schema::FixedSchema::from_path(schema_path.into()); +pub(crate) fn mock_from_schema(schema_path: PathBuf, n_rows: usize) { + let schema = schema::FixedSchema::from_path(schema_path); + + // let schema = schema::FixedSchema::from_path(schema_path.into()); // let mocker = FixedMocker::new(schema); //mocker.generate(n_rows); generate_threaded(schema, n_rows, 16); diff --git a/src/slicer.rs b/src/slicer.rs index 4b575ec..c49cced 100644 --- a/src/slicer.rs +++ b/src/slicer.rs @@ -25,11 +25,12 @@ * Last updated: 2023-12-14 */ -use log::info; use std::cmp; use std::fs::File; use std::io::{BufReader, Read, Write}; +use log::info; + /** GOAL(s) @@ -44,7 +45,7 @@ SLICER_IN_CHUNK_SIZE in_max_chunks in_chunk_cores (how man splits will be made) -*/ + */ pub(crate) const SLICER_IN_CHUNK_SIZE: usize = 1024 * 1024; @@ -66,6 +67,7 @@ pub(crate) fn slice_and_process( let in_max_chunks: i8 = 3; let mut remaining_file_length = file.metadata().unwrap().len() as usize; + let mut chunks = [ [0_u8; SLICER_IN_CHUNK_SIZE], [0_u8; SLICER_IN_CHUNK_SIZE], @@ -191,7 +193,7 @@ fn read_chunk_and_slice<'a>( } /// -type FnLineBreak = fn(bytes: &[u8]) -> (bool, usize); +pub(crate) type FnLineBreak = fn(bytes: &[u8]) -> (bool, usize); #[allow(dead_code)] ///