From 0f29db46f85fe5fa0a09b4369692090fe49e69cd Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 15 Mar 2022 07:07:56 +0100 Subject: [PATCH] Fixed LZ4 (#95) --- Cargo.toml | 4 +- .../integration/write_pyarrow.py | 8 ++-- integration-tests/src/read/mod.rs | 48 ++++++++++--------- src/compression.rs | 40 ++++++++-------- src/metadata/column_chunk_metadata.rs | 1 + src/parquet_bridge.rs | 5 ++ 6 files changed, 57 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1c38aec70..957762ec9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ name = "parquet2" bench = false [dependencies] -parquet-format-async-temp = "0.2.0" +parquet-format-async-temp = "0.3.0" bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] } streaming-decompression = "0.1" @@ -22,7 +22,7 @@ futures = { version = "0.3", optional = true } snap = { version = "^1.0", optional = true } brotli = { version = "^3.3", optional = true } flate2 = { version = "^1.0", optional = true } -lz4 = { version = "^1.23", optional = true } +lz4 = { version = "1", optional = true } zstd = { version = "^0.11", optional = true, default-features = false } [features] diff --git a/integration-tests/integration/write_pyarrow.py b/integration-tests/integration/write_pyarrow.py index b6fbe51cb..ed31b6a0b 100644 --- a/integration-tests/integration/write_pyarrow.py +++ b/integration-tests/integration/write_pyarrow.py @@ -117,11 +117,11 @@ def case_struct(size): def write_pyarrow( - case, size=1, page_version=1, use_dictionary=False, use_compression=False + case, size=1, page_version=1, use_dictionary=False, compression=None ): data, schema, path = case(size) - compression_path = "/snappy" if use_compression else "" + compression_path = f"/{compression}" if compression else "" if use_dictionary: base_path = f"{PYARROW_PATH}/v{page_version}/dict{compression_path}" @@ -136,7 +136,7 @@ def write_pyarrow( version=f"{page_version}.0", data_page_version=f"{page_version}.0", write_statistics=True, - compression="snappy" if use_compression else None, + compression=compression, use_dictionary=use_dictionary, ) @@ -144,5 +144,5 @@ def write_pyarrow( for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]: for version in [1, 2]: for use_dict in [False, True]: - for compression in [False, True]: + for compression in [None, "snappy", "lz4"]: write_pyarrow(case, 1, version, use_dict, compression) diff --git a/integration-tests/src/read/mod.rs b/integration-tests/src/read/mod.rs index c1408bb91..8deee6b9e 100644 --- a/integration-tests/src/read/mod.rs +++ b/integration-tests/src/read/mod.rs @@ -312,14 +312,13 @@ pub(crate) mod tests { version: usize, required: bool, use_dictionary: bool, - use_compression: bool, + compression: &str, ) -> Result<()> { if std::env::var("PARQUET2_IGNORE_PYARROW_TESTS").is_ok() { return Ok(()); } let required_s = if required { "required" } else { "nullable" }; let use_dictionary_s = if use_dictionary { "dict" } else { "non_dict" }; - let compression = if use_compression { "/snappy" } else { "" }; let path = format!( "fixtures/pyarrow3/v{}/{}{}/{}_{}_10.parquet", @@ -358,96 +357,101 @@ pub(crate) mod tests { #[test] fn pyarrow_v1_dict_int64_required() -> Result<()> { - test_pyarrow_integration("basic", 0, 1, true, true, false) + test_pyarrow_integration("basic", 0, 1, true, true, "") } #[test] fn pyarrow_v1_dict_int64_optional() -> Result<()> { - test_pyarrow_integration("basic", 0, 1, false, true, false) + test_pyarrow_integration("basic", 0, 1, false, true, "") } #[test] fn pyarrow_v1_non_dict_int64_required() -> Result<()> { - test_pyarrow_integration("basic", 0, 1, true, false, false) + test_pyarrow_integration("basic", 0, 1, true, false, "") } #[test] fn pyarrow_v1_non_dict_int64_optional() -> Result<()> { - test_pyarrow_integration("basic", 0, 1, false, false, false) + test_pyarrow_integration("basic", 0, 1, false, false, "") } #[test] - fn pyarrow_v1_non_dict_int64_optional_compressed() -> Result<()> { - test_pyarrow_integration("basic", 0, 1, false, false, true) + fn pyarrow_v1_non_dict_int64_optional_snappy() -> Result<()> { + test_pyarrow_integration("basic", 0, 1, false, false, "/snappy") + } + + #[test] + fn pyarrow_v1_non_dict_int64_optional_lz4() -> Result<()> { + test_pyarrow_integration("basic", 0, 1, false, false, "/lz4") } #[test] fn pyarrow_v2_non_dict_int64_optional() -> Result<()> { - test_pyarrow_integration("basic", 0, 2, false, false, false) + test_pyarrow_integration("basic", 0, 2, false, false, "") } #[test] fn pyarrow_v2_non_dict_int64_required() -> Result<()> { - test_pyarrow_integration("basic", 0, 2, true, false, false) + test_pyarrow_integration("basic", 0, 2, true, false, "") } #[test] fn pyarrow_v2_dict_int64_optional() -> Result<()> { - test_pyarrow_integration("basic", 0, 2, false, true, false) + test_pyarrow_integration("basic", 0, 2, false, true, "") } #[test] fn pyarrow_v2_non_dict_int64_optional_compressed() -> Result<()> { - test_pyarrow_integration("basic", 0, 2, false, false, true) + test_pyarrow_integration("basic", 0, 2, false, false, "/snappy") } #[test] fn pyarrow_v1_dict_string_required() -> Result<()> { - test_pyarrow_integration("basic", 2, 1, true, true, false) + test_pyarrow_integration("basic", 2, 1, true, true, "") } #[test] fn pyarrow_v1_dict_string_optional() -> Result<()> { - test_pyarrow_integration("basic", 2, 1, false, true, false) + test_pyarrow_integration("basic", 2, 1, false, true, "") } #[test] fn pyarrow_v1_non_dict_string_required() -> Result<()> { - test_pyarrow_integration("basic", 2, 1, true, false, false) + test_pyarrow_integration("basic", 2, 1, true, false, "") } #[test] fn pyarrow_v1_non_dict_string_optional() -> Result<()> { - test_pyarrow_integration("basic", 2, 1, false, false, false) + test_pyarrow_integration("basic", 2, 1, false, false, "") } #[test] fn pyarrow_v1_dict_list_optional() -> Result<()> { - test_pyarrow_integration("nested", 0, 1, false, true, false) + test_pyarrow_integration("nested", 0, 1, false, true, "") } #[test] fn pyarrow_v1_non_dict_list_optional() -> Result<()> { - test_pyarrow_integration("nested", 0, 1, false, false, false) + test_pyarrow_integration("nested", 0, 1, false, false, "") } #[test] fn pyarrow_v1_struct_optional() -> Result<()> { - test_pyarrow_integration("struct", 0, 1, false, false, false) + test_pyarrow_integration("struct", 0, 1, false, false, "") } #[test] fn pyarrow_v2_struct_optional() -> Result<()> { - test_pyarrow_integration("struct", 0, 2, false, false, false) + test_pyarrow_integration("struct", 0, 2, false, false, "") } #[test] fn pyarrow_v1_struct_required() -> Result<()> { - test_pyarrow_integration("struct", 1, 1, false, false, false) + test_pyarrow_integration("struct", 1, 1, false, false, "") } #[test] fn pyarrow_v2_struct_required() -> Result<()> { - test_pyarrow_integration("struct", 1, 2, false, false, false) + test_pyarrow_integration("struct", 1, 2, false, false, "") } } diff --git a/src/compression.rs b/src/compression.rs index 124f1fad1..acba5f735 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -62,23 +62,21 @@ pub fn compress( "compress to snappy".to_string(), )), #[cfg(feature = "lz4")] - Compression::Lz4 => { - use std::io::Write; - const LZ4_BUFFER_SIZE: usize = 4096; - let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; - let mut from = 0; - loop { - let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); - encoder.write_all(&input_buf[from..to])?; - from += LZ4_BUFFER_SIZE; - if from >= input_buf.len() { - break; - } - } - encoder.finish().1.map_err(|e| e.into()) + Compression::Lz4Raw => { + let output_buf_len = output_buf.len(); + let required_len = input_buf.len(); + output_buf.resize(output_buf_len + required_len, 0); + let size = lz4::block::compress_to_buffer( + input_buf, + None, + false, + &mut output_buf[output_buf_len..], + )?; + output_buf.truncate(output_buf_len + size); + Ok(()) } #[cfg(not(feature = "lz4"))] - Compression::Lz4 => Err(ParquetError::FeatureNotActive( + Compression::Lz4Raw => Err(ParquetError::FeatureNotActive( crate::error::Feature::Lz4, "compress to lz4".to_string(), )), @@ -155,13 +153,13 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [ "decompress with snappy".to_string(), )), #[cfg(feature = "lz4")] - Compression::Lz4 => { - use std::io::Read; - let mut decoder = lz4::Decoder::new(input_buf)?; - decoder.read_exact(output_buf).map_err(|e| e.into()) + Compression::Lz4Raw => { + lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf) + .map(|_| {}) + .map_err(|e| e.into()) } #[cfg(not(feature = "lz4"))] - Compression::Lz4 => Err(ParquetError::FeatureNotActive( + Compression::Lz4Raw => Err(ParquetError::FeatureNotActive( crate::error::Feature::Lz4, "decompress with lz4".to_string(), )), @@ -230,7 +228,7 @@ mod tests { #[test] fn test_codec_lz4() { - test_codec(Compression::Lz4); + test_codec(Compression::Lz4Raw); } #[test] diff --git a/src/metadata/column_chunk_metadata.rs b/src/metadata/column_chunk_metadata.rs index eea10caab..b98eea814 100644 --- a/src/metadata/column_chunk_metadata.rs +++ b/src/metadata/column_chunk_metadata.rs @@ -80,6 +80,7 @@ impl ColumnChunkMetaData { /// [`Compression`] for this column. pub fn compression(&self) -> Compression { + println!("{:?}", self.column_metadata().codec); self.column_metadata().codec.try_into().unwrap() } diff --git a/src/parquet_bridge.rs b/src/parquet_bridge.rs index 894ed587b..b783b4566 100644 --- a/src/parquet_bridge.rs +++ b/src/parquet_bridge.rs @@ -49,12 +49,14 @@ pub enum Compression { Brotli, Lz4, Zstd, + Lz4Raw, } impl TryFrom for Compression { type Error = ParquetError; fn try_from(codec: CompressionCodec) -> Result { + println!("{codec:?}"); Ok(match codec { CompressionCodec::UNCOMPRESSED => Compression::Uncompressed, CompressionCodec::SNAPPY => Compression::Snappy, @@ -63,6 +65,7 @@ impl TryFrom for Compression { CompressionCodec::BROTLI => Compression::Brotli, CompressionCodec::LZ4 => Compression::Lz4, CompressionCodec::ZSTD => Compression::Zstd, + CompressionCodec::LZ4_RAW => Compression::Lz4Raw, _ => return Err(ParquetError::OutOfSpec("Thrift out of range".to_string())), }) } @@ -70,6 +73,7 @@ impl TryFrom for Compression { impl From for CompressionCodec { fn from(codec: Compression) -> Self { + println!("{codec:?}"); match codec { Compression::Uncompressed => CompressionCodec::UNCOMPRESSED, Compression::Snappy => CompressionCodec::SNAPPY, @@ -78,6 +82,7 @@ impl From for CompressionCodec { Compression::Brotli => CompressionCodec::BROTLI, Compression::Lz4 => CompressionCodec::LZ4, Compression::Zstd => CompressionCodec::ZSTD, + Compression::Lz4Raw => CompressionCodec::LZ4_RAW, } } }