Skip to content

Commit

Permalink
Fixed LZ4 (jorgecarleitao#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and dantengsky committed Apr 1, 2022
1 parent 8a4a8b2 commit 0f29db4
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 49 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.2.0"
parquet-format-async-temp = "0.3.0"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

Expand All @@ -22,7 +22,7 @@ futures = { version = "0.3", optional = true }
snap = { version = "^1.0", optional = true }
brotli = { version = "^3.3", optional = true }
flate2 = { version = "^1.0", optional = true }
lz4 = { version = "^1.23", optional = true }
lz4 = { version = "1", optional = true }
zstd = { version = "^0.11", optional = true, default-features = false }

[features]
Expand Down
8 changes: 4 additions & 4 deletions integration-tests/integration/write_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ def case_struct(size):


def write_pyarrow(
case, size=1, page_version=1, use_dictionary=False, use_compression=False
case, size=1, page_version=1, use_dictionary=False, compression=None
):
data, schema, path = case(size)

compression_path = "/snappy" if use_compression else ""
compression_path = f"/{compression}" if compression else ""

if use_dictionary:
base_path = f"{PYARROW_PATH}/v{page_version}/dict{compression_path}"
Expand All @@ -136,13 +136,13 @@ def write_pyarrow(
version=f"{page_version}.0",
data_page_version=f"{page_version}.0",
write_statistics=True,
compression="snappy" if use_compression else None,
compression=compression,
use_dictionary=use_dictionary,
)


for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]:
for version in [1, 2]:
for use_dict in [False, True]:
for compression in [False, True]:
for compression in [None, "snappy", "lz4"]:
write_pyarrow(case, 1, version, use_dict, compression)
48 changes: 26 additions & 22 deletions integration-tests/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,13 @@ pub(crate) mod tests {
version: usize,
required: bool,
use_dictionary: bool,
use_compression: bool,
compression: &str,
) -> Result<()> {
if std::env::var("PARQUET2_IGNORE_PYARROW_TESTS").is_ok() {
return Ok(());
}
let required_s = if required { "required" } else { "nullable" };
let use_dictionary_s = if use_dictionary { "dict" } else { "non_dict" };
let compression = if use_compression { "/snappy" } else { "" };

let path = format!(
"fixtures/pyarrow3/v{}/{}{}/{}_{}_10.parquet",
Expand Down Expand Up @@ -358,96 +357,101 @@ pub(crate) mod tests {

#[test]
fn pyarrow_v1_dict_int64_required() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, true, true, false)
test_pyarrow_integration("basic", 0, 1, true, true, "")
}

#[test]
fn pyarrow_v1_dict_int64_optional() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, false, true, false)
test_pyarrow_integration("basic", 0, 1, false, true, "")
}

#[test]
fn pyarrow_v1_non_dict_int64_required() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, true, false, false)
test_pyarrow_integration("basic", 0, 1, true, false, "")
}

#[test]
fn pyarrow_v1_non_dict_int64_optional() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, false, false, false)
test_pyarrow_integration("basic", 0, 1, false, false, "")
}

#[test]
fn pyarrow_v1_non_dict_int64_optional_compressed() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, false, false, true)
fn pyarrow_v1_non_dict_int64_optional_snappy() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, false, false, "/snappy")
}

#[test]
fn pyarrow_v1_non_dict_int64_optional_lz4() -> Result<()> {
test_pyarrow_integration("basic", 0, 1, false, false, "/lz4")
}

#[test]
fn pyarrow_v2_non_dict_int64_optional() -> Result<()> {
test_pyarrow_integration("basic", 0, 2, false, false, false)
test_pyarrow_integration("basic", 0, 2, false, false, "")
}

#[test]
fn pyarrow_v2_non_dict_int64_required() -> Result<()> {
test_pyarrow_integration("basic", 0, 2, true, false, false)
test_pyarrow_integration("basic", 0, 2, true, false, "")
}

#[test]
fn pyarrow_v2_dict_int64_optional() -> Result<()> {
test_pyarrow_integration("basic", 0, 2, false, true, false)
test_pyarrow_integration("basic", 0, 2, false, true, "")
}

#[test]
fn pyarrow_v2_non_dict_int64_optional_compressed() -> Result<()> {
test_pyarrow_integration("basic", 0, 2, false, false, true)
test_pyarrow_integration("basic", 0, 2, false, false, "/snappy")
}

#[test]
fn pyarrow_v1_dict_string_required() -> Result<()> {
test_pyarrow_integration("basic", 2, 1, true, true, false)
test_pyarrow_integration("basic", 2, 1, true, true, "")
}

#[test]
fn pyarrow_v1_dict_string_optional() -> Result<()> {
test_pyarrow_integration("basic", 2, 1, false, true, false)
test_pyarrow_integration("basic", 2, 1, false, true, "")
}

#[test]
fn pyarrow_v1_non_dict_string_required() -> Result<()> {
test_pyarrow_integration("basic", 2, 1, true, false, false)
test_pyarrow_integration("basic", 2, 1, true, false, "")
}

#[test]
fn pyarrow_v1_non_dict_string_optional() -> Result<()> {
test_pyarrow_integration("basic", 2, 1, false, false, false)
test_pyarrow_integration("basic", 2, 1, false, false, "")
}

#[test]
fn pyarrow_v1_dict_list_optional() -> Result<()> {
test_pyarrow_integration("nested", 0, 1, false, true, false)
test_pyarrow_integration("nested", 0, 1, false, true, "")
}

#[test]
fn pyarrow_v1_non_dict_list_optional() -> Result<()> {
test_pyarrow_integration("nested", 0, 1, false, false, false)
test_pyarrow_integration("nested", 0, 1, false, false, "")
}

#[test]
fn pyarrow_v1_struct_optional() -> Result<()> {
test_pyarrow_integration("struct", 0, 1, false, false, false)
test_pyarrow_integration("struct", 0, 1, false, false, "")
}

#[test]
fn pyarrow_v2_struct_optional() -> Result<()> {
test_pyarrow_integration("struct", 0, 2, false, false, false)
test_pyarrow_integration("struct", 0, 2, false, false, "")
}

#[test]
fn pyarrow_v1_struct_required() -> Result<()> {
test_pyarrow_integration("struct", 1, 1, false, false, false)
test_pyarrow_integration("struct", 1, 1, false, false, "")
}

#[test]
fn pyarrow_v2_struct_required() -> Result<()> {
test_pyarrow_integration("struct", 1, 2, false, false, false)
test_pyarrow_integration("struct", 1, 2, false, false, "")
}
}
40 changes: 19 additions & 21 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,21 @@ pub fn compress(
"compress to snappy".to_string(),
)),
#[cfg(feature = "lz4")]
Compression::Lz4 => {
use std::io::Write;
const LZ4_BUFFER_SIZE: usize = 4096;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
let mut from = 0;
loop {
let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
encoder.write_all(&input_buf[from..to])?;
from += LZ4_BUFFER_SIZE;
if from >= input_buf.len() {
break;
}
}
encoder.finish().1.map_err(|e| e.into())
Compression::Lz4Raw => {
let output_buf_len = output_buf.len();
let required_len = input_buf.len();
output_buf.resize(output_buf_len + required_len, 0);
let size = lz4::block::compress_to_buffer(
input_buf,
None,
false,
&mut output_buf[output_buf_len..],
)?;
output_buf.truncate(output_buf_len + size);
Ok(())
}
#[cfg(not(feature = "lz4"))]
Compression::Lz4 => Err(ParquetError::FeatureNotActive(
Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
crate::error::Feature::Lz4,
"compress to lz4".to_string(),
)),
Expand Down Expand Up @@ -155,13 +153,13 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [
"decompress with snappy".to_string(),
)),
#[cfg(feature = "lz4")]
Compression::Lz4 => {
use std::io::Read;
let mut decoder = lz4::Decoder::new(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
Compression::Lz4Raw => {
lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
.map(|_| {})
.map_err(|e| e.into())
}
#[cfg(not(feature = "lz4"))]
Compression::Lz4 => Err(ParquetError::FeatureNotActive(
Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
crate::error::Feature::Lz4,
"decompress with lz4".to_string(),
)),
Expand Down Expand Up @@ -230,7 +228,7 @@ mod tests {

#[test]
fn test_codec_lz4() {
test_codec(Compression::Lz4);
test_codec(Compression::Lz4Raw);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/metadata/column_chunk_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl ColumnChunkMetaData {

/// [`Compression`] for this column.
pub fn compression(&self) -> Compression {
println!("{:?}", self.column_metadata().codec);
self.column_metadata().codec.try_into().unwrap()
}

Expand Down
5 changes: 5 additions & 0 deletions src/parquet_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ pub enum Compression {
Brotli,
Lz4,
Zstd,
Lz4Raw,
}

impl TryFrom<CompressionCodec> for Compression {
type Error = ParquetError;

fn try_from(codec: CompressionCodec) -> Result<Self, Self::Error> {
println!("{codec:?}");
Ok(match codec {
CompressionCodec::UNCOMPRESSED => Compression::Uncompressed,
CompressionCodec::SNAPPY => Compression::Snappy,
Expand All @@ -63,13 +65,15 @@ impl TryFrom<CompressionCodec> for Compression {
CompressionCodec::BROTLI => Compression::Brotli,
CompressionCodec::LZ4 => Compression::Lz4,
CompressionCodec::ZSTD => Compression::Zstd,
CompressionCodec::LZ4_RAW => Compression::Lz4Raw,
_ => return Err(ParquetError::OutOfSpec("Thrift out of range".to_string())),
})
}
}

impl From<Compression> for CompressionCodec {
fn from(codec: Compression) -> Self {
println!("{codec:?}");
match codec {
Compression::Uncompressed => CompressionCodec::UNCOMPRESSED,
Compression::Snappy => CompressionCodec::SNAPPY,
Expand All @@ -78,6 +82,7 @@ impl From<Compression> for CompressionCodec {
Compression::Brotli => CompressionCodec::BROTLI,
Compression::Lz4 => CompressionCodec::LZ4,
Compression::Zstd => CompressionCodec::ZSTD,
Compression::Lz4Raw => CompressionCodec::LZ4_RAW,
}
}
}
Expand Down

0 comments on commit 0f29db4

Please sign in to comment.