From 4f749f7247bb88944be3635d3b8e4c3b6820bc56 Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Wed, 31 Aug 2022 16:16:38 -0400 Subject: [PATCH 1/7] Rename DeephavenCompressor to CompressorAdapter and try to pool / re-use compressors where possible --- .../parquet/base/ColumnChunkReaderImpl.java | 10 +- .../parquet/base/ColumnPageReaderImpl.java | 18 ++-- .../parquet/base/ColumnWriterImpl.java | 20 ++-- .../parquet/base/ParquetFileWriter.java | 12 +-- .../parquet/base/RowGroupWriterImpl.java | 18 ++-- extensions/parquet/benchmark/build.gradle | 94 ++++++++++++++++ .../parquet/benchmark/gradle.properties | 1 + .../parquet/table/TableWriteBenchmark.java | 100 ++++++++++++++++++ ...Compressor.java => CompressorAdapter.java} | 35 +++--- ...=> DeephavenCompressorAdapterFactory.java} | 97 ++++++++++------- settings.gradle | 3 + 11 files changed, 311 insertions(+), 97 deletions(-) create mode 100644 extensions/parquet/benchmark/build.gradle create mode 100644 extensions/parquet/benchmark/gradle.properties create mode 100644 extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java rename extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/{Compressor.java => CompressorAdapter.java} (71%) rename extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/{DeephavenCodecFactory.java => DeephavenCompressorAdapterFactory.java} (59%) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index e6fc3fb45f2..f65c072f584 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -5,8 +5,8 @@ import io.deephaven.UncheckedDeephavenException; import io.deephaven.parquet.base.util.SeekableChannelsProvider; -import io.deephaven.parquet.compress.Compressor; -import io.deephaven.parquet.compress.DeephavenCodecFactory; +import io.deephaven.parquet.compress.CompressorAdapter; +import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory; import io.deephaven.util.datastructures.LazyCachingSupplier; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; @@ -38,7 +38,7 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { private final ColumnChunk columnChunk; private final SeekableChannelsProvider channelsProvider; private final Path rootPath; - private final Compressor decompressor; + private final CompressorAdapter decompressor; private final ColumnDescriptor path; private final OffsetIndex offsetIndex; private final List fieldTypes; @@ -56,9 +56,9 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { this.path = type .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0])); if (columnChunk.getMeta_data().isSetCodec()) { - decompressor = DeephavenCodecFactory.getInstance().getByName(columnChunk.getMeta_data().getCodec().name()); + decompressor = DeephavenCompressorAdapterFactory.getInstance().getByName(columnChunk.getMeta_data().getCodec().name()); } else { - decompressor = Compressor.PASSTHRU; + decompressor = CompressorAdapter.PASSTHRU; } this.offsetIndex = offsetIndex; this.fieldTypes = fieldTypes; diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java
b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index ee8b64f0b40..40a233291a9 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -7,7 +7,7 @@ import io.deephaven.parquet.base.util.Helpers; import io.deephaven.parquet.base.util.RunLengthBitPackingHybridBufferDecoder; import io.deephaven.parquet.base.util.SeekableChannelsProvider; -import io.deephaven.parquet.compress.Compressor; +import io.deephaven.parquet.compress.CompressorAdapter; import org.apache.commons.io.IOUtils; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; @@ -48,7 +48,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { public static final int NULL_OFFSET = -1; private final SeekableChannelsProvider channelsProvider; - private final Compressor compressor; + private final CompressorAdapter compressorAdapter; private final Supplier dictionarySupplier; private final PageMaterializer.Factory pageMaterializerFactory; private final ColumnDescriptor path; @@ -61,7 +61,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private int rowCount = -1; ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, - Compressor compressor, + CompressorAdapter compressorAdapter, Supplier dictionarySupplier, PageMaterializer.Factory materializerFactory, ColumnDescriptor path, @@ -71,7 +71,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { PageHeader pageHeader, int numValues) { this.channelsProvider = channelsProvider; - this.compressor = compressor; + this.compressorAdapter = compressorAdapter; this.dictionarySupplier = dictionarySupplier; this.pageMaterializerFactory = materializerFactory; this.path = path; @@ -143,7 +143,7 @@ private int readRowCountFromDataPage(ReadableByteChannel file) throws IOExceptio switch (pageHeader.type) { case DATA_PAGE: final BytesInput decompressedInput = - compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readRowCountFromPageV1(new DataPageV1( @@ -170,7 +170,7 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder, switch (pageHeader.type) { case DATA_PAGE: BytesInput decompressedInput = - compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readKeysFromPageV1(new DataPageV1( @@ -190,7 +190,7 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder, Helpers.readBytes(file, dataHeaderV2.getRepetition_levels_byte_length()); BytesInput definitionLevels = Helpers.readBytes(file, dataHeaderV2.getDefinition_levels_byte_length()); - BytesInput data = compressor.decompress(Channels.newInputStream(file), dataSize, uncompressedPageSize + BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize, uncompressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length()); @@ -218,7 +218,7 @@ private Object readDataPage(Object nullValue, SeekableByteChannel file) throws I 
switch (pageHeader.type) { case DATA_PAGE: BytesInput decompressedInput = - compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readPageV1(new DataPageV1( @@ -235,7 +235,7 @@ private Object readDataPage(Object nullValue, SeekableByteChannel file) throws I - dataHeaderV2.getDefinition_levels_byte_length(); BytesInput repetitionLevels = Helpers.readBytes(file, dataHeaderV2.getRepetition_levels_byte_length()); BytesInput definitionLevels = Helpers.readBytes(file, dataHeaderV2.getDefinition_levels_byte_length()); - BytesInput data = compressor.decompress(Channels.newInputStream(file), dataSize, + BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize, pageHeader.getUncompressed_page_size() - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length()); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index 92393fc4ff3..8ba4073f420 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -4,7 +4,7 @@ package io.deephaven.parquet.base; import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter; -import io.deephaven.parquet.compress.Compressor; +import io.deephaven.parquet.compress.CompressorAdapter; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; @@ -39,7 +39,7 @@ public class ColumnWriterImpl implements ColumnWriter { private final SeekableByteChannel writeChannel; private final ColumnDescriptor column; private final RowGroupWriterImpl owner; - private final Compressor compressor; + private final CompressorAdapter compressorAdapter; private boolean hasDictionary; private int pageCount = 0; private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); @@ -63,12 +63,12 @@ public class ColumnWriterImpl implements ColumnWriter { final RowGroupWriterImpl owner, final SeekableByteChannel writeChannel, final ColumnDescriptor column, - final Compressor compressor, + final CompressorAdapter compressorAdapter, final int pageSize, final ByteBufferAllocator allocator) { this.writeChannel = writeChannel; this.column = column; - this.compressor = compressor; + this.compressorAdapter = compressorAdapter; this.pageSize = pageSize; this.allocator = allocator; dlEncoder = column.getMaxDefinitionLevel() == 0 ? 
null @@ -128,8 +128,9 @@ public void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int val long currentChunkDictionaryPageOffset = writeChannel.position(); int uncompressedSize = dictionaryBuffer.remaining(); + compressorAdapter.reset(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (WritableByteChannel channel = Channels.newChannel(compressor.compress(baos))) { + try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) { channel.write(dictionaryBuffer); } BytesInput compressedBytes = BytesInput.from(baos); @@ -259,7 +260,7 @@ public void writePageV2( int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (WritableByteChannel channel = Channels.newChannel(compressor.compress(baos))) { + try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) { channel.write(data); } BytesInput compressedData = BytesInput.from(baos); @@ -298,8 +299,11 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod "Cannot write page larger than Integer.MAX_VALUE bytes: " + uncompressedSize); } + + compressorAdapter.reset(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (OutputStream cos = compressor.compress(baos)) { + try (OutputStream cos = compressorAdapter.compress(baos)) { bytes.writeAllTo(cos); } BytesInput compressedBytes = BytesInput.from(baos); @@ -388,7 +392,7 @@ private void writePage(final ByteBuffer encodedData, final long valueCount) { public void close() { owner.releaseWriter(this, ColumnChunkMetaData.get(ColumnPath.get(column.getPath()), column.getPrimitiveType(), - compressor.getCodecName(), + compressorAdapter.getCodecName(), null, encodings, Statistics.createStats(column.getPrimitiveType()), firstDataPageOffset, dictionaryOffset, totalValueCount, compressedLength, uncompressedLength)); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index 507e88979ec..03e6bbca742 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -5,8 +5,8 @@ import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter; import io.deephaven.parquet.base.util.SeekableChannelsProvider; -import io.deephaven.parquet.compress.Compressor; -import io.deephaven.parquet.compress.DeephavenCodecFactory; +import io.deephaven.parquet.compress.CompressorAdapter; +import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory; import org.apache.parquet.Version; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesUtils; @@ -37,7 +37,7 @@ public class ParquetFileWriter { private final int pageSize; private final ByteBufferAllocator allocator; private final SeekableChannelsProvider channelsProvider; - private final Compressor compressor; + private final CompressorAdapter compressorAdapter; private final Map extraMetaData; private final List blocks = new ArrayList<>(); private final List> offsetIndexes = new ArrayList<>(); @@ -56,19 +56,19 @@ public ParquetFileWriter( writeChannel = channelsProvider.getWriteChannel(filePath, false); // TODO add support for appending this.type = type; this.channelsProvider = channelsProvider; - this.compressor = 
DeephavenCodecFactory.getInstance().getByName(codecName); + this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName); } @SuppressWarnings("unused") RowGroupWriter addRowGroup(final String path, final boolean append) throws IOException { RowGroupWriterImpl rowGroupWriter = - new RowGroupWriterImpl(path, append, channelsProvider, type, pageSize, allocator, compressor); + new RowGroupWriterImpl(path, append, channelsProvider, type, pageSize, allocator, compressorAdapter); blocks.add(rowGroupWriter.getBlock()); return rowGroupWriter; } public RowGroupWriter addRowGroup(final long size) { - RowGroupWriterImpl rowGroupWriter = new RowGroupWriterImpl(writeChannel, type, pageSize, allocator, compressor); + RowGroupWriterImpl rowGroupWriter = new RowGroupWriterImpl(writeChannel, type, pageSize, allocator, compressorAdapter); rowGroupWriter.getBlock().setRowCount(size); blocks.add(rowGroupWriter.getBlock()); offsetIndexes.add(rowGroupWriter.offsetIndexes()); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java index 4c9dd953abc..d1c6026e618 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java @@ -4,7 +4,7 @@ package io.deephaven.parquet.base; import io.deephaven.parquet.base.util.SeekableChannelsProvider; -import io.deephaven.parquet.compress.Compressor; +import io.deephaven.parquet.compress.CompressorAdapter; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -27,13 +27,13 @@ public class RowGroupWriterImpl implements RowGroupWriter { private ColumnWriterImpl activeWriter; private final BlockMetaData blockMetaData; private final List currentOffsetIndexes = new ArrayList<>(); - private final Compressor compressor; + private final CompressorAdapter compressorAdapter; RowGroupWriterImpl(String path, boolean append, SeekableChannelsProvider channelsProvider, MessageType type, - int pageSize, ByteBufferAllocator allocator, Compressor compressor) + int pageSize, ByteBufferAllocator allocator, CompressorAdapter compressorAdapter) throws IOException { this(channelsProvider.getWriteChannel(path, append), type, pageSize, allocator, blockWithPath(path), - compressor); + compressorAdapter); } private static BlockMetaData blockWithPath(String path) { @@ -43,20 +43,20 @@ private static BlockMetaData blockWithPath(String path) { } RowGroupWriterImpl(SeekableByteChannel writeChannel, MessageType type, int pageSize, ByteBufferAllocator allocator, - Compressor compressor) { - this(writeChannel, type, pageSize, allocator, new BlockMetaData(), compressor); + CompressorAdapter compressorAdapter) { + this(writeChannel, type, pageSize, allocator, new BlockMetaData(), compressorAdapter); } private RowGroupWriterImpl(SeekableByteChannel writeChannel, MessageType type, int pageSize, ByteBufferAllocator allocator, BlockMetaData blockMetaData, - Compressor compressor) { + CompressorAdapter compressorAdapter) { this.writeChannel = writeChannel; this.type = type; this.pageSize = pageSize; this.allocator = allocator; this.blockMetaData = blockMetaData; - this.compressor = compressor; + this.compressorAdapter = compressorAdapter; } String[] getPrimitivePath(String columnName) { @@ -82,7 +82,7 @@ 
public ColumnWriter addColumn(String columnName) { + " need to close that before opening a writer for " + columnName); } activeWriter = new ColumnWriterImpl(this, writeChannel, type.getColumnDescription(getPrimitivePath(columnName)), - compressor, pageSize, allocator); + compressorAdapter, pageSize, allocator); return activeWriter; } diff --git a/extensions/parquet/benchmark/build.gradle b/extensions/parquet/benchmark/build.gradle new file mode 100644 index 00000000000..0a1ebc9d674 --- /dev/null +++ b/extensions/parquet/benchmark/build.gradle @@ -0,0 +1,94 @@ +plugins { + id 'java-library' + id 'io.deephaven.project.register' +} + +description 'Parquet Benchmark: Benchmarks for Parquet modules' + +sourceSets { + test { + java { + srcDir 'src/benchmark/java' + } + } +} + +dependencies { + testImplementation project(':extensions-parquet-table'), + project(':engine-table'), + project(':BenchmarkSupport'), + TestTools.projectDependency(project, 'engine-rowset'), + TestTools.projectDependency(project, 'engine-table') + + testAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.21' + testCompileOnly 'org.openjdk.jmh:jmh-generator-annprocess:1.21' + + testRuntimeOnly project(path: ':configs') + testRuntimeOnly project(path: ':test-configs') +} + +spotless { + java { + targetExclude( + '**/Boolean*Benchmark.java', + '**/Byte*Benchmark.java', + '**/Char*Benchmark.java', + '**/Short*Benchmark.java', + '**/Int*Benchmark.java', + '**/Long*Benchmark.java', + '**/Float*Benchmark.java', + '**/Double*Benchmark.java', + '**/Object*Benchmark.java', + ) + } +} + +task jmhRun(type: JavaExec) { + new File("$rootDir/tmp/workspace").mkdirs() + new File("$rootDir/tmp/logs").mkdirs() + workingDir "$rootDir/tmp/workspace" + + classpath = sourceSets.test.runtimeClasspath + + main = 'io.deephaven.benchmarking.runner.BenchmarkRunner' + + // arguments to pass to the application + jvmArgs '-DConfiguration.rootFile=dh-tests.prop', + "-Ddevroot=$rootDir", + "-Dworkspace=$rootDir/tmp/workspace", + '-Dconfiguration.quiet=true', + '-Djava.awt.headless=true', + '-DQueryTable.memoizeResults=false', + '-DUpdateGraphProcessor.checkTableOperations=false', + '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9500' +} + +def createJmhTask = { + taskName, cliArgs, jvmAddArgs=[], heapSize='8g' -> tasks.create(taskName, JavaExec, { JavaExec task -> + new File("$rootDir/tmp/workspace").mkdirs() + new File("$rootDir/tmp/logs").mkdirs() + + task.workingDir "$rootDir/tmp/workspace" + task.classpath = sourceSets.test.runtimeClasspath + task.main = 'io.deephaven.benchmarking.runner.BenchmarkRunner' + + // arguments to pass to the application + def jvmArgs = [ '-DConfiguration.rootFile=dh-tests.prop', + "-Ddevroot=$rootDir", + "-Dworkspace=$rootDir/tmp/workspace", + '-Dconfiguration.quiet=true', + '-Djava.awt.headless=true', + '-DQueryTable.memoizeResults=false', + '-DUpdateGraphProcessor.checkTableOperations=false', + "-Xmx$heapSize" + //'-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9501' + ] + jvmArgs.addAll(jvmAddArgs) + task.jvmArgs jvmArgs + task.args cliArgs + + return + }) +} + +createJmhTask('jmhRunTableWrite', 'TableWriteBenchmark') \ No newline at end of file diff --git a/extensions/parquet/benchmark/gradle.properties b/extensions/parquet/benchmark/gradle.properties new file mode 100644 index 00000000000..eeac3e65888 --- /dev/null +++ b/extensions/parquet/benchmark/gradle.properties @@ -0,0 +1 @@ +io.deephaven.project.ProjectType=JAVA_LOCAL diff --git 
a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java new file mode 100644 index 00000000000..cc4893a0a35 --- /dev/null +++ b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java @@ -0,0 +1,100 @@ +package io.deephaven.benchmark.parquet.table; + +import io.deephaven.base.FileUtils; +import io.deephaven.configuration.Configuration; +import io.deephaven.engine.context.CompilerTools; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryLibrary; +import io.deephaven.engine.context.QueryScope; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.sources.regioned.TestChunkedRegionedOperations; +import io.deephaven.engine.util.TableTools; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.ParquetTools; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.SafeCloseable; +import org.jetbrains.annotations.NotNull; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 1, time = 2) +@Measurement(iterations = 1, time = 2) +@Fork(1) +public class TableWriteBenchmark { + private static final String[] LETTERS = IntStream.range('A', 'A' + 64) + .mapToObj(c -> String.valueOf((char) c)) + .toArray(String[]::new); + + //@Param({"UNCOMPRESSED", "SNAPPY", "GZIP"}) + private String compressionCodec = "SNAPPY"; + + private Table table; + private SafeCloseable exContextCloseable; + private Path rootPath; + + @Setup(Level.Trial) + public void setupEnv() throws IOException { + rootPath = Files.createTempDirectory(Paths.get(Configuration.getInstance().getWorkspacePath()), "TableWriteBenchmark"); + final ExecutionContext context = ExecutionContext.newBuilder() + .newQueryLibrary() + .newQueryScope() + .setCompilerContext(CompilerTools.newContext(rootPath.resolve("cache").toFile(), getClass().getClassLoader())) + .build(); + exContextCloseable = context.open(); + + final QueryScope queryScope = context.getQueryScope(); + queryScope.putParam("nowNanos", DateTimeUtils.currentTime().getNanos()); + queryScope.putParam("letters", LETTERS); + + QueryLibrary library = context.getQueryLibrary(); + library.importClass(TestChunkedRegionedOperations.SimpleExternalizable.class); + // Serializable is already very inefficient, however SNAPPY also has an O(n^2) block write pattern and compounds + // the terribleness. For now, we will exclude the serializable from the benchmark + //library.importClass(TestChunkedRegionedOperations.SimpleSerializable.class); + library.importClass(BigInteger.class); + + table = TableTools.emptyTable(1_000_000).updateView( + "B = ii % 1000 == 0 ? NULL_BYTE : (byte) ii", + "C = ii % 27 == 26 ? NULL_CHAR : (char) ('A' + ii % 27)", + "S = ii % 30000 == 0 ? NULL_SHORT : (short) ii", + "I = ii % 512 == 0 ? NULL_INT : (int) ii", + "L = ii % 1024 == 0 ? NULL_LONG : ii", + "F = ii % 2048 == 0 ? NULL_FLOAT : (float) (ii * 0.25)", + "D = ii % 4096 == 0 ? NULL_DOUBLE : ii * 1.25", + "Bl = ii % 8192 == 0 ? 
null : ii % 2 == 0", + "Sym = ii % 64 == 0 ? null : Long.toString(ii % 1000)", + "Str = ii % 128 == 0 ? null : Long.toString(ii)", + "DT = ii % 256 == 0 ? null : new DateTime(nowNanos + ii)", +// "Ser = ii % 1024 == 0 ? null : new SimpleSerializable(ii)", + "Ext = ii % 1024 == 0 ? null : new SimpleExternalizable(ii)", + "Fix = ii % 64 == 0 ? null : new BigInteger(Long.toString(ii % 1000), 10)", + "Var = Str == null ? null : new BigInteger(Str, 10)" + ); + } + + @TearDown(Level.Trial) + public void cleanUp() { + FileUtils.deleteRecursively(rootPath.toFile()); + exContextCloseable.close(); + } + + @Benchmark + public Table writeTable(@NotNull final Blackhole bh) { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setCompressionCodecName(compressionCodec) + .build(); + ParquetTools.writeTable(table, rootPath.resolve("table.parquet").toFile(), instructions); + return table; + } +} diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/Compressor.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java similarity index 71% rename from extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/Compressor.java rename to extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java index 67f3e069370..d2a558a5032 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/Compressor.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java @@ -10,21 +10,19 @@ import java.io.InputStream; import java.io.OutputStream; -public interface Compressor { +/** + * An Intermediate adapter interface between Deephaven column writing and parquet compression. + */ +public interface CompressorAdapter { /** - * Compressor instance that reads and writes uncompressed data directly. + * An {@link CompressorAdapter} instance that reads and writes uncompressed data directly. */ - Compressor PASSTHRU = new Compressor() { + CompressorAdapter PASSTHRU = new CompressorAdapter() { @Override - public OutputStream compress(OutputStream os) throws IOException { + public OutputStream compress(OutputStream os) { return os; } - @Override - public InputStream decompress(InputStream is) throws IOException { - return is; - } - @Override public CompressionCodecName getCodecName() { return CompressionCodecName.UNCOMPRESSED; @@ -34,8 +32,10 @@ public CompressionCodecName getCodecName() { public BytesInput decompress(InputStream inputStream, int compressedSize, int uncompressedSize) { return BytesInput.from(inputStream, compressedSize); } - }; + @Override + public void reset() { } + }; /** * Creates a new output stream that will take uncompressed writes, and flush data to the provided stream as @@ -47,16 +47,6 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un */ OutputStream compress(OutputStream os) throws IOException; - /** - * Returns a new input stream that when read will provide the uncompressed data, by wrapping an input stream - * containing the compressed data. - * - * @param is an input stream that can be read to see compressed data - * @return an input stream that can be read to see uncompressed data - * @throws IOException thrown if an error occurs reading data - */ - InputStream decompress(InputStream is) throws IOException; - /** * Returns an in-memory instance of BytesInput containing the fully decompressed results of the input stream. 
* @@ -72,4 +62,9 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un * @return the CompressionCodecName enum value that represents this compressor. */ CompressionCodecName getCodecName(); + + /** + * Reset the internal state of this {@link CompressorAdapter} so more rows can be read or written. + */ + void reset(); } diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCodecFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java similarity index 59% rename from extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCodecFactory.java rename to extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index a893796f650..d845955354b 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCodecFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -6,11 +6,7 @@ import com.google.common.io.ByteStreams; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.*; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -27,7 +23,7 @@ * configuration or from the classpath (via service loaders), while still offering the ability to get a * CompressionCodecName enum value having loaded the codec in this way. 
*/ -public class DeephavenCodecFactory { +public class DeephavenCompressorAdapterFactory { // Default codecs to list in the configuration rather than rely on the classloader private static final Set DEFAULT_CODECS = Set.of( @@ -45,73 +41,92 @@ public class DeephavenCodecFactory { } }).collect(Collectors.toList()); - private static volatile DeephavenCodecFactory INSTANCE; + private static volatile DeephavenCompressorAdapterFactory INSTANCE; - public static synchronized void setInstance(DeephavenCodecFactory factory) { + public static synchronized void setInstance(DeephavenCompressorAdapterFactory factory) { if (INSTANCE != null) { throw new IllegalStateException("Can't assign an instance when one is already set"); } INSTANCE = factory; } - public static DeephavenCodecFactory getInstance() { + public static DeephavenCompressorAdapterFactory getInstance() { if (INSTANCE == null) { - synchronized (DeephavenCodecFactory.class) { + synchronized (DeephavenCompressorAdapterFactory.class) { if (INSTANCE == null) { - INSTANCE = new DeephavenCodecFactory(CODECS); + INSTANCE = new DeephavenCompressorAdapterFactory(CODECS); } } } return INSTANCE; } - public static class CodecWrappingCompressor implements Compressor { + public static class CodecWrappingCompressorAdapter implements CompressorAdapter { private final CompressionCodec compressionCodec; + private Compressor innerCompressor; + private Decompressor innerDecompressor; - private CodecWrappingCompressor(CompressionCodec compressionCodec) { + private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { this.compressionCodec = compressionCodec; + this.innerCompressor = compressionCodec.createCompressor(); } @Override public OutputStream compress(OutputStream os) throws IOException { - return compressionCodec.createOutputStream(os); - } + if(innerCompressor == null) { + innerCompressor = CodecPool.getCompressor(compressionCodec); + if(innerCompressor == null) { + // Some compressors are allowed to declare they cannot be pooled. If we fail to get one + // then fall back on just creating a new one to hang on to. + innerCompressor = compressionCodec.createCompressor(); + } - @Override - public InputStream decompress(InputStream is) throws IOException { - return compressionCodec.createInputStream(is); + if(innerCompressor == null) { + return compressionCodec.createOutputStream(os); + } + + innerCompressor.reset(); + } + return compressionCodec.createOutputStream(os, innerCompressor); } @Override public CompressionCodecName getCodecName() { return Stream.of(CompressionCodecName.values()) .filter(codec -> compressionCodec.getDefaultExtension().equals(codec.getExtension())) - .findAny().get(); + .findAny() + .get(); } @Override public BytesInput decompress(InputStream inputStream, int compressedSize, int uncompressedSize) throws IOException { - Decompressor decompressor = CodecPool.getDecompressor(compressionCodec); - if (decompressor != null) { + if(innerDecompressor == null) { + innerDecompressor = CodecPool.getDecompressor(compressionCodec); + } + + if (innerDecompressor != null) { // It is permitted for a decompressor to be null, otherwise we want to reset() it to // be ready for a new stream. // Note that this strictly shouldn't be necessary, since returnDecompressor will reset // it as well, but this is the pattern copied from CodecFactory.decompress. 
- decompressor.reset(); + innerDecompressor.reset(); } + // Note that we don't close this, we assume the caller will close their input stream when ready, + // and this won't need to be closed. + InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); + CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, innerDecompressor); + return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); + } - try { - // Note that we don't close this, we assume the caller will close their input stream when ready, - // and this won't need to be closed. - InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); - CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, decompressor); - return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); - } finally { - // Always return it, the pool will decide if it should be reused or not. - // CodecFactory has no logic around only returning after successful streams, - // and the instance appears to leak otherwise. - CodecPool.returnDecompressor(decompressor); + @Override + public void reset() { + if(innerCompressor != null) { + innerCompressor.reset(); + } + + if(innerDecompressor != null) { + innerDecompressor.reset(); } } } @@ -125,27 +140,29 @@ private static Configuration configurationWithCodecClasses(List> codecC private final CompressionCodecFactory compressionCodecFactory; - public DeephavenCodecFactory(List> codecClasses) { + public DeephavenCompressorAdapterFactory(List> codecClasses) { this(configurationWithCodecClasses(codecClasses)); } - public DeephavenCodecFactory(Configuration configuration) { + public DeephavenCompressorAdapterFactory(Configuration configuration) { compressionCodecFactory = new CompressionCodecFactory(configuration); } /** - * Returns a compressor with the given codec name. Do not use this to get a "no-op" codec, instead use - * {@link Compressor#PASSTHRU}. Names are identified using the {@link CompressionCodecFactory} rules (roughly, the - * first word in the class's name). + * Returns a compressor with the given codec name. * * @param codecName the name of the codec to search for. * @return a compressor instance with a name matching the given codec. 
*/ - public Compressor getByName(String codecName) { + public CompressorAdapter getByName(String codecName) { + if(codecName.equalsIgnoreCase("UNCOMPRESSED")) { + return CompressorAdapter.PASSTHRU; + } + CompressionCodec codec = compressionCodecFactory.getCodecByName(codecName); if (codec == null) { throw new IllegalArgumentException("Failed to find a compression codec with name " + codecName); } - return new CodecWrappingCompressor(codec); + return new CodecWrappingCompressorAdapter(codec); } } diff --git a/settings.gradle b/settings.gradle index 493a58c4508..3fe1748aec9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -210,6 +210,9 @@ project(':extensions-parquet-compression').projectDir = file('extensions/parquet include(':extensions-parquet-table') project(':extensions-parquet-table').projectDir = file('extensions/parquet/table') +include(':extensions-parquet-benchmark') +project(':extensions-parquet-benchmark').projectDir = file('extensions/parquet/benchmark') + include(':extensions-barrage') project(':extensions-barrage').projectDir = file('extensions/barrage') From 9632106a5aaae637035e483e3cc5d3302ae86fba Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Thu, 1 Sep 2022 00:51:43 -0400 Subject: [PATCH 2/7] Make CompressorAdapter SafeCloseable --- .../parquet/base/ParquetFileWriter.java | 2 ++ .../parquet/table/TableWriteBenchmark.java | 4 ++-- extensions/parquet/compression/build.gradle | 4 +++- .../parquet/compress/CompressorAdapter.java | 8 +++++++- .../DeephavenCompressorAdapterFactory.java | 19 ++++++++++++++++++- 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index 03e6bbca742..3261ff594c6 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -84,6 +84,8 @@ public void close() throws IOException { serializeFooter(footer, os); } // os (and thus writeChannel) are closed at this point. 
+ + compressorAdapter.close(); } private void serializeFooter(final ParquetMetadata footer, final OutputStream os) throws IOException { diff --git a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java index cc4893a0a35..e59d6ca6fc8 100644 --- a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java +++ b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java @@ -36,8 +36,8 @@ public class TableWriteBenchmark { .mapToObj(c -> String.valueOf((char) c)) .toArray(String[]::new); - //@Param({"UNCOMPRESSED", "SNAPPY", "GZIP"}) - private String compressionCodec = "SNAPPY"; + @Param({"UNCOMPRESSED", "SNAPPY", "GZIP"}) + private String compressionCodec; private Table table; private SafeCloseable exContextCloseable; diff --git a/extensions/parquet/compression/build.gradle b/extensions/parquet/compression/build.gradle index 7b4b7733fca..a3687f44bd7 100644 --- a/extensions/parquet/compression/build.gradle +++ b/extensions/parquet/compression/build.gradle @@ -4,7 +4,9 @@ plugins { } dependencies { - api project(':ParquetHadoop') + api project(':ParquetHadoop'), + project(':Util') + implementation project(':Configuration') implementation depCommonsIo diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java index d2a558a5032..2f8f94bb10c 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java @@ -3,6 +3,7 @@ */ package io.deephaven.parquet.compress; +import io.deephaven.util.SafeCloseable; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -13,7 +14,7 @@ /** * An Intermediate adapter interface between Deephaven column writing and parquet compression. */ -public interface CompressorAdapter { +public interface CompressorAdapter extends SafeCloseable { /** * An {@link CompressorAdapter} instance that reads and writes uncompressed data directly. 
*/ @@ -35,6 +36,11 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un @Override public void reset() { } + + @Override + public void close() { + + } }; /** diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index d845955354b..a6e385c8888 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -63,7 +63,11 @@ public static DeephavenCompressorAdapterFactory getInstance() { public static class CodecWrappingCompressorAdapter implements CompressorAdapter { private final CompressionCodec compressionCodec; + + private boolean innerCompressorPooled; private Compressor innerCompressor; + + public boolean innerDecompressorPooled; private Decompressor innerDecompressor; private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { @@ -75,7 +79,8 @@ private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { public OutputStream compress(OutputStream os) throws IOException { if(innerCompressor == null) { innerCompressor = CodecPool.getCompressor(compressionCodec); - if(innerCompressor == null) { + innerCompressorPooled = innerCompressor != null; + if(!innerCompressorPooled) { // Some compressors are allowed to declare they cannot be pooled. If we fail to get one // then fall back on just creating a new one to hang on to. innerCompressor = compressionCodec.createCompressor(); @@ -103,6 +108,7 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un throws IOException { if(innerDecompressor == null) { innerDecompressor = CodecPool.getDecompressor(compressionCodec); + innerDecompressorPooled = innerDecompressor != null; } if (innerDecompressor != null) { @@ -129,6 +135,17 @@ public void reset() { innerDecompressor.reset(); } } + + @Override + public void close() { + if(innerCompressor != null && innerCompressorPooled) { + CodecPool.returnCompressor(innerCompressor); + } + + if(innerDecompressor != null && innerDecompressorPooled) { + CodecPool.returnDecompressor(innerDecompressor); + } + } } private static Configuration configurationWithCodecClasses(List> codecClasses) { From 0b7591807b66215548db1b71cb84f388d2da2abf Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Thu, 1 Sep 2022 01:05:50 -0400 Subject: [PATCH 3/7] Apply spotless --- .../parquet/base/ColumnChunkReaderImpl.java | 3 ++- .../parquet/base/ColumnPageReaderImpl.java | 16 +++++++++------ .../parquet/base/ParquetFileWriter.java | 3 ++- .../parquet/table/TableWriteBenchmark.java | 15 +++++++------- .../parquet/compress/CompressorAdapter.java | 2 +- .../DeephavenCompressorAdapterFactory.java | 20 +++++++++---------- 6 files changed, 33 insertions(+), 26 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index f65c072f584..f56216ebe86 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -56,7 +56,8 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { this.path 
= type .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0])); if (columnChunk.getMeta_data().isSetCodec()) { - decompressor = DeephavenCompressorAdapterFactory.getInstance().getByName(columnChunk.getMeta_data().getCodec().name()); + decompressor = DeephavenCompressorAdapterFactory.getInstance() + .getByName(columnChunk.getMeta_data().getCodec().name()); } else { decompressor = CompressorAdapter.PASSTHRU; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 40a233291a9..b58375d2a3b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -143,7 +143,8 @@ private int readRowCountFromDataPage(ReadableByteChannel file) throws IOExceptio switch (pageHeader.type) { case DATA_PAGE: final BytesInput decompressedInput = - compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, + uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readRowCountFromPageV1(new DataPageV1( @@ -170,7 +171,8 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder, switch (pageHeader.type) { case DATA_PAGE: BytesInput decompressedInput = - compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, + uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readKeysFromPageV1(new DataPageV1( @@ -190,9 +192,10 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder, Helpers.readBytes(file, dataHeaderV2.getRepetition_levels_byte_length()); BytesInput definitionLevels = Helpers.readBytes(file, dataHeaderV2.getDefinition_levels_byte_length()); - BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize, uncompressedPageSize - - dataHeaderV2.getRepetition_levels_byte_length() - - dataHeaderV2.getDefinition_levels_byte_length()); + BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize, + uncompressedPageSize + - dataHeaderV2.getRepetition_levels_byte_length() + - dataHeaderV2.getDefinition_levels_byte_length()); readKeysFromPageV2(new DataPageV2( dataHeaderV2.getNum_rows(), @@ -218,7 +221,8 @@ private Object readDataPage(Object nullValue, SeekableByteChannel file) throws I switch (pageHeader.type) { case DATA_PAGE: BytesInput decompressedInput = - compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize); + compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize, + uncompressedPageSize); DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); return readPageV1(new DataPageV1( diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index 3261ff594c6..3315aad64f6 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -68,7 +68,8 @@ 
RowGroupWriter addRowGroup(final String path, final boolean append) throws IOExc } public RowGroupWriter addRowGroup(final long size) { - RowGroupWriterImpl rowGroupWriter = new RowGroupWriterImpl(writeChannel, type, pageSize, allocator, compressorAdapter); + RowGroupWriterImpl rowGroupWriter = + new RowGroupWriterImpl(writeChannel, type, pageSize, allocator, compressorAdapter); rowGroupWriter.getBlock().setRowCount(size); blocks.add(rowGroupWriter.getBlock()); offsetIndexes.add(rowGroupWriter.offsetIndexes()); diff --git a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java index e59d6ca6fc8..419efebb1cf 100644 --- a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java +++ b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java @@ -45,11 +45,13 @@ public class TableWriteBenchmark { @Setup(Level.Trial) public void setupEnv() throws IOException { - rootPath = Files.createTempDirectory(Paths.get(Configuration.getInstance().getWorkspacePath()), "TableWriteBenchmark"); + rootPath = Files.createTempDirectory(Paths.get(Configuration.getInstance().getWorkspacePath()), + "TableWriteBenchmark"); final ExecutionContext context = ExecutionContext.newBuilder() .newQueryLibrary() .newQueryScope() - .setCompilerContext(CompilerTools.newContext(rootPath.resolve("cache").toFile(), getClass().getClassLoader())) + .setCompilerContext( + CompilerTools.newContext(rootPath.resolve("cache").toFile(), getClass().getClassLoader())) .build(); exContextCloseable = context.open(); @@ -60,8 +62,8 @@ public void setupEnv() throws IOException { QueryLibrary library = context.getQueryLibrary(); library.importClass(TestChunkedRegionedOperations.SimpleExternalizable.class); // Serializable is already very inefficient, however SNAPPY also has an O(n^2) block write pattern and compounds - // the terribleness. For now, we will exclude the serializable from the benchmark - //library.importClass(TestChunkedRegionedOperations.SimpleSerializable.class); + // the terribleness. For now, we will exclude the serializable from the benchmark + // library.importClass(TestChunkedRegionedOperations.SimpleSerializable.class); library.importClass(BigInteger.class); table = TableTools.emptyTable(1_000_000).updateView( @@ -76,11 +78,10 @@ public void setupEnv() throws IOException { "Sym = ii % 64 == 0 ? null : Long.toString(ii % 1000)", "Str = ii % 128 == 0 ? null : Long.toString(ii)", "DT = ii % 256 == 0 ? null : new DateTime(nowNanos + ii)", -// "Ser = ii % 1024 == 0 ? null : new SimpleSerializable(ii)", + // "Ser = ii % 1024 == 0 ? null : new SimpleSerializable(ii)", "Ext = ii % 1024 == 0 ? null : new SimpleExternalizable(ii)", "Fix = ii % 64 == 0 ? null : new BigInteger(Long.toString(ii % 1000), 10)", - "Var = Str == null ? null : new BigInteger(Str, 10)" - ); + "Var = Str == null ? 
null : new BigInteger(Str, 10)"); } @TearDown(Level.Trial) diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java index 2f8f94bb10c..cc4633ae96e 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/CompressorAdapter.java @@ -35,7 +35,7 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un } @Override - public void reset() { } + public void reset() {} @Override public void close() { diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index a6e385c8888..4e9918a888b 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -77,16 +77,16 @@ private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { @Override public OutputStream compress(OutputStream os) throws IOException { - if(innerCompressor == null) { + if (innerCompressor == null) { innerCompressor = CodecPool.getCompressor(compressionCodec); innerCompressorPooled = innerCompressor != null; - if(!innerCompressorPooled) { - // Some compressors are allowed to declare they cannot be pooled. If we fail to get one + if (!innerCompressorPooled) { + // Some compressors are allowed to declare they cannot be pooled. If we fail to get one // then fall back on just creating a new one to hang on to. innerCompressor = compressionCodec.createCompressor(); } - if(innerCompressor == null) { + if (innerCompressor == null) { return compressionCodec.createOutputStream(os); } @@ -106,7 +106,7 @@ public CompressionCodecName getCodecName() { @Override public BytesInput decompress(InputStream inputStream, int compressedSize, int uncompressedSize) throws IOException { - if(innerDecompressor == null) { + if (innerDecompressor == null) { innerDecompressor = CodecPool.getDecompressor(compressionCodec); innerDecompressorPooled = innerDecompressor != null; } @@ -127,22 +127,22 @@ public BytesInput decompress(InputStream inputStream, int compressedSize, int un @Override public void reset() { - if(innerCompressor != null) { + if (innerCompressor != null) { innerCompressor.reset(); } - if(innerDecompressor != null) { + if (innerDecompressor != null) { innerDecompressor.reset(); } } @Override public void close() { - if(innerCompressor != null && innerCompressorPooled) { + if (innerCompressor != null && innerCompressorPooled) { CodecPool.returnCompressor(innerCompressor); } - if(innerDecompressor != null && innerDecompressorPooled) { + if (innerDecompressor != null && innerDecompressorPooled) { CodecPool.returnDecompressor(innerDecompressor); } } @@ -172,7 +172,7 @@ public DeephavenCompressorAdapterFactory(Configuration configuration) { * @return a compressor instance with a name matching the given codec. 
*/ public CompressorAdapter getByName(String codecName) { - if(codecName.equalsIgnoreCase("UNCOMPRESSED")) { + if (codecName.equalsIgnoreCase("UNCOMPRESSED")) { return CompressorAdapter.PASSTHRU; } From ac7b130f1c9e627a891fd8452432c8e5a2030cb8 Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Sat, 3 Sep 2022 11:01:19 -0400 Subject: [PATCH 4/7] spotless --- .../main/java/io/deephaven/parquet/base/ParquetFileWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index c293a80a7b4..e7bf963300f 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -62,7 +62,8 @@ public ParquetFileWriter( @SuppressWarnings("unused") RowGroupWriter addRowGroup(final String path, final boolean append) throws IOException { RowGroupWriterImpl rowGroupWriter = - new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator, compressorAdapter); + new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator, + compressorAdapter); blocks.add(rowGroupWriter.getBlock()); return rowGroupWriter; } From ec2f9177cee762db6456918c9705fffb053bf73d Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Thu, 8 Sep 2022 09:48:21 -0400 Subject: [PATCH 5/7] Fix CR stuff --- .../parquet/compress/DeephavenCompressorAdapterFactory.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index 4e9918a888b..cabb11a4ee8 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -67,12 +67,11 @@ public static class CodecWrappingCompressorAdapter implements CompressorAdapter private boolean innerCompressorPooled; private Compressor innerCompressor; - public boolean innerDecompressorPooled; + private boolean innerDecompressorPooled; private Decompressor innerDecompressor; private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { this.compressionCodec = compressionCodec; - this.innerCompressor = compressionCodec.createCompressor(); } @Override From a6df9914e773a9e76cf642859dc13577999bb57f Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Thu, 8 Sep 2022 13:12:16 -0400 Subject: [PATCH 6/7] Fix from merge --- .../benchmark/parquet/table/TableWriteBenchmark.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java index 419efebb1cf..d3fed59474c 100644 --- a/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java +++ b/extensions/parquet/benchmark/src/benchmark/java/io/deephaven/benchmark/parquet/table/TableWriteBenchmark.java @@ -2,8 +2,8 @@ import io.deephaven.base.FileUtils; import 
io.deephaven.configuration.Configuration; -import io.deephaven.engine.context.CompilerTools; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryCompiler; import io.deephaven.engine.context.QueryLibrary; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.Table; @@ -50,8 +50,8 @@ public void setupEnv() throws IOException { final ExecutionContext context = ExecutionContext.newBuilder() .newQueryLibrary() .newQueryScope() - .setCompilerContext( - CompilerTools.newContext(rootPath.resolve("cache").toFile(), getClass().getClassLoader())) + .setQueryCompiler( + QueryCompiler.create(rootPath.resolve("cache").toFile(), getClass().getClassLoader())) .build(); exContextCloseable = context.open(); From ce244fa4d5158358114d40bc0b3fd8f016849e1a Mon Sep 17 00:00:00 2001 From: Andrew Baranec Date: Fri, 9 Sep 2022 13:07:46 -0400 Subject: [PATCH 7/7] Don't try to cache Decompressor. There is nowhere to close it easily --- .../DeephavenCompressorAdapterFactory.java | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index cabb11a4ee8..c1439e7cd00 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -67,9 +67,6 @@ public static class CodecWrappingCompressorAdapter implements CompressorAdapter private boolean innerCompressorPooled; private Compressor innerCompressor; - private boolean innerDecompressorPooled; - private Decompressor innerDecompressor; - private CodecWrappingCompressorAdapter(CompressionCodec compressionCodec) { this.compressionCodec = compressionCodec; } @@ -105,23 +102,27 @@ public CompressionCodecName getCodecName() { @Override public BytesInput decompress(InputStream inputStream, int compressedSize, int uncompressedSize) throws IOException { - if (innerDecompressor == null) { - innerDecompressor = CodecPool.getDecompressor(compressionCodec); - innerDecompressorPooled = innerDecompressor != null; - } - - if (innerDecompressor != null) { + final Decompressor decompressor = CodecPool.getDecompressor(compressionCodec); + if (decompressor != null) { // It is permitted for a decompressor to be null, otherwise we want to reset() it to // be ready for a new stream. // Note that this strictly shouldn't be necessary, since returnDecompressor will reset // it as well, but this is the pattern copied from CodecFactory.decompress. - innerDecompressor.reset(); + decompressor.reset(); + } + + try { + // Note that we don't close this, we assume the caller will close their input stream when ready, + // and this won't need to be closed. + InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); + CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, decompressor); + return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); + } finally { + // Always return it, the pool will decide if it should be reused or not. + // CodecFactory has no logic around only returning after successful streams, + // and the instance appears to leak otherwise. 
+ CodecPool.returnDecompressor(decompressor); } - // Note that we don't close this, we assume the caller will close their input stream when ready, - // and this won't need to be closed. - InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); - CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, innerDecompressor); - return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); } @Override @@ -129,10 +130,6 @@ public void reset() { if (innerCompressor != null) { innerCompressor.reset(); } - - if (innerDecompressor != null) { - innerDecompressor.reset(); - } } @Override @@ -140,10 +137,6 @@ public void close() { if (innerCompressor != null && innerCompressorPooled) { CodecPool.returnCompressor(innerCompressor); } - - if (innerDecompressor != null && innerDecompressorPooled) { - CodecPool.returnDecompressor(innerDecompressor); - } } }
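
A minimal usage sketch of the renamed API, assuming the final state of this patch series and using only method signatures visible in the diffs above (getByName on DeephavenCompressorAdapterFactory, compress(OutputStream), decompress(InputStream, int, int), reset(), getCodecName(), and close() from SafeCloseable). The class name CompressorAdapterSketch, the payload bytes, and the "UNCOMPRESSED" codec choice are illustrative assumptions; other names such as "SNAPPY" or "GZIP" resolve through Hadoop's CompressionCodecFactory and require the corresponding codec on the classpath.

import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import org.apache.parquet.bytes.BytesInput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class CompressorAdapterSketch {
    public static void main(String[] args) throws IOException {
        // "UNCOMPRESSED" short-circuits to CompressorAdapter.PASSTHRU; any other codec name is
        // looked up via Hadoop's CompressionCodecFactory and wrapped in a CodecWrappingCompressorAdapter.
        final CompressorAdapter adapter =
                DeephavenCompressorAdapterFactory.getInstance().getByName("UNCOMPRESSED");
        try {
            final byte[] payload = "example page bytes".getBytes(StandardCharsets.UTF_8);

            // Write path (as in ColumnWriterImpl): reset the adapter once per page, then stream
            // the uncompressed bytes through compress().
            adapter.reset();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (OutputStream compressedOut = adapter.compress(baos)) {
                compressedOut.write(payload);
            }
            final byte[] compressed = baos.toByteArray();

            // Read path (as in ColumnPageReaderImpl): decompress() returns an in-memory BytesInput
            // holding the fully decompressed page bytes.
            final BytesInput decompressed = adapter.decompress(
                    new ByteArrayInputStream(compressed), compressed.length, payload.length);
            System.out.println(adapter.getCodecName() + ": " + decompressed.size() + " bytes");
        } finally {
            // CompressorAdapter extends SafeCloseable; for codec-backed adapters close() returns any
            // pooled Hadoop Compressor to the CodecPool, and for PASSTHRU it is a no-op.
            adapter.close();
        }
    }
}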