From a94ccff9deac04ca075f6f05f81a5755af81348e Mon Sep 17 00:00:00 2001
From: fan <75058860+fansehep@users.noreply.github.com>
Date: Wed, 18 Oct 2023 17:36:43 +0800
Subject: [PATCH] feat: support parsing for parquet writer option (#4938)

* feat: support parsing for parquet writer option

Signed-off-by: fan

* fix clippy warning

Signed-off-by: fan

* add tests

Signed-off-by: fan

* follow reviews

Signed-off-by: fan

* fix only support lower and uppercase

Signed-off-by: fan

---------

Signed-off-by: fan
---
 parquet/src/basic.rs           | 185 +++++++++++++++++++++++++++++++++
 parquet/src/file/properties.rs |  68 ++++++++++++
 2 files changed, 253 insertions(+)

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index cc8d033f42a4..cdad3597ffef 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -18,6 +18,7 @@
 //! Contains Rust mappings for Thrift definition.
 //! Refer to [`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) file to see raw definitions.
 
+use std::str::FromStr;
 use std::{fmt, str};
 
 pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
@@ -278,6 +279,29 @@ pub enum Encoding {
     BYTE_STREAM_SPLIT,
 }
 
+impl FromStr for Encoding {
+    type Err = ParquetError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "PLAIN" | "plain" => Ok(Encoding::PLAIN),
+            "PLAIN_DICTIONARY" | "plain_dictionary" => Ok(Encoding::PLAIN_DICTIONARY),
+            "RLE" | "rle" => Ok(Encoding::RLE),
+            "BIT_PACKED" | "bit_packed" => Ok(Encoding::BIT_PACKED),
+            "DELTA_BINARY_PACKED" | "delta_binary_packed" => {
+                Ok(Encoding::DELTA_BINARY_PACKED)
+            }
+            "DELTA_LENGTH_BYTE_ARRAY" | "delta_length_byte_array" => {
+                Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY)
+            }
+            "DELTA_BYTE_ARRAY" | "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY),
+            "RLE_DICTIONARY" | "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY),
+            "BYTE_STREAM_SPLIT" | "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT),
+            _ => Err(general_err!("unknown encoding: {}", s)),
+        }
+    }
+}
+
 // ----------------------------------------------------------------------
 // Mirrors `parquet::CompressionCodec`
 
@@ -295,6 +319,90 @@ pub enum Compression {
     LZ4_RAW,
 }
 
+fn split_compression_string(
+    str_setting: &str,
+) -> Result<(&str, Option<u32>), ParquetError> {
+    let split_setting = str_setting.split_once('(');
+
+    match split_setting {
+        Some((codec, level_str)) => {
+            // Expect the "codec(level)" form: strip the trailing ')' before parsing.
+            let level = level_str
+                .strip_suffix(')')
+                .and_then(|level| level.parse::<u32>().ok())
+                .ok_or_else(|| {
+                    ParquetError::General(format!(
+                        "invalid compression level: {}",
+                        level_str.trim_end_matches(')')
+                    ))
+                })?;
+            Ok((codec, Some(level)))
+        }
+        None => Ok((str_setting, None)),
+    }
+}
+
+fn check_level_is_none(level: &Option<u32>) -> Result<(), ParquetError> {
+    if level.is_some() {
+        return Err(ParquetError::General("level is not supported".to_string()));
+    }
+
+    Ok(())
+}
+
+fn require_level(codec: &str, level: Option<u32>) -> Result<u32, ParquetError> {
+    level.ok_or_else(|| ParquetError::General(format!("{} requires a level", codec)))
+}
+
+impl FromStr for Compression {
+    type Err = ParquetError;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        let (codec, level) = split_compression_string(s)?;
+
+        let c = match codec {
+            "UNCOMPRESSED" | "uncompressed" => {
+                check_level_is_none(&level)?;
+                Compression::UNCOMPRESSED
+            }
+            "SNAPPY" | "snappy" => {
+                check_level_is_none(&level)?;
+                Compression::SNAPPY
+            }
+            "GZIP" | "gzip" => {
+                let level = require_level(codec, level)?;
+                Compression::GZIP(GzipLevel::try_new(level)?)
+            }
+            "LZO" | "lzo" => {
+                check_level_is_none(&level)?;
+                Compression::LZO
+            }
+            "BROTLI" | "brotli" => {
+                let level = require_level(codec, level)?;
+                Compression::BROTLI(BrotliLevel::try_new(level)?)
+            }
+            "LZ4" | "lz4" => {
+                check_level_is_none(&level)?;
+                Compression::LZ4
+            }
+            "ZSTD" | "zstd" => {
+                let level = require_level(codec, level)?;
+                Compression::ZSTD(ZstdLevel::try_new(level as i32)?)
+            }
+            "LZ4_RAW" | "lz4_raw" => {
+                check_level_is_none(&level)?;
+                Compression::LZ4_RAW
+            }
+            _ => {
+                return Err(ParquetError::General(format!(
+                    "unsupported compression: {codec}"
+                )));
+            }
+        };
+
+        Ok(c)
+    }
+}
+
 // ----------------------------------------------------------------------
 // Mirrors `parquet::PageType`
 
@@ -2130,4 +2238,81 @@ mod tests {
         );
         assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED);
     }
+
+    #[test]
+    fn test_parse_encoding() {
+        let mut encoding: Encoding = "PLAIN".parse().unwrap();
+        assert_eq!(encoding, Encoding::PLAIN);
+        encoding = "PLAIN_DICTIONARY".parse().unwrap();
+        assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
+        encoding = "RLE".parse().unwrap();
+        assert_eq!(encoding, Encoding::RLE);
+        encoding = "BIT_PACKED".parse().unwrap();
+        assert_eq!(encoding, Encoding::BIT_PACKED);
+        encoding = "DELTA_BINARY_PACKED".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_BINARY_PACKED);
+        encoding = "DELTA_LENGTH_BYTE_ARRAY".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_LENGTH_BYTE_ARRAY);
+        encoding = "DELTA_BYTE_ARRAY".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_BYTE_ARRAY);
+        encoding = "RLE_DICTIONARY".parse().unwrap();
+        assert_eq!(encoding, Encoding::RLE_DICTIONARY);
+        encoding = "BYTE_STREAM_SPLIT".parse().unwrap();
+        assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+        // test lowercase
+        encoding = "byte_stream_split".parse().unwrap();
+        assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+        // test unknown string
+        match "plain_xxx".parse::<Encoding>() {
+            Ok(e) => {
+                panic!("Should not be able to parse {:?}", e);
+            }
+            Err(e) => {
+                assert_eq!(e.to_string(), "Parquet error: unknown encoding: plain_xxx");
+            }
+        }
+    }
+
+    #[test]
+    fn test_parse_compression() {
+        let mut compress: Compression = "snappy".parse().unwrap();
+        assert_eq!(compress, Compression::SNAPPY);
+        compress = "lzo".parse().unwrap();
+        assert_eq!(compress, Compression::LZO);
+        compress = "zstd(3)".parse().unwrap();
+        assert_eq!(compress, Compression::ZSTD(ZstdLevel::try_new(3).unwrap()));
+        compress = "LZ4_RAW".parse().unwrap();
+        assert_eq!(compress, Compression::LZ4_RAW);
+        compress = "uncompressed".parse().unwrap();
+        assert_eq!(compress, Compression::UNCOMPRESSED);
+        compress = "snappy".parse().unwrap();
+        assert_eq!(compress, Compression::SNAPPY);
+        compress = "gzip(9)".parse().unwrap();
+        assert_eq!(compress, Compression::GZIP(GzipLevel::try_new(9).unwrap()));
+        compress = "lzo".parse().unwrap();
+        assert_eq!(compress, Compression::LZO);
+        compress = "brotli(3)".parse().unwrap();
+        assert_eq!(
+            compress,
+            Compression::BROTLI(BrotliLevel::try_new(3).unwrap())
+        );
+        compress = "lz4".parse().unwrap();
+        assert_eq!(compress, Compression::LZ4);
+
+        // test unknown compression
+        let mut err = "plain_xxx".parse::<Compression>().unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: unsupported compression: plain_xxx"
+        );
+
+        // test invalid compression level
+        err = "gzip(-10)".parse::<Compression>().unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: invalid compression level: -10"
+        );
+    }
 }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c83fea3f9b92..93b034cf4f60 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
+use std::str::FromStr;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -72,6 +73,18 @@ impl WriterVersion {
     }
 }
 
+impl FromStr for WriterVersion {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "PARQUET_1_0" | "parquet_1_0" => Ok(WriterVersion::PARQUET_1_0),
+            "PARQUET_2_0" | "parquet_2_0" => Ok(WriterVersion::PARQUET_2_0),
+            _ => Err(format!("Invalid writer version: {}", s)),
+        }
+    }
+}
+
 /// Reference counted writer properties.
 pub type WriterPropertiesPtr = Arc<WriterProperties>;
 
@@ -655,6 +668,19 @@ pub enum EnabledStatistics {
     Page,
 }
 
+impl FromStr for EnabledStatistics {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "NONE" | "none" => Ok(EnabledStatistics::None),
+            "CHUNK" | "chunk" => Ok(EnabledStatistics::Chunk),
+            "PAGE" | "page" => Ok(EnabledStatistics::Page),
+            _ => Err(format!("Invalid statistics arg: {}", s)),
+        }
+    }
+}
+
 impl Default for EnabledStatistics {
     fn default() -> Self {
         DEFAULT_STATISTICS_ENABLED
@@ -1182,4 +1208,46 @@ mod tests {
 
         assert_eq!(props.codec_options(), &codec_options);
     }
+
+    #[test]
+    fn test_parse_writerversion() {
+        let mut writer_version = "PARQUET_1_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+        writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_2_0);
+
+        // test lowercase
+        writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+
+        // test invalid version
+        match "PARQUET_-1_0".parse::<WriterVersion>() {
+            Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
+            Err(e) => {
+                assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
+            }
+        }
+    }
+
+    #[test]
+    fn test_parse_enabledstatistics() {
+        let mut enabled_statistics = "NONE".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::None);
+        enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
+        enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::Page);
+
+        // test lowercase
+        enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::None);
+
+        // test invalid statistics
+        match "ChunkAndPage".parse::<EnabledStatistics>() {
+            Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
+            Err(e) => {
+                assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
+            }
+        }
+    }
 }
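
Usage note (not part of the patch): a minimal sketch of the new string parsing for
codec and encoding options, assuming a parquet build that includes this change. Only
all-upper or all-lower spellings are accepted, and level-bearing codecs take the
"codec(level)" form.

    use parquet::basic::{Compression, Encoding};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // "codec(level)" carries the compression level; bare names take none.
        let compression: Compression = "zstd(3)".parse()?;
        let encoding: Encoding = "delta_binary_packed".parse()?;
        println!("compression={compression:?}, encoding={encoding:?}");
        Ok(())
    }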
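
A second sketch wires the parsed values into a WriterProperties builder. The builder
methods shown (set_writer_version, set_statistics_enabled, set_compression) are
pre-existing API; the string inputs stand in for user-supplied configuration.

    use parquet::basic::Compression;
    use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // FromStr for WriterVersion/EnabledStatistics fails with Err(String),
        // which `?` converts into Box<dyn Error>.
        let version: WriterVersion = "parquet_2_0".parse()?;
        let statistics: EnabledStatistics = "page".parse()?;
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_statistics_enabled(statistics)
            .set_compression("snappy".parse::<Compression>()?)
            .build();
        assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
        Ok(())
    }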