Try to pool / reuse compressors where possible. Fixes #2788 #2792

Merged · 10 commits · Sep 28, 2022
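
For orientation, the call sites in the diff below imply roughly the following shape for the new `CompressorAdapter`. This is a minimal sketch inferred from usage in this PR, not the actual interface in `io.deephaven.parquet.compress`; the `AutoCloseable` parent and any return or parameter types beyond what the call sites show are assumptions.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Sketch only: inferred from how the diff below uses the adapter.
public interface CompressorAdapter extends AutoCloseable {
    // Wraps the destination stream so bytes written through the result are compressed.
    OutputStream compress(OutputStream os) throws IOException;

    // Reads compressedSize bytes from the input and returns the decompressed bytes.
    BytesInput decompress(InputStream is, int compressedSize, int uncompressedSize) throws IOException;

    // Codec name recorded in the column chunk metadata when the column writer closes.
    CompressionCodecName getCodecName();

    // Called before each page is compressed so a pooled/reused codec starts from a clean state.
    void reset();

    // Releases the underlying (possibly pooled) codec; called once when the file writer closes.
    @Override
    void close();

    // The diff also uses a no-op CompressorAdapter.PASSTHRU for column chunks with no codec set.
}
```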
ColumnChunkReaderImpl.java
@@ -5,8 +5,8 @@

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.parquet.base.util.SeekableChannelsProvider;
import io.deephaven.parquet.compress.Compressor;
import io.deephaven.parquet.compress.DeephavenCodecFactory;
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import io.deephaven.util.datastructures.LazyCachingSupplier;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
@@ -38,7 +38,7 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
private final Path rootPath;
private final Compressor decompressor;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private final List<Type> fieldTypes;
@@ -56,9 +56,10 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
decompressor = DeephavenCodecFactory.getInstance().getByName(columnChunk.getMeta_data().getCodec().name());
decompressor = DeephavenCompressorAdapterFactory.getInstance()
.getByName(columnChunk.getMeta_data().getCodec().name());
} else {
decompressor = Compressor.PASSTHRU;
decompressor = CompressorAdapter.PASSTHRU;
}
this.offsetIndex = offsetIndex;
this.fieldTypes = fieldTypes;
ColumnPageReaderImpl.java
@@ -7,7 +7,7 @@
import io.deephaven.parquet.base.util.Helpers;
import io.deephaven.parquet.base.util.RunLengthBitPackingHybridBufferDecoder;
import io.deephaven.parquet.base.util.SeekableChannelsProvider;
import io.deephaven.parquet.compress.Compressor;
import io.deephaven.parquet.compress.CompressorAdapter;
import org.apache.commons.io.IOUtils;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
@@ -48,7 +48,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
public static final int NULL_OFFSET = -1;

private final SeekableChannelsProvider channelsProvider;
private final Compressor compressor;
private final CompressorAdapter compressorAdapter;
private final Supplier<Dictionary> dictionarySupplier;
private final PageMaterializer.Factory pageMaterializerFactory;
private final ColumnDescriptor path;
@@ -61,7 +61,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
private int rowCount = -1;

ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider,
Compressor compressor,
CompressorAdapter compressorAdapter,
Supplier<Dictionary> dictionarySupplier,
PageMaterializer.Factory materializerFactory,
ColumnDescriptor path,
@@ -71,7 +71,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
PageHeader pageHeader,
int numValues) {
this.channelsProvider = channelsProvider;
this.compressor = compressor;
this.compressorAdapter = compressorAdapter;
this.dictionarySupplier = dictionarySupplier;
this.pageMaterializerFactory = materializerFactory;
this.path = path;
@@ -143,7 +143,8 @@ private int readRowCountFromDataPage(ReadableByteChannel file) throws IOException
switch (pageHeader.type) {
case DATA_PAGE:
final BytesInput decompressedInput =
compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize);
compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize,
uncompressedPageSize);

DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
return readRowCountFromPageV1(new DataPageV1(
@@ -170,7 +171,8 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder,
switch (pageHeader.type) {
case DATA_PAGE:
BytesInput decompressedInput =
compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize);
compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize,
uncompressedPageSize);

DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
return readKeysFromPageV1(new DataPageV1(
@@ -190,9 +192,10 @@ private IntBuffer readKeyFromDataPage(IntBuffer keyDest, int nullPlaceholder,
Helpers.readBytes(file, dataHeaderV2.getRepetition_levels_byte_length());
BytesInput definitionLevels =
Helpers.readBytes(file, dataHeaderV2.getDefinition_levels_byte_length());
BytesInput data = compressor.decompress(Channels.newInputStream(file), dataSize, uncompressedPageSize
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length());
BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize,
uncompressedPageSize
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length());

readKeysFromPageV2(new DataPageV2(
dataHeaderV2.getNum_rows(),
@@ -218,7 +221,8 @@ private Object readDataPage(Object nullValue, SeekableByteChannel file) throws IOException
switch (pageHeader.type) {
case DATA_PAGE:
BytesInput decompressedInput =
compressor.decompress(Channels.newInputStream(file), compressedPageSize, uncompressedPageSize);
compressorAdapter.decompress(Channels.newInputStream(file), compressedPageSize,
uncompressedPageSize);

DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
return readPageV1(new DataPageV1(
@@ -235,7 +239,7 @@ private Object readDataPage(Object nullValue, SeekableByteChannel file) throws IOException
- dataHeaderV2.getDefinition_levels_byte_length();
BytesInput repetitionLevels = Helpers.readBytes(file, dataHeaderV2.getRepetition_levels_byte_length());
BytesInput definitionLevels = Helpers.readBytes(file, dataHeaderV2.getDefinition_levels_byte_length());
BytesInput data = compressor.decompress(Channels.newInputStream(file), dataSize,
BytesInput data = compressorAdapter.decompress(Channels.newInputStream(file), dataSize,
pageHeader.getUncompressed_page_size()
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length());
ColumnWriterImpl.java
@@ -4,7 +4,7 @@
package io.deephaven.parquet.base;

import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
import io.deephaven.parquet.compress.Compressor;
import io.deephaven.parquet.compress.CompressorAdapter;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
@@ -40,7 +40,7 @@ public class ColumnWriterImpl implements ColumnWriter {
private final SeekableByteChannel writeChannel;
private final ColumnDescriptor column;
private final RowGroupWriterImpl owner;
private final Compressor compressor;
private final CompressorAdapter compressorAdapter;
private boolean hasDictionary;
private int pageCount = 0;
private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
@@ -66,12 +66,12 @@ public class ColumnWriterImpl implements ColumnWriter {
final RowGroupWriterImpl owner,
final SeekableByteChannel writeChannel,
final ColumnDescriptor column,
final Compressor compressor,
final CompressorAdapter compressorAdapter,
final int targetPageSize,
final ByteBufferAllocator allocator) {
this.writeChannel = writeChannel;
this.column = column;
this.compressor = compressor;
this.compressorAdapter = compressorAdapter;
this.targetPageSize = targetPageSize;
this.allocator = allocator;
dlEncoder = column.getMaxDefinitionLevel() == 0 ? null
@@ -136,8 +136,9 @@ public void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int val
long currentChunkDictionaryPageOffset = writeChannel.position();
int uncompressedSize = dictionaryBuffer.remaining();

compressorAdapter.reset();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(compressor.compress(baos))) {
try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
channel.write(dictionaryBuffer);
}
BytesInput compressedBytes = BytesInput.from(baos);
@@ -264,7 +265,7 @@ public void writePageV2(
int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(compressor.compress(baos))) {
try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
channel.write(data);
}
BytesInput compressedData = BytesInput.from(baos);
@@ -303,8 +304,11 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod
"Cannot write page larger than Integer.MAX_VALUE bytes: " +
uncompressedSize);
}

compressorAdapter.reset();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream cos = compressor.compress(baos)) {
try (OutputStream cos = compressorAdapter.compress(baos)) {
bytes.writeAllTo(cos);
}
BytesInput compressedBytes = BytesInput.from(baos);
@@ -395,7 +399,7 @@ public void close() {
owner.releaseWriter(this,
ColumnChunkMetaData.get(ColumnPath.get(column.getPath()),
column.getPrimitiveType(),
compressor.getCodecName(),
compressorAdapter.getCodecName(),
encodingStatsBuilder.build(),
encodings,
Statistics.createStats(column.getPrimitiveType()),
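
The substantive change in `ColumnWriterImpl`, beyond the rename, is the `compressorAdapter.reset()` call added before the dictionary page and each data page is compressed: because adapters can now be pooled and reused, every compression pass starts from a clean codec state. A minimal sketch of that write-side pattern, assuming the adapter shape sketched near the top of this diff (the helper class here is hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import io.deephaven.parquet.compress.CompressorAdapter;
import org.apache.parquet.bytes.BytesInput;

// Sketch only: mirrors the writeDictionaryPage/writePage flow shown above.
final class PageCompressionSketch {
    private final CompressorAdapter compressorAdapter;

    PageCompressionSketch(final CompressorAdapter compressorAdapter) {
        this.compressorAdapter = compressorAdapter;
    }

    BytesInput compressPage(final BytesInput bytes) throws IOException {
        // Reset the (possibly reused) codec before every page.
        compressorAdapter.reset();
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (OutputStream cos = compressorAdapter.compress(baos)) {
            bytes.writeAllTo(cos);
        }
        return BytesInput.from(baos);
    }
}
```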
ParquetFileWriter.java
@@ -5,8 +5,8 @@

import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
import io.deephaven.parquet.base.util.SeekableChannelsProvider;
import io.deephaven.parquet.compress.Compressor;
import io.deephaven.parquet.compress.DeephavenCodecFactory;
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesUtils;
@@ -38,7 +38,7 @@ public class ParquetFileWriter {
private final int targetPageSize;
private final ByteBufferAllocator allocator;
private final SeekableChannelsProvider channelsProvider;
private final Compressor compressor;
private final CompressorAdapter compressorAdapter;
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
@@ -58,20 +58,21 @@ public ParquetFileWriter(
writeChannel.write(ByteBuffer.wrap(ParquetFileReader.MAGIC));
this.type = type;
this.channelsProvider = channelsProvider;
this.compressor = DeephavenCodecFactory.getInstance().getByName(codecName);
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
}

@SuppressWarnings("unused")
RowGroupWriter addRowGroup(final String path, final boolean append) throws IOException {
RowGroupWriterImpl rowGroupWriter =
new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator, compressor);
new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator,
compressorAdapter);
blocks.add(rowGroupWriter.getBlock());
return rowGroupWriter;
}

public RowGroupWriter addRowGroup(final long size) {
RowGroupWriterImpl rowGroupWriter =
new RowGroupWriterImpl(writeChannel, type, targetPageSize, allocator, compressor);
new RowGroupWriterImpl(writeChannel, type, targetPageSize, allocator, compressorAdapter);
rowGroupWriter.getBlock().setRowCount(size);
blocks.add(rowGroupWriter.getBlock());
offsetIndexes.add(rowGroupWriter.offsetIndexes());
@@ -86,6 +87,8 @@ public void close() throws IOException {
serializeFooter(footer, os);
}
// os (and thus writeChannel) are closed at this point.

compressorAdapter.close();
}

private void serializeFooter(final ParquetMetadata footer, final OutputStream os) throws IOException {
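
As the `ParquetFileWriter` changes show, a single adapter is obtained per file via `DeephavenCompressorAdapterFactory.getInstance().getByName(codecName)`, shared by every row group and column writer, and closed in `ParquetFileWriter.close()` after the output channel is closed so any pooled resources can be released. A hedged sketch of that lifecycle; the codec name `"SNAPPY"` is only an example and the wrapper class is hypothetical:

```java
import java.io.IOException;

import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;

// Sketch only: one adapter per file, reused for every page, released when the writer closes.
final class AdapterLifecycleSketch {
    void writeFile() throws IOException {
        final CompressorAdapter compressorAdapter =
                DeephavenCompressorAdapterFactory.getInstance().getByName("SNAPPY");
        try {
            // ... handed to each RowGroupWriterImpl / ColumnWriterImpl for this file,
            // with reset() called before every page is compressed ...
        } finally {
            compressorAdapter.close();
        }
    }
}
```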
RowGroupWriterImpl.java
@@ -4,7 +4,7 @@
package io.deephaven.parquet.base;

import io.deephaven.parquet.base.util.SeekableChannelsProvider;
import io.deephaven.parquet.compress.Compressor;
import io.deephaven.parquet.compress.CompressorAdapter;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
Expand All @@ -27,18 +27,18 @@ public class RowGroupWriterImpl implements RowGroupWriter {
private ColumnWriterImpl activeWriter;
private final BlockMetaData blockMetaData;
private final List<OffsetIndex> currentOffsetIndexes = new ArrayList<>();
private final Compressor compressor;
private final CompressorAdapter compressorAdapter;

RowGroupWriterImpl(String path,
boolean append,
SeekableChannelsProvider channelsProvider,
MessageType type,
int targetPageSize,
ByteBufferAllocator allocator,
Compressor compressor)
CompressorAdapter compressorAdapter)
throws IOException {
this(channelsProvider.getWriteChannel(path, append), type, targetPageSize, allocator, blockWithPath(path),
compressor);
compressorAdapter);
}

private static BlockMetaData blockWithPath(String path) {
@@ -51,8 +51,8 @@ private static BlockMetaData blockWithPath(String path) {
MessageType type,
int targetPageSize,
ByteBufferAllocator allocator,
Compressor compressor) {
this(writeChannel, type, targetPageSize, allocator, new BlockMetaData(), compressor);
CompressorAdapter compressorAdapter) {
this(writeChannel, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
}


@@ -61,13 +61,13 @@ private RowGroupWriterImpl(SeekableByteChannel writeChannel,
int targetPageSize,
ByteBufferAllocator allocator,
BlockMetaData blockMetaData,
Compressor compressor) {
CompressorAdapter compressorAdapter) {
this.writeChannel = writeChannel;
this.type = type;
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.blockMetaData = blockMetaData;
this.compressor = compressor;
this.compressorAdapter = compressorAdapter;
}

String[] getPrimitivePath(String columnName) {
@@ -95,7 +95,7 @@ public ColumnWriter addColumn(String columnName) {
activeWriter = new ColumnWriterImpl(this,
writeChannel,
type.getColumnDescription(getPrimitivePath(columnName)),
compressor,
compressorAdapter,
targetPageSize,
allocator);
return activeWriter;