From 129f0da76ee7123844b9df6a3ce6d1c63f78e150 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 27 Jun 2024 18:26:14 +0200 Subject: [PATCH] Add bridge between aircompressor v1 and v2 Additionally: seal interfaces and add factory methods --- .../java/io/airlift/compress/Compressor.java | 28 +++++ .../io/airlift/compress/Decompressor.java | 28 +++++ .../compress/MalformedInputException.java | 36 ++++++ .../io/airlift/compress/bzip2/BZip2Codec.java | 19 +++ .../compress/deflate/JdkDeflateCodec.java | 19 +++ .../airlift/compress/gzip/JdkGzipCodec.java | 19 +++ .../compress/hadoop/HadoopStreams.java | 19 +++ .../io/airlift/compress/lz4/Lz4Codec.java | 19 +++ .../airlift/compress/lz4/Lz4Compressor.java | 60 ++++++++++ .../airlift/compress/lz4/Lz4Decompressor.java | 57 +++++++++ .../compress/lz4/Lz4RawCompressor.java | 28 +++++ .../io/airlift/compress/lzo/LzoCodec.java | 19 +++ .../airlift/compress/lzo/LzoCompressor.java | 57 +++++++++ .../airlift/compress/lzo/LzoDecompressor.java | 54 +++++++++ .../io/airlift/compress/lzo/LzopCodec.java | 19 +++ .../airlift/compress/snappy/SnappyCodec.java | 19 +++ .../compress/snappy/SnappyCompressor.java | 60 ++++++++++ .../compress/snappy/SnappyDecompressor.java | 64 ++++++++++ .../compress/v2/lz4/Lz4Compressor.java | 8 ++ .../compress/v2/lz4/Lz4Decompressor.java | 8 ++ .../compress/v2/snappy/SnappyCompressor.java | 8 ++ .../v2/snappy/SnappyDecompressor.java | 8 ++ .../compress/v2/zstd/ZstdCompressor.java | 13 ++- .../compress/v2/zstd/ZstdDecompressor.java | 11 +- .../compress/v2/zstd/ZstdJavaCompressor.java | 2 +- .../v2/zstd/ZstdJavaDecompressor.java | 2 +- .../v2/zstd/ZstdNativeCompressor.java | 2 +- .../v2/zstd/ZstdNativeDecompressor.java | 2 +- .../v2/zstd/ZstdStreamCompressor.java | 6 +- .../v2/zstd/ZstdStreamDecompressor.java | 2 +- .../io/airlift/compress/zstd/ZstdCodec.java | 19 +++ .../airlift/compress/zstd/ZstdCompressor.java | 60 ++++++++++ .../compress/zstd/ZstdDecompressor.java | 65 +++++++++++ .../compress/zstd/ZstdInputStream.java | 25 ++++ .../compress/zstd/ZstdOutputStream.java | 27 +++++ .../compress/v2/zstd/TestZstdPartial.java | 69 ----------- .../v2/zstd/ZstdPartialDecompressor.java | 109 ------------------ 37 files changed, 879 insertions(+), 191 deletions(-) create mode 100644 src/main/java/io/airlift/compress/Compressor.java create mode 100644 src/main/java/io/airlift/compress/Decompressor.java create mode 100644 src/main/java/io/airlift/compress/MalformedInputException.java create mode 100644 src/main/java/io/airlift/compress/bzip2/BZip2Codec.java create mode 100644 src/main/java/io/airlift/compress/deflate/JdkDeflateCodec.java create mode 100644 src/main/java/io/airlift/compress/gzip/JdkGzipCodec.java create mode 100644 src/main/java/io/airlift/compress/hadoop/HadoopStreams.java create mode 100644 src/main/java/io/airlift/compress/lz4/Lz4Codec.java create mode 100644 src/main/java/io/airlift/compress/lz4/Lz4Compressor.java create mode 100644 src/main/java/io/airlift/compress/lz4/Lz4Decompressor.java create mode 100644 src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java create mode 100644 src/main/java/io/airlift/compress/lzo/LzoCodec.java create mode 100644 src/main/java/io/airlift/compress/lzo/LzoCompressor.java create mode 100644 src/main/java/io/airlift/compress/lzo/LzoDecompressor.java create mode 100644 src/main/java/io/airlift/compress/lzo/LzopCodec.java create mode 100644 src/main/java/io/airlift/compress/snappy/SnappyCodec.java create mode 100644 src/main/java/io/airlift/compress/snappy/SnappyCompressor.java create mode 100644 src/main/java/io/airlift/compress/snappy/SnappyDecompressor.java rename src/{test => main}/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java (93%) rename src/{test => main}/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java (98%) create mode 100644 src/main/java/io/airlift/compress/zstd/ZstdCodec.java create mode 100644 src/main/java/io/airlift/compress/zstd/ZstdCompressor.java create mode 100644 src/main/java/io/airlift/compress/zstd/ZstdDecompressor.java create mode 100644 src/main/java/io/airlift/compress/zstd/ZstdInputStream.java create mode 100644 src/main/java/io/airlift/compress/zstd/ZstdOutputStream.java delete mode 100644 src/test/java/io/airlift/compress/v2/zstd/TestZstdPartial.java delete mode 100644 src/test/java/io/airlift/compress/v2/zstd/ZstdPartialDecompressor.java diff --git a/src/main/java/io/airlift/compress/Compressor.java b/src/main/java/io/airlift/compress/Compressor.java new file mode 100644 index 00000000..9719dbad --- /dev/null +++ b/src/main/java/io/airlift/compress/Compressor.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress; + +import java.nio.ByteBuffer; + +public interface Compressor +{ + int maxCompressedLength(int uncompressedSize); + + /** + * @return number of bytes written to the output + */ + int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength); + + void compress(ByteBuffer input, ByteBuffer output); +} diff --git a/src/main/java/io/airlift/compress/Decompressor.java b/src/main/java/io/airlift/compress/Decompressor.java new file mode 100644 index 00000000..6cf290cc --- /dev/null +++ b/src/main/java/io/airlift/compress/Decompressor.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress; + +import java.nio.ByteBuffer; + +public interface Decompressor +{ + /** + * @return number of bytes written to the output + */ + int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException; + + void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException; +} diff --git a/src/main/java/io/airlift/compress/MalformedInputException.java b/src/main/java/io/airlift/compress/MalformedInputException.java new file mode 100644 index 00000000..eb487a39 --- /dev/null +++ b/src/main/java/io/airlift/compress/MalformedInputException.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress; + +public class MalformedInputException + extends RuntimeException +{ + private final long offset; + + public MalformedInputException(long offset) + { + this(offset, "Malformed input"); + } + + public MalformedInputException(long offset, String reason) + { + super(reason + ": offset=" + offset); + this.offset = offset; + } + + public long getOffset() + { + return offset; + } +} diff --git a/src/main/java/io/airlift/compress/bzip2/BZip2Codec.java b/src/main/java/io/airlift/compress/bzip2/BZip2Codec.java new file mode 100644 index 00000000..662084e8 --- /dev/null +++ b/src/main/java/io/airlift/compress/bzip2/BZip2Codec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.bzip2; + +public class BZip2Codec + extends io.airlift.compress.v2.bzip2.BZip2Codec +{ +} diff --git a/src/main/java/io/airlift/compress/deflate/JdkDeflateCodec.java b/src/main/java/io/airlift/compress/deflate/JdkDeflateCodec.java new file mode 100644 index 00000000..b582cb2f --- /dev/null +++ b/src/main/java/io/airlift/compress/deflate/JdkDeflateCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.deflate; + +public class JdkDeflateCodec + extends io.airlift.compress.v2.deflate.JdkDeflateCodec +{ +} diff --git a/src/main/java/io/airlift/compress/gzip/JdkGzipCodec.java b/src/main/java/io/airlift/compress/gzip/JdkGzipCodec.java new file mode 100644 index 00000000..c7622900 --- /dev/null +++ b/src/main/java/io/airlift/compress/gzip/JdkGzipCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.gzip; + +public class JdkGzipCodec + extends io.airlift.compress.v2.gzip.JdkGzipCodec +{ +} diff --git a/src/main/java/io/airlift/compress/hadoop/HadoopStreams.java b/src/main/java/io/airlift/compress/hadoop/HadoopStreams.java new file mode 100644 index 00000000..84f1aad7 --- /dev/null +++ b/src/main/java/io/airlift/compress/hadoop/HadoopStreams.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.hadoop; + +public interface HadoopStreams + extends io.airlift.compress.v2.hadoop.HadoopStreams +{ +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4Codec.java b/src/main/java/io/airlift/compress/lz4/Lz4Codec.java new file mode 100644 index 00000000..c1ed17ce --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4Codec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +public class Lz4Codec + extends io.airlift.compress.v2.lz4.Lz4Codec +{ +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4Compressor.java b/src/main/java/io/airlift/compress/lz4/Lz4Compressor.java new file mode 100644 index 00000000..f46eeefd --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4Compressor.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import io.airlift.compress.Compressor; +import io.airlift.compress.v2.lz4.Lz4JavaCompressor; +import io.airlift.compress.v2.lz4.Lz4NativeCompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class Lz4Compressor + implements Compressor +{ + private static final boolean NATIVE_ENABLED = Lz4NativeCompressor.isEnabled(); + private final io.airlift.compress.v2.lz4.Lz4Compressor compressor; + + public Lz4Compressor() + { + this.compressor = NATIVE_ENABLED ? new Lz4NativeCompressor() : new Lz4JavaCompressor(); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return compressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(compressor.compress(inputSegment, outputSegment)); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = compressor.compress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4Decompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4Decompressor.java new file mode 100644 index 00000000..96c75b64 --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4Decompressor.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; +import io.airlift.compress.v2.lz4.Lz4JavaDecompressor; +import io.airlift.compress.v2.lz4.Lz4NativeDecompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class Lz4Decompressor + implements Decompressor +{ + private static final boolean NATIVE_ENABLED = Lz4NativeDecompressor.isEnabled(); + private final io.airlift.compress.v2.lz4.Lz4Decompressor decompressor; + + public Lz4Decompressor() + { + this.decompressor = NATIVE_ENABLED ? new Lz4NativeDecompressor() : new Lz4JavaDecompressor(); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(decompressor.decompress(inputSegment, outputSegment)); + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = decompressor.decompress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java new file mode 100644 index 00000000..6d955743 --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +public class Lz4RawCompressor +{ + private static final int HASH_LOG = 12; + + public static final int MAX_TABLE_SIZE = (1 << HASH_LOG); + + private Lz4RawCompressor() {} + + public static int maxCompressedLength(int sourceLength) + { + return sourceLength + sourceLength / 255 + 16; + } +} diff --git a/src/main/java/io/airlift/compress/lzo/LzoCodec.java b/src/main/java/io/airlift/compress/lzo/LzoCodec.java new file mode 100644 index 00000000..d25b15ae --- /dev/null +++ b/src/main/java/io/airlift/compress/lzo/LzoCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lzo; + +public class LzoCodec + extends io.airlift.compress.v2.lzo.LzoCodec +{ +} diff --git a/src/main/java/io/airlift/compress/lzo/LzoCompressor.java b/src/main/java/io/airlift/compress/lzo/LzoCompressor.java new file mode 100644 index 00000000..46281935 --- /dev/null +++ b/src/main/java/io/airlift/compress/lzo/LzoCompressor.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lzo; + +import io.airlift.compress.Compressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class LzoCompressor + implements Compressor +{ + private final io.airlift.compress.v2.lzo.LzoCompressor compressor; + + public LzoCompressor() + { + this.compressor = new io.airlift.compress.v2.lzo.LzoCompressor(); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return compressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(compressor.compress(inputSegment, outputSegment)); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = compressor.compress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/lzo/LzoDecompressor.java b/src/main/java/io/airlift/compress/lzo/LzoDecompressor.java new file mode 100644 index 00000000..1ead9de7 --- /dev/null +++ b/src/main/java/io/airlift/compress/lzo/LzoDecompressor.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lzo; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class LzoDecompressor + implements Decompressor +{ + private final io.airlift.compress.v2.lzo.LzoDecompressor decompressor; + + public LzoDecompressor() + { + this.decompressor = new io.airlift.compress.v2.lzo.LzoDecompressor(); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(decompressor.decompress(inputSegment, outputSegment)); + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = decompressor.decompress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/lzo/LzopCodec.java b/src/main/java/io/airlift/compress/lzo/LzopCodec.java new file mode 100644 index 00000000..2d74d40f --- /dev/null +++ b/src/main/java/io/airlift/compress/lzo/LzopCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lzo; + +public class LzopCodec + extends io.airlift.compress.v2.lzo.LzopCodec +{ +} diff --git a/src/main/java/io/airlift/compress/snappy/SnappyCodec.java b/src/main/java/io/airlift/compress/snappy/SnappyCodec.java new file mode 100644 index 00000000..73095e50 --- /dev/null +++ b/src/main/java/io/airlift/compress/snappy/SnappyCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.snappy; + +public class SnappyCodec + extends io.airlift.compress.v2.snappy.SnappyCodec +{ +} diff --git a/src/main/java/io/airlift/compress/snappy/SnappyCompressor.java b/src/main/java/io/airlift/compress/snappy/SnappyCompressor.java new file mode 100644 index 00000000..7d09dbb1 --- /dev/null +++ b/src/main/java/io/airlift/compress/snappy/SnappyCompressor.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.snappy; + +import io.airlift.compress.Compressor; +import io.airlift.compress.v2.snappy.SnappyJavaCompressor; +import io.airlift.compress.v2.snappy.SnappyNativeCompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class SnappyCompressor + implements Compressor +{ + private static final boolean NATIVE_ENABLED = SnappyNativeCompressor.isEnabled(); + private final io.airlift.compress.v2.snappy.SnappyCompressor compressor; + + public SnappyCompressor() + { + this.compressor = NATIVE_ENABLED ? new SnappyNativeCompressor() : new SnappyJavaCompressor(); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return compressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(compressor.compress(inputSegment, outputSegment)); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = compressor.compress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/snappy/SnappyDecompressor.java b/src/main/java/io/airlift/compress/snappy/SnappyDecompressor.java new file mode 100644 index 00000000..ecbd3e5d --- /dev/null +++ b/src/main/java/io/airlift/compress/snappy/SnappyDecompressor.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.snappy; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; +import io.airlift.compress.v2.snappy.SnappyJavaDecompressor; +import io.airlift.compress.v2.snappy.SnappyNativeDecompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class SnappyDecompressor + implements Decompressor +{ + private static final boolean NATIVE_ENABLED = SnappyNativeDecompressor.isEnabled(); + private final io.airlift.compress.v2.snappy.SnappyDecompressor decompressor; + + public SnappyDecompressor() + { + this.decompressor = NATIVE_ENABLED ? new SnappyNativeDecompressor() : new SnappyJavaDecompressor(); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + return toIntExact(decompressor.decompress(inputSegment, outputSegment)); + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = decompressor.decompress(inputSegment, outputSegment); + output.position(output.position() + written); + } + + public static int getUncompressedLength(byte[] input, int offset) + { + if (NATIVE_ENABLED) { + return new SnappyNativeDecompressor().getUncompressedLength(input, offset); + } + return new SnappyJavaDecompressor().getUncompressedLength(input, offset); + } +} diff --git a/src/main/java/io/airlift/compress/v2/lz4/Lz4Compressor.java b/src/main/java/io/airlift/compress/v2/lz4/Lz4Compressor.java index 0b9f0f6d..fc8c41fa 100644 --- a/src/main/java/io/airlift/compress/v2/lz4/Lz4Compressor.java +++ b/src/main/java/io/airlift/compress/v2/lz4/Lz4Compressor.java @@ -22,4 +22,12 @@ public sealed interface Lz4Compressor permits Lz4JavaCompressor, Lz4NativeCompressor { int compress(MemorySegment input, MemorySegment output); + + static Lz4Compressor create() + { + if (Lz4NativeCompressor.isEnabled()) { + return new Lz4NativeCompressor(); + } + return new Lz4JavaCompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/lz4/Lz4Decompressor.java b/src/main/java/io/airlift/compress/v2/lz4/Lz4Decompressor.java index 233d642f..0dcd1ebc 100644 --- a/src/main/java/io/airlift/compress/v2/lz4/Lz4Decompressor.java +++ b/src/main/java/io/airlift/compress/v2/lz4/Lz4Decompressor.java @@ -22,4 +22,12 @@ public sealed interface Lz4Decompressor permits Lz4JavaDecompressor, Lz4NativeDecompressor { int decompress(MemorySegment input, MemorySegment output); + + static Lz4Decompressor create() + { + if (Lz4NativeDecompressor.isEnabled()) { + return new Lz4NativeDecompressor(); + } + return new Lz4JavaDecompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/snappy/SnappyCompressor.java b/src/main/java/io/airlift/compress/v2/snappy/SnappyCompressor.java index 18423953..8d544257 100644 --- a/src/main/java/io/airlift/compress/v2/snappy/SnappyCompressor.java +++ b/src/main/java/io/airlift/compress/v2/snappy/SnappyCompressor.java @@ -22,4 +22,12 @@ public sealed interface SnappyCompressor permits SnappyJavaCompressor, SnappyNativeCompressor { int compress(MemorySegment input, MemorySegment output); + + static SnappyCompressor create() + { + if (SnappyNativeCompressor.isEnabled()) { + return new SnappyNativeCompressor(); + } + return new SnappyJavaCompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/snappy/SnappyDecompressor.java b/src/main/java/io/airlift/compress/v2/snappy/SnappyDecompressor.java index f9ed1e37..6a20ec94 100644 --- a/src/main/java/io/airlift/compress/v2/snappy/SnappyDecompressor.java +++ b/src/main/java/io/airlift/compress/v2/snappy/SnappyDecompressor.java @@ -20,4 +20,12 @@ public sealed interface SnappyDecompressor permits SnappyJavaDecompressor, SnappyNativeDecompressor { int getUncompressedLength(byte[] compressed, int compressedOffset); + + static SnappyDecompressor create() + { + if (SnappyNativeDecompressor.isEnabled()) { + return new SnappyNativeDecompressor(); + } + return new SnappyJavaDecompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdCompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdCompressor.java index 8c2a725c..2cd8e18b 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdCompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdCompressor.java @@ -15,10 +15,15 @@ import io.airlift.compress.v2.Compressor; -import java.lang.foreign.MemorySegment; - -public interface ZstdCompressor +public sealed interface ZstdCompressor extends Compressor + permits ZstdJavaCompressor, ZstdNativeCompressor, ZstdStreamCompressor { - int compress(MemorySegment input, MemorySegment output); + static ZstdCompressor create() + { + if (ZstdNativeCompressor.isEnabled()) { + return new ZstdNativeCompressor(); + } + return new ZstdJavaCompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdDecompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdDecompressor.java index 44a164ca..df3a0287 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdDecompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdDecompressor.java @@ -15,8 +15,17 @@ import io.airlift.compress.v2.Decompressor; -public interface ZstdDecompressor +public sealed interface ZstdDecompressor extends Decompressor + permits ZstdJavaDecompressor, ZstdNativeDecompressor, ZstdStreamDecompressor { long getDecompressedSize(byte[] input, int offset, int length); + + static ZstdDecompressor create() + { + if (ZstdNativeDecompressor.isEnabled()) { + return new ZstdNativeDecompressor(); + } + return new ZstdJavaDecompressor(); + } } diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaCompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaCompressor.java index 6b5ae3a3..db92a0e5 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaCompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaCompressor.java @@ -24,7 +24,7 @@ import static java.util.Objects.requireNonNull; import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; -public class ZstdJavaCompressor +public final class ZstdJavaCompressor implements ZstdCompressor { @Override diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaDecompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaDecompressor.java index 992e93a5..b2b83d06 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaDecompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdJavaDecompressor.java @@ -25,7 +25,7 @@ import static java.util.Objects.requireNonNull; import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; -public class ZstdJavaDecompressor +public final class ZstdJavaDecompressor implements ZstdDecompressor { private final ZstdFrameDecompressor decompressor = new ZstdFrameDecompressor(); diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeCompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeCompressor.java index 5bdf5e12..00c2dd17 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeCompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeCompressor.java @@ -18,7 +18,7 @@ import static io.airlift.compress.v2.zstd.ZstdNative.DEFAULT_COMPRESSION_LEVEL; import static java.lang.Math.toIntExact; -public class ZstdNativeCompressor +public final class ZstdNativeCompressor implements ZstdCompressor { private final int compressionLevel; diff --git a/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeDecompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeDecompressor.java index 7ebd14b0..3d7c1614 100644 --- a/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeDecompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdNativeDecompressor.java @@ -17,7 +17,7 @@ import static java.lang.Math.toIntExact; -public class ZstdNativeDecompressor +public final class ZstdNativeDecompressor implements ZstdDecompressor { public ZstdNativeDecompressor() diff --git a/src/test/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java similarity index 93% rename from src/test/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java rename to src/main/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java index 58abe524..cc09eb0b 100644 --- a/src/test/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdStreamCompressor.java @@ -18,12 +18,12 @@ import java.io.UncheckedIOException; import java.lang.foreign.MemorySegment; -import static com.google.common.primitives.Ints.constrainToRange; import static io.airlift.compress.v2.zstd.Constants.MAX_BLOCK_SIZE; +import static java.lang.Math.clamp; import static java.lang.String.format; import static java.util.Objects.requireNonNull; -public class ZstdStreamCompressor +public non-sealed class ZstdStreamCompressor implements ZstdCompressor { @Override @@ -49,7 +49,7 @@ public int compress(byte[] input, int inputOffset, int inputLength, byte[] outpu int writtenBytes = 0; while (writtenBytes < inputLength) { // limit write size to max block size, which exercises internal buffer growth and flushing logic - int writeSize = constrainToRange(inputLength - writtenBytes, 0, MAX_BLOCK_SIZE); + int writeSize = clamp(inputLength - writtenBytes, 0, MAX_BLOCK_SIZE); zstdOutputStream.write(input, inputOffset + writtenBytes, writeSize); writtenBytes += writeSize; } diff --git a/src/test/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java b/src/main/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java similarity index 98% rename from src/test/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java rename to src/main/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java index 800318af..87bca7b4 100644 --- a/src/test/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java +++ b/src/main/java/io/airlift/compress/v2/zstd/ZstdStreamDecompressor.java @@ -24,7 +24,7 @@ import static java.util.Objects.requireNonNull; import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; -public class ZstdStreamDecompressor +public final class ZstdStreamDecompressor implements ZstdDecompressor { @Override diff --git a/src/main/java/io/airlift/compress/zstd/ZstdCodec.java b/src/main/java/io/airlift/compress/zstd/ZstdCodec.java new file mode 100644 index 00000000..38a79e21 --- /dev/null +++ b/src/main/java/io/airlift/compress/zstd/ZstdCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.zstd; + +public class ZstdCodec + extends io.airlift.compress.v2.zstd.ZstdCodec +{ +} diff --git a/src/main/java/io/airlift/compress/zstd/ZstdCompressor.java b/src/main/java/io/airlift/compress/zstd/ZstdCompressor.java new file mode 100644 index 00000000..0806c940 --- /dev/null +++ b/src/main/java/io/airlift/compress/zstd/ZstdCompressor.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.zstd; + +import io.airlift.compress.Compressor; +import io.airlift.compress.v2.zstd.ZstdJavaCompressor; +import io.airlift.compress.v2.zstd.ZstdNativeCompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class ZstdCompressor + implements Compressor +{ + private static final boolean NATIVE_ENABLED = ZstdNativeCompressor.isEnabled(); + private final io.airlift.compress.v2.zstd.ZstdCompressor compressor; + + public ZstdCompressor() + { + this.compressor = NATIVE_ENABLED ? new ZstdNativeCompressor() : new ZstdJavaCompressor(); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return compressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(compressor.compress(inputSegment, outputSegment)); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = compressor.compress(inputSegment, outputSegment); + output.position(output.position() + written); + } +} diff --git a/src/main/java/io/airlift/compress/zstd/ZstdDecompressor.java b/src/main/java/io/airlift/compress/zstd/ZstdDecompressor.java new file mode 100644 index 00000000..62dc0998 --- /dev/null +++ b/src/main/java/io/airlift/compress/zstd/ZstdDecompressor.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.zstd; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; +import io.airlift.compress.v2.zstd.ZstdJavaDecompressor; +import io.airlift.compress.v2.zstd.ZstdNativeDecompressor; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +import static java.lang.Math.toIntExact; + +public class ZstdDecompressor + implements Decompressor +{ + private static final boolean NATIVE_ENABLED = ZstdNativeDecompressor.isEnabled(); + private final io.airlift.compress.v2.zstd.ZstdDecompressor decompressor; + + public ZstdDecompressor() + { + this.decompressor = NATIVE_ENABLED ? new ZstdNativeDecompressor() : new ZstdJavaDecompressor(); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength); + MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength); + + return toIntExact(decompressor.decompress(inputSegment, outputSegment)); + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException + { + MemorySegment inputSegment = MemorySegment.ofBuffer(input); + MemorySegment outputSegment = MemorySegment.ofBuffer(output); + + int written = decompressor.decompress(inputSegment, outputSegment); + output.position(output.position() + written); + } + + public static long getDecompressedSize(byte[] input, int offset, int length) + { + if (NATIVE_ENABLED) { + return new ZstdNativeDecompressor().getDecompressedSize(input, offset, length); + } + return new ZstdJavaDecompressor().getDecompressedSize(input, offset, length); + } +} diff --git a/src/main/java/io/airlift/compress/zstd/ZstdInputStream.java b/src/main/java/io/airlift/compress/zstd/ZstdInputStream.java new file mode 100644 index 00000000..8bd08cbc --- /dev/null +++ b/src/main/java/io/airlift/compress/zstd/ZstdInputStream.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.zstd; + +import java.io.InputStream; + +public class ZstdInputStream + extends io.airlift.compress.v2.zstd.ZstdInputStream +{ + public ZstdInputStream(InputStream inputStream) + { + super(inputStream); + } +} diff --git a/src/main/java/io/airlift/compress/zstd/ZstdOutputStream.java b/src/main/java/io/airlift/compress/zstd/ZstdOutputStream.java new file mode 100644 index 00000000..e7d6ffba --- /dev/null +++ b/src/main/java/io/airlift/compress/zstd/ZstdOutputStream.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.zstd; + +import java.io.IOException; +import java.io.OutputStream; + +public class ZstdOutputStream + extends io.airlift.compress.v2.zstd.ZstdOutputStream +{ + public ZstdOutputStream(OutputStream outputStream) + throws IOException + { + super(outputStream); + } +} diff --git a/src/test/java/io/airlift/compress/v2/zstd/TestZstdPartial.java b/src/test/java/io/airlift/compress/v2/zstd/TestZstdPartial.java deleted file mode 100644 index 6e965c3c..00000000 --- a/src/test/java/io/airlift/compress/v2/zstd/TestZstdPartial.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.airlift.compress.v2.zstd; - -import com.google.common.io.Resources; -import io.airlift.compress.v2.Compressor; -import io.airlift.compress.v2.Decompressor; -import io.airlift.compress.v2.MalformedInputException; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -class TestZstdPartial - extends AbstractTestZstd -{ - @Override - protected boolean isMemorySegmentSupported() - { - return false; - } - - @Override - protected ZstdCompressor getCompressor() - { - return new ZstdJavaCompressor(); - } - - @Override - protected ZstdDecompressor getDecompressor() - { - return new ZstdPartialDecompressor(); - } - - @Override - protected Compressor getVerifyCompressor() - { - return new ZstdJavaCompressor(); - } - - @Override - protected Decompressor getVerifyDecompressor() - { - return new ZstdJavaDecompressor(); - } - - @Override - public void testInvalidSequenceOffset() - throws IOException - { - byte[] compressed = Resources.toByteArray(Resources.getResource("data/zstd/offset-before-start.zst")); - byte[] output = new byte[compressed.length * 10]; - - assertThatThrownBy(() -> getDecompressor().decompress(compressed, 0, compressed.length, output, 0, output.length)) - .isInstanceOf(MalformedInputException.class) - .hasMessageStartingWith("Input is corrupted: offset="); - } -} diff --git a/src/test/java/io/airlift/compress/v2/zstd/ZstdPartialDecompressor.java b/src/test/java/io/airlift/compress/v2/zstd/ZstdPartialDecompressor.java deleted file mode 100644 index 1784d607..00000000 --- a/src/test/java/io/airlift/compress/v2/zstd/ZstdPartialDecompressor.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.airlift.compress.v2.zstd; - -import io.airlift.compress.v2.MalformedInputException; - -import java.lang.foreign.MemorySegment; - -import static java.lang.String.format; -import static java.util.Arrays.copyOfRange; -import static java.util.Objects.requireNonNull; -import static org.assertj.core.api.Assertions.assertThat; -import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; - -public class ZstdPartialDecompressor - implements ZstdDecompressor -{ - private final ZstdIncrementalFrameDecompressor decompressor = new ZstdIncrementalFrameDecompressor(); - - @Override - public int decompress(final byte[] input, final int inputOffset, final int inputLength, final byte[] output, final int outputOffset, final int maxOutputLength) - throws MalformedInputException - { - verifyRange(input, inputOffset, inputLength); - verifyRange(output, outputOffset, maxOutputLength); - - assertThat(decompressor.getInputRequired()).isEqualTo(0); - assertThat(decompressor.getRequestedOutputSize()).isEqualTo(0); - assertThat(decompressor.getInputConsumed()).isEqualTo(0); - assertThat(decompressor.getOutputBufferUsed()).isEqualTo(0); - - int inputPosition = inputOffset; - final int inputLimit = inputOffset + inputLength; - int outputPosition = outputOffset; - final int outputLimit = outputOffset + maxOutputLength; - - while (inputPosition < inputLimit || decompressor.getRequestedOutputSize() > 0) { - if (decompressor.getInputRequired() > inputLimit - inputPosition) { - // the non-partial tests verify the exact exception type, so just throw that exception - throw new MalformedInputException(inputPosition, "Not enough input bytes"); - } - if (outputPosition + decompressor.getRequestedOutputSize() > outputLimit) { - throw new IllegalArgumentException("Output buffer too small"); - } - - // for testing, we always send the minimum number of requested bytes - byte[] inputChunk = copyOfRange(input, inputPosition, inputPosition + decompressor.getInputRequired()); - - // for testing, we use two reads for larger buffers - byte[] outputBuffer = new byte[0]; - if (decompressor.getRequestedOutputSize() > 0) { - outputBuffer = new byte[decompressor.getRequestedOutputSize() > 500 ? decompressor.getRequestedOutputSize() - 457 : decompressor.getRequestedOutputSize()]; - } - - decompressor.partialDecompress( - inputChunk, - ARRAY_BYTE_BASE_OFFSET, - inputChunk.length + ARRAY_BYTE_BASE_OFFSET, - outputBuffer, - 0, - outputBuffer.length); - - // copy output chunk to output - int outputBufferUsed = decompressor.getOutputBufferUsed(); - if (outputBufferUsed > 0) { - assertThat(outputPosition + outputBufferUsed <= outputLimit).isTrue(); - System.arraycopy(outputBuffer, 0, output, outputPosition, outputBufferUsed); - outputPosition += outputBufferUsed; - } - - assertThat(decompressor.getInputConsumed() <= inputChunk.length).isTrue(); - inputPosition += decompressor.getInputConsumed(); - } - return outputPosition - outputOffset; - } - - @Override - public int decompress(MemorySegment input, MemorySegment output) - throws MalformedInputException - { - throw new UnsupportedOperationException("not yet implemented"); - } - - @Override - public long getDecompressedSize(byte[] input, int offset, int length) - { - int baseAddress = ARRAY_BYTE_BASE_OFFSET + offset; - return ZstdFrameDecompressor.getDecompressedSize(input, baseAddress, baseAddress + length); - } - - private static void verifyRange(byte[] data, int offset, int length) - { - requireNonNull(data, "data is null"); - if (offset < 0 || length < 0 || offset + length > data.length) { - throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length)); - } - } -}