From 56e4b6da292f28916aa2f76883b74df4b78000d0 Mon Sep 17 00:00:00 2001 From: Dominik Moritz Date: Sun, 15 Dec 2024 11:29:41 -0500 Subject: [PATCH] feat: customize CSV format (#115) --- Cargo.lock | 4 +- crates/csv2arrow/Cargo.toml | 1 + crates/csv2arrow/Readme.md | 39 +++++++++++++--- crates/csv2arrow/src/main.rs | 75 ++++++++++++++++++++++-------- crates/csv2parquet/Cargo.toml | 1 + crates/csv2parquet/Readme.md | 65 ++++++++++++++++++++++---- crates/csv2parquet/src/main.rs | 83 +++++++++++++++++++++++++--------- 7 files changed, 211 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1f295a..b195af8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "adler2" @@ -571,6 +571,7 @@ dependencies = [ "assert_cmd", "clap", "predicates", + "regex", "serde_json", ] @@ -585,6 +586,7 @@ dependencies = [ "clap", "parquet", "predicates", + "regex", "serde_json", ] diff --git a/crates/csv2arrow/Cargo.toml b/crates/csv2arrow/Cargo.toml index 3dadfb6..8ec62f3 100644 --- a/crates/csv2arrow/Cargo.toml +++ b/crates/csv2arrow/Cargo.toml @@ -16,6 +16,7 @@ arrow-schema = { version = "53.0.0", features = ["serde"] } serde_json = "1.0.133" clap = { version = "4.5.21", features = ["derive"] } arrow-tools = { version = "0.19.0", path = "../arrow-tools" } +regex = "1.5.4" [dev-dependencies] assert_cmd = "2.0.14" diff --git a/crates/csv2arrow/Readme.md b/crates/csv2arrow/Readme.md index ec768f4..6b7c785 100644 --- a/crates/csv2arrow/Readme.md +++ b/crates/csv2arrow/Readme.md @@ -36,24 +36,51 @@ cargo binstall csv2arrow Usage: csv2arrow [OPTIONS] [ARROW] Arguments: - Input CSV file, stdin if not present - [ARROW] Output file, stdout if not present + + Input CSV file, stdin if not present + + [ARROW] + Output file, stdout if not present Options: -s, --schema-file File with Arrow schema in JSON format + -m, --max-read-records The number of records to infer the schema from. All rows if not present. Setting max-read-records to zero will stop schema inference and all columns will be string typed + --header
- Set whether the CSV file has headers [possible values: true, false] - -d, --delimiter - Set the CSV file's column delimiter as a byte character [default: ,] + Set whether the CSV file has headers + + [default: true] + [possible values: true, false] + + --delimiter + Set the CSV file's column delimiter as a byte character + + --escape + Specify an escape character + + --quote + Specify a custom quote character + + --comment + Specify a comment character. + + Lines starting with this character will be ignored + + --null-regex + Provide a regex to match null values + -p, --print-schema Print the schema to stderr + -n, --dry Only print the schema + -h, --help - Print help + Print help (see a summary with '-h') + -V, --version Print version ``` diff --git a/crates/csv2arrow/src/main.rs b/crates/csv2arrow/src/main.rs index 897f30f..5fe3d74 100644 --- a/crates/csv2arrow/src/main.rs +++ b/crates/csv2arrow/src/main.rs @@ -1,6 +1,7 @@ use arrow::{csv::reader::Format, csv::ReaderBuilder, error::ArrowError, ipc::writer::FileWriter}; use arrow_tools::seekable_reader::{SeekRead, SeekableReader}; use clap::{Parser, ValueHint}; +use regex::Regex; use std::io::stdout; use std::path::PathBuf; use std::sync::Arc; @@ -25,13 +26,31 @@ struct Opts { #[clap(short, long)] max_read_records: Option, - /// Set whether the CSV file has headers - #[clap(long)] + /// Set whether the CSV file has headers. + #[clap(long, default_value = "true")] header: Option, /// Set the CSV file's column delimiter as a byte character. - #[clap(short, long, default_value = ",")] - delimiter: char, + #[clap(long)] + delimiter: Option, + + /// Specify an escape character. + #[clap(long)] + escape: Option, + + /// Specify a custom quote character. + #[clap(long)] + quote: Option, + + /// Specify a comment character. + /// + /// Lines starting with this character will be ignored + #[clap(long)] + comment: Option, + + /// Provide a regex to match null values. + #[clap(long)] + null_regex: Option, /// Print the schema to stderr. #[clap(short, long)] @@ -56,6 +75,32 @@ fn main() -> Result<(), ArrowError> { )) }; + let mut format = Format::default(); + + if let Some(header) = opts.header { + format = format.with_header(header); + } + + if let Some(delimiter) = opts.delimiter { + format = format.with_delimiter(delimiter as u8); + } + + if let Some(escape) = opts.escape { + format = format.with_escape(escape as u8); + } + + if let Some(quote) = opts.quote { + format = format.with_quote(quote as u8); + } + + if let Some(comment) = opts.comment { + format = format.with_comment(comment as u8); + } + + if let Some(regex) = opts.null_regex { + format = format.with_null_regex(regex); + } + let schema = match opts.schema_file { Some(schema_def_file_path) => { let schema_file = match File::open(&schema_def_file_path) { @@ -76,18 +121,12 @@ fn main() -> Result<(), ArrowError> { ))), } } - _ => { - let format = Format::default() - .with_delimiter(opts.delimiter as u8) - .with_header(opts.header.unwrap_or(true)); - - match format.infer_schema(&mut input, opts.max_read_records) { - Ok((schema, _size)) => Ok(schema), - Err(error) => Err(ArrowError::SchemaError(format!( - "Error inferring schema: {error}" - ))), - } - } + _ => match format.infer_schema(&mut input, opts.max_read_records) { + Ok((schema, _size)) => Ok(schema), + Err(error) => Err(ArrowError::SchemaError(format!( + "Error inferring schema: {error}" + ))), + }, }?; if opts.print_schema || opts.dry { @@ -100,9 +139,7 @@ fn main() -> Result<(), ArrowError> { } let schema_ref = Arc::new(schema); - let builder = ReaderBuilder::new(schema_ref) - .with_header(opts.header.unwrap_or(true)) - .with_delimiter(opts.delimiter as u8); + let builder = ReaderBuilder::new(schema_ref).with_format(format); input.rewind()?; diff --git a/crates/csv2parquet/Cargo.toml b/crates/csv2parquet/Cargo.toml index 2f84222..5342e7e 100644 --- a/crates/csv2parquet/Cargo.toml +++ b/crates/csv2parquet/Cargo.toml @@ -17,6 +17,7 @@ arrow-schema = { version = "53.0.0", features = ["serde"] } serde_json = "1.0.133" clap = { version = "4.5.21", features = ["derive"] } arrow-tools = { version = "0.19.0", path = "../arrow-tools" } +regex = "1.5.4" [dev-dependencies] assert_cmd = "2.0.14" diff --git a/crates/csv2parquet/Readme.md b/crates/csv2parquet/Readme.md index 4b22d7b..aea44ea 100644 --- a/crates/csv2parquet/Readme.md +++ b/crates/csv2parquet/Readme.md @@ -36,44 +36,89 @@ cargo binstall csv2parquet Usage: csv2parquet [OPTIONS] Arguments: - Input CSV fil, stdin if not present - Output file + + Input CSV fil, stdin if not present + + + Output file Options: -s, --schema-file File with Arrow schema in JSON format + --max-read-records The number of records to infer the schema from. All rows if not present. Setting max-read-records to zero will stop schema inference and all columns will be string typed + --header
- Set whether the CSV file has headers [possible values: true, false] - -d, --delimiter - Set the CSV file's column delimiter as a byte character [default: ,] + Set whether the CSV file has headers + + [default: true] + [possible values: true, false] + + --delimiter + Set the CSV file's column delimiter as a byte character + + --escape + Specify an escape character + + --quote + Specify a custom quote character + + --comment + Specify a comment character. + + Lines starting with this character will be ignored + + --null-regex + Provide a regex to match null values + -c, --compression - Set the compression [possible values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd, lz4-raw] + Set the compression + + [possible values: uncompressed, snappy, gzip, lzo, brotli, lz4, zstd, lz4-raw] + -e, --encoding - Sets encoding for any column [possible values: plain, plain-dictionary, rle, rle-dictionary, delta-binary-packed, delta-length-byte-array, delta-byte-array, byte-stream-split] + Sets encoding for any column + + [possible values: plain, plain-dictionary, rle, rle-dictionary, delta-binary-packed, delta-length-byte-array, delta-byte-array, byte-stream-split] + --data-page-size-limit Sets data page size limit + --dictionary-page-size-limit Sets dictionary page size limit + --write-batch-size Sets write batch size + --max-row-group-size Sets max size for a row group + --created-by Sets "created by" property - --dictionary + + --dictionary Sets flag to enable/disable dictionary encoding for any column + + [possible values: true, false] + --statistics - Sets flag to enable/disable statistics for any column [possible values: none, chunk, page] + Sets flag to enable/disable statistics for any column + + [possible values: none, chunk, page] + --max-statistics-size Sets max statistics size for any column. Applicable only if statistics are enabled + -p, --print-schema Print the schema to stderr + -n, --dry Only print the schema + -h, --help - Print help + Print help (see a summary with '-h') + -V, --version Print version ``` diff --git a/crates/csv2parquet/src/main.rs b/crates/csv2parquet/src/main.rs index 23f07da..d425e28 100644 --- a/crates/csv2parquet/src/main.rs +++ b/crates/csv2parquet/src/main.rs @@ -7,6 +7,7 @@ use parquet::{ errors::ParquetError, file::properties::{EnabledStatistics, WriterProperties}, }; +use regex::Regex; use std::path::PathBuf; use std::sync::Arc; use std::{fs::File, io::Seek}; @@ -64,13 +65,31 @@ struct Opts { #[clap(long)] max_read_records: Option, - /// Set whether the CSV file has headers - #[clap(long)] + /// Set whether the CSV file has headers. + #[clap(long, default_value = "true")] header: Option, /// Set the CSV file's column delimiter as a byte character. - #[clap(short, long, default_value = ",")] - delimiter: char, + #[clap(long)] + delimiter: Option, + + /// Specify an escape character. + #[clap(long)] + escape: Option, + + /// Specify a custom quote character. + #[clap(long)] + quote: Option, + + /// Specify a comment character. + /// + /// Lines starting with this character will be ignored + #[clap(long)] + comment: Option, + + /// Provide a regex to match null values. + #[clap(long)] + null_regex: Option, /// Set the compression. #[clap(short, long, value_enum)] @@ -102,7 +121,7 @@ struct Opts { /// Sets flag to enable/disable dictionary encoding for any column. #[clap(long)] - dictionary: bool, + dictionary: Option, /// Sets flag to enable/disable statistics for any column. #[clap(long, value_enum)] @@ -135,6 +154,32 @@ fn main() -> Result<(), ParquetError> { )) }; + let mut format = Format::default(); + + if let Some(header) = opts.header { + format = format.with_header(header); + } + + if let Some(delimiter) = opts.delimiter { + format = format.with_delimiter(delimiter as u8); + } + + if let Some(escape) = opts.escape { + format = format.with_escape(escape as u8); + } + + if let Some(quote) = opts.quote { + format = format.with_quote(quote as u8); + } + + if let Some(comment) = opts.comment { + format = format.with_comment(comment as u8); + } + + if let Some(regex) = opts.null_regex { + format = format.with_null_regex(regex); + } + let schema = match opts.schema_file { Some(schema_def_file_path) => { let schema_file = match File::open(&schema_def_file_path) { @@ -152,18 +197,12 @@ fn main() -> Result<(), ParquetError> { ))), } } - _ => { - let format = Format::default() - .with_delimiter(opts.delimiter as u8) - .with_header(opts.header.unwrap_or(true)); - - match format.infer_schema(&mut input, opts.max_read_records) { - Ok((schema, _size)) => Ok(schema), - Err(error) => Err(ParquetError::General(format!( - "Error inferring schema: {error}" - ))), - } - } + _ => match format.infer_schema(&mut input, opts.max_read_records) { + Ok((schema, _size)) => Ok(schema), + Err(error) => Err(ParquetError::General(format!( + "Error inferring schema: {error}" + ))), + }, }?; if opts.print_schema || opts.dry { @@ -176,9 +215,7 @@ fn main() -> Result<(), ParquetError> { } let schema_ref = Arc::new(schema); - let builder = ReaderBuilder::new(schema_ref) - .with_header(opts.header.unwrap_or(true)) - .with_delimiter(opts.delimiter as u8); + let builder = ReaderBuilder::new(schema_ref).with_format(format); input.rewind()?; @@ -186,7 +223,11 @@ fn main() -> Result<(), ParquetError> { let output = File::create(opts.output)?; - let mut props = WriterProperties::builder().set_dictionary_enabled(opts.dictionary); + let mut props = WriterProperties::builder(); + + if let Some(enabled) = opts.dictionary { + props = props.set_dictionary_enabled(enabled); + } if let Some(statistics) = opts.statistics { let statistics = match statistics {