Skip to content

Commit

Permalink
PARQUET-2196: Support LZ4_RAW codec
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Sep 27, 2022
1 parent 53f65a8 commit e3ffe88
Show file tree
Hide file tree
Showing 12 changed files with 688 additions and 260 deletions.
2 changes: 2 additions & 0 deletions parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public static String shortCodec(CompressionCodecName codec) {
return "B";
case LZ4:
return "4";
case LZ4_RAW:
return "4";
case ZSTD:
return "Z";
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum CompressionCodecName {
LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd");
ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"),
LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4");

public static CompressionCodecName fromConf(String name) {
if (name == null) {
Expand Down
5 changes: 5 additions & 0 deletions parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>0.21</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.codec;

import org.apache.parquet.Preconditions;

/**
* Utilities for NonBlockedCompressor and NonBlockedDecompressor.
*/
public class CodecUtil {
public static void validateBuffer(byte[] buffer, int off, int len) {
Preconditions.checkNotNull(buffer, "buffer");
Preconditions.checkArgument(off >= 0 && len >= 0 && off <= buffer.length - len,
"Invalid buffer offset or length: buffer.length=%s off=%s len=%s",
buffer.length, off, len);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.codec;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Lz4 raw compression codec for Parquet. This codec type has been introduced
* into the parquet format since version 2.9.0.
*/
public class Lz4RawCodec implements Configurable, CompressionCodec {

private Configuration conf;
// Hadoop config for how big to make intermediate buffers.
private final String BUFFER_SIZE_CONFIG = "io.file.buffer.size";

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}

@Override
public Compressor createCompressor() {
return new Lz4RawCompressor();
}

@Override
public Decompressor createDecompressor() {
return new Lz4RawDecompressor();
}

@Override
public CompressionInputStream createInputStream(InputStream stream)
throws IOException {
return createInputStream(stream, createDecompressor());
}

@Override
public CompressionInputStream createInputStream(InputStream stream,
Decompressor decompressor) throws IOException {
return new NonBlockedDecompressorStream(stream, decompressor,
conf.getInt(BUFFER_SIZE_CONFIG, 4*1024));
}

@Override
public CompressionOutputStream createOutputStream(OutputStream stream)
throws IOException {
return createOutputStream(stream, createCompressor());
}

@Override
public CompressionOutputStream createOutputStream(OutputStream stream,
Compressor compressor) throws IOException {
return new NonBlockedCompressorStream(stream, compressor,
conf.getInt(BUFFER_SIZE_CONFIG, 4*1024));
}

@Override
public Class<? extends Compressor> getCompressorType() {
return Lz4RawCompressor.class;
}

@Override
public Class<? extends Decompressor> getDecompressorType() {
return Lz4RawDecompressor.class;
}

@Override
public String getDefaultExtension() {
return ".lz4";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.codec;

import io.airlift.compress.lz4.Lz4Compressor;

import java.io.IOException;
import java.nio.ByteBuffer;

public class Lz4RawCompressor extends NonBlockedCompressor {

private Lz4Compressor compressor = new Lz4Compressor();

@Override
protected int maxCompressedLength(int byteSize) {
return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
}

@Override
protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
compressor.compress(uncompressed, compressed);
int compressedSize = compressed.position();
compressed.limit(compressedSize);
compressed.rewind();
return compressedSize;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.codec;

import io.airlift.compress.lz4.Lz4Decompressor;

import java.io.IOException;
import java.nio.ByteBuffer;

public class Lz4RawDecompressor extends NonBlockedDecompressor {

private Lz4Decompressor decompressor = new Lz4Decompressor();

@Override
protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
decompressor.decompress(compressed, uncompressed);
int uncompressedSize = uncompressed.position();
uncompressed.limit(uncompressedSize);
uncompressed.rewind();
return uncompressedSize;
}

@Override
protected int uncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
// We cannot obtain the precise uncompressed length from the input data.
// Simply return the maxUncompressedLength.
return maxUncompressedLength;
}

} //class Lz4RawDecompressor
Loading

0 comments on commit e3ffe88

Please sign in to comment.