diff --git a/ParquetHadoop/build.gradle b/ParquetHadoop/build.gradle
index 5a5f890ef9d..48461b1a974 100644
--- a/ParquetHadoop/build.gradle
+++ b/ParquetHadoop/build.gradle
@@ -17,6 +17,7 @@ tasks.withType(License) {
 }
 
 dependencies {
+    // TODO(deephaven-core#3148): LZ4_RAW parquet support
     api('org.apache.parquet:parquet-hadoop:1.12.3')
 
     api('org.apache.hadoop:hadoop-common:3.3.3') {
diff --git a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/ZstdCodec.java b/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/ZstdCodec.java
deleted file mode 100644
index 0a549e5230e..00000000000
--- a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/ZstdCodec.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package io.deephaven.parquet.compress.codec;
-
-import com.github.luben.zstd.BufferPool;
-import com.github.luben.zstd.NoPool;
-import com.github.luben.zstd.RecyclingBufferPool;
-import io.deephaven.parquet.compress.codec.zstd.ZstdCompressorStream;
-import io.deephaven.parquet.compress.codec.zstd.ZstdDecompressorStream;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Provides an alternative codec name of "ZSTD" instead of the superclass's "ZSTANDARD". These streams are also modified
- * to use the "no finalizer" variant of the underlying streams, so that GC picking up the streams doesn't close the
- * underlying file.
- */
-public class ZstdCodec extends ZstandardCodec {
-    @Override
-    public CompressionInputStream createInputStream(InputStream stream) throws IOException {
-        BufferPool pool;
-        if (getConf().getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
-                DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
-            pool = RecyclingBufferPool.INSTANCE;
-        } else {
-            pool = NoPool.INSTANCE;
-        }
-        return new ZstdDecompressorStream(stream, pool);
-    }
-
-    @Override
-    public CompressionOutputStream createOutputStream(OutputStream stream) throws IOException {
-        BufferPool pool;
-        if (getConf().getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
-                DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
-            pool = RecyclingBufferPool.INSTANCE;
-        } else {
-            pool = NoPool.INSTANCE;
-        }
-        return new ZstdCompressorStream(stream, pool,
-                getConf().getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
-                getConf().getInt(PARQUET_COMPRESS_ZSTD_WORKERS, DEFAULT_PARQUET_COMPRESS_ZSTD_WORKERS));
-    }
-}
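Note: the deleted ZstdCodec existed only to answer to the name "ZSTD", since parquet's ZstandardCodec registers as "ZSTANDARD". Hadoop resolves codec names against a codec's simple class name, case-insensitively and with the "Codec" suffix optional, which is why a subclass named ZstdCodec was enough. A minimal sketch of that lookup (not part of this diff; the registered class and the name "ZSTD" are illustrative, and the class must be on the classpath):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupSketch {
    public static void main(String[] args) {
        final Configuration conf = new Configuration();
        // Register a codec class explicitly; CompressionCodecFactory reads this key
        conf.set("io.compression.codecs", "io.airlift.compress.zstd.ZstdCodec");
        // Matches by simple class name, case-insensitively, "Codec" suffix optional
        final CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName("ZSTD");
        // Prints the resolved class, or null if nothing answers to the name
        System.out.println(codec == null ? null : codec.getClass().getName());
    }
}
```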
diff --git a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdCompressorStream.java b/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdCompressorStream.java
deleted file mode 100644
index 3eaf96c52ed..00000000000
--- a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdCompressorStream.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package io.deephaven.parquet.compress.codec.zstd;
-
-import com.github.luben.zstd.BufferPool;
-import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Modified version of {@link org.apache.parquet.hadoop.codec.ZstdCompressorStream} but with the no-finalizer
- * version of the output stream to avoid closing the underlying stream when GC runs.
- */
-public class ZstdCompressorStream extends CompressionOutputStream {
-
-    private final ZstdOutputStreamNoFinalizer zstdOutputStream;
-
-    public ZstdCompressorStream(OutputStream stream, int level, int workers) throws IOException {
-        super(stream);
-        zstdOutputStream = new ZstdOutputStreamNoFinalizer(stream, level);
-        zstdOutputStream.setWorkers(workers);
-    }
-
-    public ZstdCompressorStream(OutputStream stream, BufferPool pool, int level, int workers) throws IOException {
-        super(stream);
-        zstdOutputStream = new ZstdOutputStreamNoFinalizer(stream, pool);
-        zstdOutputStream.setLevel(level);
-        zstdOutputStream.setWorkers(workers);
-    }
-
-    public void write(byte[] b, int off, int len) throws IOException {
-        zstdOutputStream.write(b, off, len);
-    }
-
-    public void write(int b) throws IOException {
-        zstdOutputStream.write(b);
-    }
-
-    public void finish() throws IOException {
-        // no-op, doesn't apply to ZSTD
-    }
-
-    public void resetState() throws IOException {
-        // no-op, doesn't apply to ZSTD
-    }
-
-    @Override
-    public void flush() throws IOException {
-        zstdOutputStream.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        zstdOutputStream.close();
-    }
-}
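For context on the "no finalizer" variants these deleted wrappers used: zstd-jni's plain ZstdOutputStream closes its target stream from a finalizer, so a garbage-collected wrapper could close a file that is still being written. A minimal sketch of the safe construction (constructor signature from zstd-jni; the helper method is illustrative):

```java
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;

import java.io.IOException;
import java.io.OutputStream;

public final class NoFinalizerSketch {
    // Illustrative helper: only an explicit close() ever touches `raw`;
    // garbage collection of the returned wrapper has no side effects.
    static OutputStream zstdCompress(OutputStream raw, int level) throws IOException {
        return new ZstdOutputStreamNoFinalizer(raw, level);
    }
}
```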
diff --git a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdDecompressorStream.java b/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdDecompressorStream.java
deleted file mode 100644
index 6a054f49760..00000000000
--- a/ParquetHadoop/src/main/java/io/deephaven/parquet/compress/codec/zstd/ZstdDecompressorStream.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package io.deephaven.parquet.compress.codec.zstd;
-
-import com.github.luben.zstd.BufferPool;
-import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Modified version of {@link org.apache.parquet.hadoop.codec.ZstdDecompressorStream} but with the no-finalizer
- * version of the input stream to avoid closing the underlying stream when GC runs.
- */
-public class ZstdDecompressorStream extends CompressionInputStream {
-
-    private final ZstdInputStreamNoFinalizer zstdInputStream;
-
-    public ZstdDecompressorStream(InputStream stream) throws IOException {
-        super(stream);
-        zstdInputStream = new ZstdInputStreamNoFinalizer(stream);
-    }
-
-    public ZstdDecompressorStream(InputStream stream, BufferPool pool) throws IOException {
-        super(stream);
-        zstdInputStream = new ZstdInputStreamNoFinalizer(stream, pool);
-    }
-
-    public int read(byte[] b, int off, int len) throws IOException {
-        return zstdInputStream.read(b, off, len);
-    }
-
-    public int read() throws IOException {
-        return zstdInputStream.read();
-    }
-
-    public void resetState() throws IOException {
-        // no-op, doesn't apply to ZSTD
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            zstdInputStream.close();
-        } finally {
-            super.close();
-        }
-    }
-}
diff --git a/extensions/parquet/compression/build.gradle b/extensions/parquet/compression/build.gradle
index e4e9cbe9ce0..416f9e95591 100644
--- a/extensions/parquet/compression/build.gradle
+++ b/extensions/parquet/compression/build.gradle
@@ -18,7 +18,7 @@ dependencies {
         because 'hadoop-common required dependency for LZ4Codec'
     }
 
     // Pick up default jvm-compatible compression codecs
-    implementation('io.airlift:aircompressor:0.21') {
+    implementation('io.airlift:aircompressor:0.24') {
        because 'Provides Lz4, LZO, Zstd compression support for parquet'
    }
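The aircompressor codecs picked up by this dependency are pure-Java implementations of Hadoop's CompressionCodec interface, which is what lets the rewritten factory below register them directly without native libraries. A minimal round-trip sketch, assuming these codecs need no Hadoop configuration for basic streaming:

```java
import io.airlift.compress.zstd.ZstdCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class AircompressorSketch {
    public static void main(String[] args) throws IOException {
        final ZstdCodec codec = new ZstdCodec();
        // Compress a small payload into an in-memory sink
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (CompressionOutputStream out = codec.createOutputStream(sink)) {
            out.write("hello, parquet".getBytes(StandardCharsets.UTF_8));
        }
        // Decompress it again and print the original text
        try (CompressionInputStream in =
                codec.createInputStream(new ByteArrayInputStream(sink.toByteArray()))) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```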
diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java
index c1439e7cd00..5b506d5f1fc 100644
--- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java
+++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java
@@ -4,19 +4,33 @@
 package io.deephaven.parquet.compress;
 
 import com.google.common.io.ByteStreams;
+import io.airlift.compress.gzip.JdkGzipCodec;
+import io.airlift.compress.lz4.Lz4Codec;
+import io.airlift.compress.lzo.LzoCodec;
+import io.airlift.compress.zstd.ZstdCodec;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
 /**
  * Deephaven flavor of the Hadoop/Parquet CompressionCodec factory, offering support for picking codecs from
@@ -24,51 +38,61 @@
  * CompressionCodecName enum value having loaded the codec in this way.
  */
 public class DeephavenCompressorAdapterFactory {
-
-    // Default codecs to list in the configuration rather than rely on the classloader
-    private static final Set<String> DEFAULT_CODECS = Set.of(
-            // Manually specify the "parquet" codec rather than the ServiceLoader-selected snappy codec, which is
-            // apparently incompatible with other parquet files which use snappy. This codec does use platform-specific
-            // implementations, but has native implementations for the platforms we support today.
-            "org.apache.parquet.hadoop.codec.SnappyCodec");
-
-    private static final List<Class<?>> CODECS = io.deephaven.configuration.Configuration.getInstance()
-            .getStringSetFromPropertyWithDefault("DeephavenCodecFactory.codecs", DEFAULT_CODECS).stream()
-            .map((String className) -> {
-                try {
-                    return Class.forName(className);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalStateException("Can't find codec with name " + className);
-                }
-            }).collect(Collectors.toList());
-
     private static volatile DeephavenCompressorAdapterFactory INSTANCE;
 
-    public static synchronized void setInstance(DeephavenCompressorAdapterFactory factory) {
-        if (INSTANCE != null) {
-            throw new IllegalStateException("Can't assign an instance when one is already set");
-        }
-        INSTANCE = factory;
-    }
-
     public static DeephavenCompressorAdapterFactory getInstance() {
         if (INSTANCE == null) {
             synchronized (DeephavenCompressorAdapterFactory.class) {
                 if (INSTANCE == null) {
-                    INSTANCE = new DeephavenCompressorAdapterFactory(CODECS);
+                    INSTANCE = createInstance();
                 }
             }
         }
        return INSTANCE;
    }
 
-    public static class CodecWrappingCompressorAdapter implements CompressorAdapter {
+    private static DeephavenCompressorAdapterFactory createInstance() {
+        // It's important that we create an explicit hadoop configuration for these so they take precedence; they will
+        // come last when added to the map, so will overwrite other codecs that match the same name / extension.
+        // See org.apache.hadoop.io.compress.CompressionCodecFactory#addCodec.
+        final Map<Class<?>, CompressionCodecName> explicitConfig = Map.of(
+                // Manually specify the "parquet" codec rather than the ServiceLoader-selected snappy codec,
+                // which is apparently incompatible with other parquet files which use snappy. This codec
+                // does use platform-specific implementations, but has native implementations for the
+                // platforms we support today.
+                SnappyCodec.class, CompressionCodecName.SNAPPY,
+                // The rest of these are aircompressor codecs which have fast / pure java implementations
+                JdkGzipCodec.class, CompressionCodecName.GZIP,
+                LzoCodec.class, CompressionCodecName.LZO,
+                Lz4Codec.class, CompressionCodecName.LZ4,
+                ZstdCodec.class, CompressionCodecName.ZSTD);
+        final Configuration conf = configurationWithCodecClasses(explicitConfig.keySet());
+        final CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+        final Map<String, CompressionCodecName> codecToNames =
+                new HashMap<>(CompressionCodecName.values().length + explicitConfig.size());
+        for (CompressionCodecName value : CompressionCodecName.values()) {
+            final String name = value.getHadoopCompressionCodecClassName();
+            if (name != null) {
+                codecToNames.put(name, value);
+            }
+        }
+        for (Entry<Class<?>, CompressionCodecName> e : explicitConfig.entrySet()) {
+            codecToNames.put(e.getKey().getName(), e.getValue());
+        }
+        return new DeephavenCompressorAdapterFactory(factory, Collections.unmodifiableMap(codecToNames));
+    }
+
+    private static class CodecWrappingCompressorAdapter implements CompressorAdapter {
         private final CompressionCodec compressionCodec;
+        private final CompressionCodecName compressionCodecName;
 
         private boolean innerCompressorPooled;
         private Compressor innerCompressor;
 
-        private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) {
-            this.compressionCodec = compressionCodec;
+        private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec,
+                CompressionCodecName compressionCodecName) {
+            this.compressionCodec = Objects.requireNonNull(compressionCodec);
+            this.compressionCodecName = Objects.requireNonNull(compressionCodecName);
         }
 
         @Override
@@ -93,10 +117,7 @@ public OutputStream compress(OutputStream os) throws IOException {
 
         @Override
         public CompressionCodecName getCodecName() {
-            return Stream.of(CompressionCodecName.values())
-                    .filter(codec -> compressionCodec.getDefaultExtension().equals(codec.getExtension()))
-                    .findAny()
-                    .get();
+            return compressionCodecName;
         }
 
         @Override
@@ -140,21 +161,20 @@ public void close() {
         }
     }
 
-    private static Configuration configurationWithCodecClasses(List<Class<?>> codecClasses) {
+    private static Configuration configurationWithCodecClasses(
+            Collection<Class<?>> codecClasses) {
         Configuration conf = new Configuration();
-        // noinspection unchecked, rawtypes
-        CompressionCodecFactory.setCodecClasses(conf, (List) codecClasses);
+        CompressionCodecFactory.setCodecClasses(conf, new ArrayList<>(codecClasses));
         return conf;
     }
 
     private final CompressionCodecFactory compressionCodecFactory;
+    private final Map<String, CompressionCodecName> codecClassnameToCodecName;
 
-    public DeephavenCompressorAdapterFactory(List<Class<?>> codecClasses) {
-        this(configurationWithCodecClasses(codecClasses));
-    }
-
-    public DeephavenCompressorAdapterFactory(Configuration configuration) {
-        compressionCodecFactory = new CompressionCodecFactory(configuration);
+    private DeephavenCompressorAdapterFactory(CompressionCodecFactory compressionCodecFactory,
+            Map<String, CompressionCodecName> codecClassnameToCodecName) {
+        this.compressionCodecFactory = Objects.requireNonNull(compressionCodecFactory);
+        this.codecClassnameToCodecName = Objects.requireNonNull(codecClassnameToCodecName);
     }
 
     /**
@@ -167,11 +187,17 @@ public CompressorAdapter getByName(String codecName) {
         if (codecName.equalsIgnoreCase("UNCOMPRESSED")) {
             return CompressorAdapter.PASSTHRU;
         }
-
-        CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
+        final CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName);
         if (codec == null) {
-            throw new IllegalArgumentException("Failed to find a compression codec with name " + codecName);
+            throw new IllegalArgumentException(
+                    String.format("Failed to find CompressionCodec for codecName=%s", codecName));
+        }
+        final CompressionCodecName ccn = codecClassnameToCodecName.get(codec.getClass().getName());
+        if (ccn == null) {
+            throw new IllegalArgumentException(String.format(
+                    "Failed to find CompressionCodecName for codecName=%s, codec=%s, codec.getDefaultExtension()=%s",
+                    codecName, codec, codec.getDefaultExtension()));
         }
-        return new CodecWrappingCompressorAdapter(codec);
+        return new CodecWrappingCompressorAdapter(codec, ccn);
     }
 }
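Taken together, the rewritten factory resolves a codec by name and pairs it with its CompressionCodecName up front, instead of guessing the enum value from the codec's file extension at getCodecName() time. A usage sketch with the names from this diff:

```java
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class FactorySketch {
    public static void main(String[] args) {
        // "UNCOMPRESSED" short-circuits to CompressorAdapter.PASSTHRU; a name that
        // resolves to a codec but not to a CompressionCodecName now fails fast with
        // the more descriptive IllegalArgumentException added above.
        final CompressorAdapter adapter =
                DeephavenCompressorAdapterFactory.getInstance().getByName("ZSTD");
        final CompressionCodecName name = adapter.getCodecName(); // CompressionCodecName.ZSTD
        System.out.println(name);
    }
}
```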
diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/codec/LzoCodec.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/codec/LzoCodec.java
deleted file mode 100644
index 1db729d88e6..00000000000
--- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/codec/LzoCodec.java
+++ /dev/null
@@ -1,15 +0,0 @@
-/**
- * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
- */
-package io.deephaven.parquet.compress.codec;
-
-/**
- * Provides an alternative file extension of ".lzo", while the superclass offers ".lzo_deflate". This is necessary to
- * fully replace functionality of the non-ServiceLoader compression codec factory.
- */
-public class LzoCodec extends io.airlift.compress.lzo.LzoCodec {
-    @Override
-    public String getDefaultExtension() {
-        return ".lzo";
-    }
-}
diff --git a/extensions/parquet/compression/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec b/extensions/parquet/compression/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
deleted file mode 100644
index e981ac1d72c..00000000000
--- a/extensions/parquet/compression/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
+++ /dev/null
@@ -1,4 +0,0 @@
-# Provides codec support that Deephaven will use out of the box, but isn't specified upstream
-io.airlift.compress.lzo.LzoCodec
-io.deephaven.parquet.compress.codec.ZstdCodec
-io.deephaven.parquet.compress.codec.LzoCodec
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
index d88b716ab57..bf8518b64a2 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
@@ -352,6 +352,8 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
             @Override
             public Optional<Class<?>> visit(
                     final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+                // TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
+                // to UTC
                 if (timestampLogicalType.isAdjustedToUTC()) {
                     switch (timestampLogicalType.getUnit()) {
                         case MILLIS:
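To make that TODO concrete: parquet-mr models both variants of the annotation, and only the UTC-adjusted one is handled by the visitor above. A small sketch using parquet-mr's factory methods:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

public class TimestampAnnotationSketch {
    public static void main(String[] args) {
        // Handled today: instant-like semantics, normalized to UTC
        final LogicalTypeAnnotation utc = LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS);
        // Not yet handled (deephaven-core#3588): wall-clock ("local") semantics
        final LogicalTypeAnnotation local = LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS);
        System.out.println(utc + " vs " + local);
    }
}
```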
diff --git a/extensions/parquet/table/src/test/e0.py b/extensions/parquet/table/src/test/e0.py
new file mode 100644
index 00000000000..09416337baa
--- /dev/null
+++ b/extensions/parquet/table/src/test/e0.py
@@ -0,0 +1,23 @@
+import pandas as pd
+import numpy as np
+
+df = pd.DataFrame(
+    {
+        "a": list("abc"),
+        "b": list(range(1, 4)),
+        "c": np.arange(3, 6).astype("u1"),
+        "d": np.arange(4.0, 7.0, dtype="float64"),
+        "e": [True, False, True],
+        "f": pd.date_range("20130101", periods=3),
+        "g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
+        "h": pd.Categorical(list("abc")),
+        "i": pd.Categorical(list("abc"), ordered=True),
+    }
+)
+
+df.to_parquet("resources/e0/uncompressed.parquet", compression=None)
+df.to_parquet("resources/e0/brotli.parquet", compression="brotli")
+df.to_parquet("resources/e0/gzip.parquet", compression="gzip")
+df.to_parquet("resources/e0/lz4.parquet", compression="lz4")
+df.to_parquet("resources/e0/snappy.parquet", compression="snappy")
+df.to_parquet("resources/e0/zstd.parquet", compression="zstd")
diff --git a/extensions/parquet/table/src/test/e0.requirements.txt b/extensions/parquet/table/src/test/e0.requirements.txt
new file mode 100644
index 00000000000..ba37038bfb6
--- /dev/null
+++ b/extensions/parquet/table/src/test/e0.requirements.txt
@@ -0,0 +1,6 @@
+numpy==1.24.2
+pandas==1.5.3
+pyarrow==4.0.1
+python-dateutil==2.8.2
+pytz==2022.7.1
+six==1.16.0
diff --git a/extensions/parquet/table/src/test/e1.py b/extensions/parquet/table/src/test/e1.py
new file mode 100644
index 00000000000..408c327f3a8
--- /dev/null
+++ b/extensions/parquet/table/src/test/e1.py
@@ -0,0 +1,23 @@
+import pandas as pd
+import numpy as np
+
+df = pd.DataFrame(
+    {
+        "a": list("abc"),
+        "b": list(range(1, 4)),
+        "c": np.arange(3, 6).astype("u1"),
+        "d": np.arange(4.0, 7.0, dtype="float64"),
+        "e": [True, False, True],
+        "f": pd.date_range("20130101", periods=3),
+        "g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
+        "h": pd.Categorical(list("abc")),
+        "i": pd.Categorical(list("abc"), ordered=True),
+    }
+)
+
+df.to_parquet("resources/e1/uncompressed.parquet", compression=None)
+df.to_parquet("resources/e1/brotli.parquet", compression="brotli")
+df.to_parquet("resources/e1/gzip.parquet", compression="gzip")
+df.to_parquet("resources/e1/lz4.parquet", compression="lz4")
+df.to_parquet("resources/e1/snappy.parquet", compression="snappy")
+df.to_parquet("resources/e1/zstd.parquet", compression="zstd")
diff --git a/extensions/parquet/table/src/test/e1.requirements.txt b/extensions/parquet/table/src/test/e1.requirements.txt
new file mode 100644
index 00000000000..b5f6f59c296
--- /dev/null
+++ b/extensions/parquet/table/src/test/e1.requirements.txt
@@ -0,0 +1,6 @@
+numpy==1.24.2
+pandas==1.5.3
+pyarrow==11.0.0
+python-dateutil==2.8.2
+pytz==2022.7.1
+six==1.16.0
diff --git a/extensions/parquet/table/src/test/e2.py b/extensions/parquet/table/src/test/e2.py
new file mode 100644
index 00000000000..446fb28519a
--- /dev/null
+++ b/extensions/parquet/table/src/test/e2.py
@@ -0,0 +1,23 @@
+import pandas as pd
+import numpy as np
+
+df = pd.DataFrame(
+    {
+        "a": list("abc"),
+        "b": list(range(1, 4)),
+        "c": np.arange(3, 6).astype("u1"),
+        "d": np.arange(4.0, 7.0, dtype="float64"),
+        "e": [True, False, True],
+        "f": pd.date_range("20130101", periods=3),
+        "g": pd.date_range("20130101", periods=3, tz="US/Eastern"),
+        "h": pd.Categorical(list("abc")),
+        "i": pd.Categorical(list("abc"), ordered=True),
+    }
+)
+
+df.to_parquet("resources/e2/uncompressed.parquet", compression=None)
+df.to_parquet("resources/e2/brotli.parquet", compression="brotli")
+df.to_parquet("resources/e2/gzip.parquet", compression="gzip")
+df.to_parquet("resources/e2/lz4.parquet", compression="lz4")
+df.to_parquet("resources/e2/snappy.parquet", compression="snappy")
+df.to_parquet("resources/e2/zstd.parquet", compression="zstd")
diff --git a/extensions/parquet/table/src/test/e2.requirements.txt b/extensions/parquet/table/src/test/e2.requirements.txt
new file mode 100644
index 00000000000..b1ccb61d6c7
--- /dev/null
+++ b/extensions/parquet/table/src/test/e2.requirements.txt
@@ -0,0 +1,9 @@
+cramjam==2.6.2
+fastparquet==2023.2.0
+fsspec==2023.3.0
+numpy==1.24.2
+packaging==23.0
+pandas==1.5.3
+python-dateutil==2.8.2
+pytz==2022.7.1
+six==1.16.0
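The three generator scripts are intentionally identical; only the pinned writer stack differs (e0: pyarrow 4.0.1, e1: pyarrow 11.0.0, e2: fastparquet 2023.2.0), so the resulting files exercise different parquet writer behaviors. The TODOs in the tests below suggest that newer pyarrow emits LZ4_RAW pages and that fastparquet writes timestamps that are not adjusted to UTC. Reading any one of the generated files from Java mirrors the tests that follow:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetTools;

public class ReadSketch {
    public static void main(String[] args) {
        // Path is illustrative; the tests resolve these files from the classpath instead.
        final Table t = ParquetTools.readTable(
                "extensions/parquet/table/src/test/resources/e0/zstd.parquet");
        System.out.println(t.size());
    }
}
```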
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
index b8bc9d10a81..5dc25d65f0e 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
@@ -434,6 +434,62 @@ public void testMultipleRenamesWithSameOuterName() {
                 t -> t.updateView("Y = Z", "Y = X").where("Y % 2 == 0"));
     }
 
+    @Test
+    public void e0() {
+        final Table uncompressed =
+                ParquetTools.readTable(TestParquetTools.class.getResource("/e0/uncompressed.parquet").getFile());
+
+        final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/gzip.parquet").getFile());
+        assertTableEquals(uncompressed, gzip);
+
+        final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/lz4.parquet").getFile());
+        assertTableEquals(uncompressed, lz4);
+
+        final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/snappy.parquet").getFile());
+        assertTableEquals(uncompressed, snappy);
+
+        final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e0/zstd.parquet").getFile());
+        assertTableEquals(uncompressed, zstd);
+    }
+
+    @Test
+    public void e1() {
+        final Table uncompressed =
+                ParquetTools.readTable(TestParquetTools.class.getResource("/e1/uncompressed.parquet").getFile());
+
+        final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/gzip.parquet").getFile());
+        assertTableEquals(uncompressed, gzip);
+
+        // TODO(deephaven-core#3585): LZ4_RAW parquet support
+        // final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/lz4.parquet").getFile());
+        // assertTableEquals(uncompressed, lz4);
+
+        final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/snappy.parquet").getFile());
+        assertTableEquals(uncompressed, snappy);
+
+        final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e1/zstd.parquet").getFile());
+        assertTableEquals(uncompressed, zstd);
+    }
+
+    // TODO(deephaven-core#3588): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC
+    // @Test
+    // public void e2() {
+    //     final Table uncompressed =
+    //             ParquetTools.readTable(TestParquetTools.class.getResource("/e2/uncompressed.parquet").getFile());
+    //
+    //     final Table gzip = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/gzip.parquet").getFile());
+    //     assertTableEquals(uncompressed, gzip);
+    //
+    //     final Table lz4 = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/lz4.parquet").getFile());
+    //     assertTableEquals(uncompressed, lz4);
+    //
+    //     final Table snappy = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/snappy.parquet").getFile());
+    //     assertTableEquals(uncompressed, snappy);
+    //
+    //     final Table zstd = ParquetTools.readTable(TestParquetTools.class.getResource("/e2/zstd.parquet").getFile());
+    //     assertTableEquals(uncompressed, zstd);
+    // }
+
     private void testWriteRead(Table source, Function<Table, Table> transform) {
         final File f2w = new File(testRoot, "testWriteRead.parquet");
         ParquetTools.writeTable(source, f2w);
diff --git a/extensions/parquet/table/src/test/resources/e0/brotli.parquet b/extensions/parquet/table/src/test/resources/e0/brotli.parquet
new file mode 100644
index 00000000000..01e7d994f64
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/brotli.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e0/gzip.parquet b/extensions/parquet/table/src/test/resources/e0/gzip.parquet
new file mode 100644
index 00000000000..b68912d7cdb
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/gzip.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e0/lz4.parquet b/extensions/parquet/table/src/test/resources/e0/lz4.parquet
new file mode 100644
index 00000000000..7762b171369
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/lz4.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e0/snappy.parquet b/extensions/parquet/table/src/test/resources/e0/snappy.parquet
new file mode 100644
index 00000000000..70610211695
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/snappy.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet
new file mode 100644
index 00000000000..a10b74a8ab6
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e0/zstd.parquet b/extensions/parquet/table/src/test/resources/e0/zstd.parquet
new file mode 100644
index 00000000000..39047596648
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/zstd.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/brotli.parquet b/extensions/parquet/table/src/test/resources/e1/brotli.parquet
new file mode 100644
index 00000000000..bbda7e06fb9
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/brotli.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/gzip.parquet b/extensions/parquet/table/src/test/resources/e1/gzip.parquet
new file mode 100644
index 00000000000..fcd8ba59e75
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/gzip.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/lz4.parquet b/extensions/parquet/table/src/test/resources/e1/lz4.parquet
new file mode 100644
index 00000000000..2b474d731ac
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/lz4.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/snappy.parquet b/extensions/parquet/table/src/test/resources/e1/snappy.parquet
new file mode 100644
index 00000000000..36a55927719
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/snappy.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet
new file mode 100644
index 00000000000..79700941900
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e1/zstd.parquet b/extensions/parquet/table/src/test/resources/e1/zstd.parquet
new file mode 100644
index 00000000000..52317b0f204
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e1/zstd.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/brotli.parquet b/extensions/parquet/table/src/test/resources/e2/brotli.parquet
new file mode 100644
index 00000000000..f0def9676f0
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/brotli.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/gzip.parquet b/extensions/parquet/table/src/test/resources/e2/gzip.parquet
new file mode 100644
index 00000000000..30d83fc0cbd
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/gzip.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/lz4.parquet b/extensions/parquet/table/src/test/resources/e2/lz4.parquet
new file mode 100644
index 00000000000..c7416517a78
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/lz4.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/snappy.parquet b/extensions/parquet/table/src/test/resources/e2/snappy.parquet
new file mode 100644
index 00000000000..f06e8ff2cf5
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/snappy.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet
new file mode 100644
index 00000000000..50855c72df8
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet differ
diff --git a/extensions/parquet/table/src/test/resources/e2/zstd.parquet b/extensions/parquet/table/src/test/resources/e2/zstd.parquet
new file mode 100644
index 00000000000..29d4295166d
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e2/zstd.parquet differ
diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py
index 3c6fd582ddb..e58ca97abc2 100644
--- a/py/server/tests/test_parquet.py
+++ b/py/server/tests/test_parquet.py
@@ -187,7 +187,7 @@ def test_round_trip_data(self):
         self.round_trip_with_compression("SNAPPY", dh_table)
         # LZO is not fully supported in python/c++
         # self.round_trip_with_compression("LZO", dh_table)
-        # TODO(deephaven-core#3148) This test seems to write parquet output with LZ4_RAW as the compression type, Java can't read it
+        # TODO(deephaven-core#3148): LZ4_RAW parquet support
         # self.round_trip_with_compression("LZ4", dh_table)
         self.round_trip_with_compression("GZIP", dh_table)
         self.round_trip_with_compression("ZSTD", dh_table)