Further improvements to Parquet page materialization #5670

Merged: 7 commits, Jun 26, 2024
Util/src/main/java/io/deephaven/util/codec/MapCodec.java (16 changes: 12 additions & 4 deletions)
@@ -115,11 +115,10 @@ private ByteBuffer encodeIntoBuffer(final ByteBuffer scratch, @NotNull Map<K, V>
 
     @Nullable
     @Override
-    public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int length) {
-        if (input.length == 0) {
+    public Map<K, V> decode(@NotNull final ByteBuffer byteBuffer) {
+        if (byteBuffer.remaining() == 0) {
             return null;
         }
-        final ByteBuffer byteBuffer = ByteBuffer.wrap(input);
         final int size = byteBuffer.getInt();
         if (size == 0) {
             return Collections.emptyMap();
@@ -129,13 +128,22 @@ public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int
             final V value = decodeValue(byteBuffer);
             return Collections.singletonMap(key, value);
         }
-        final LinkedHashMap<K, V> result = new LinkedHashMap<>(size);
+        final Map<K, V> result = new LinkedHashMap<>(size);
         for (int ii = 0; ii < size; ++ii) {
             result.put(decodeKey(byteBuffer), decodeValue(byteBuffer));
         }
         return Collections.unmodifiableMap(result);
     }
 
+    @Nullable
+    @Override
+    public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int length) {
+        if (input.length == 0) {
+            return null;
+        }
+        return decode(ByteBuffer.wrap(input, offset, length));
+    }
+
     /**
      * Estimate the size of the encoded map.
      *
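Note for reviewers: the replaced implementation wrapped the whole array with `ByteBuffer.wrap(input)`, so the `offset` and `length` arguments were effectively ignored; the new overload delegates through `ByteBuffer.wrap(input, offset, length)`, which honors the window. A standalone sketch of the difference (demo code, not part of this PR):

```java
import java.nio.ByteBuffer;

public final class WrapWindowDemo {
    public static void main(String[] args) {
        // Payload starts at offset 2 and is 4 bytes long; surrounding bytes are noise.
        final byte[] backing = {9, 9, 0, 0, 0, 2, 9};

        // ByteBuffer.wrap(array) starts at position 0: the first getInt() reads noise.
        System.out.println(ByteBuffer.wrap(backing).getInt()); // 151584768 (0x09090000)

        // ByteBuffer.wrap(array, offset, length) positions the buffer on the payload.
        System.out.println(ByteBuffer.wrap(backing, 2, 4).getInt()); // 2
    }
}
```

In practice existing callers may always have passed offset 0 and the full length, but the delegation now matches the documented contract either way.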
Util/src/main/java/io/deephaven/util/codec/ObjectDecoder.java (22 changes: 21 additions & 1 deletion)
@@ -7,6 +7,8 @@
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.nio.ByteBuffer;
+
 /**
  * <p>
  * Codec superinterface for Object translation from byte arrays for serialization and deserialization.
@@ -30,12 +32,30 @@ public interface ObjectDecoder<TYPE> {
      *
      * @param input The input byte array containing bytes to decode
      * @param offset The offset into the byte array to start decoding from
-     * @param length The length of the byte array to decode from, starting at the offset
+     * @param length The number of bytes to decode, starting at the offset
      * @return The output object, possibly null
      */
     @Nullable
     TYPE decode(@NotNull byte[] input, int offset, int length);
 
+    /**
+     * Decode an object from a ByteBuffer. The position of the input buffer may or may not be modified by this method.
+     *
+     * @param buffer The input ByteBuffer containing bytes to decode
+     * @return The output object, possibly null
+     */
+    @Nullable
+    default TYPE decode(@NotNull final ByteBuffer buffer) {
+        if (buffer.hasArray()) {
+            return decode(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        } else {
+            // Make a copy of the buffer's contents
+            final byte[] bytes = new byte[buffer.remaining()];
+            buffer.get(bytes);
+            return decode(bytes, 0, bytes.length);
+        }
+    }
+
     /**
      * What width byte array does this ObjectCodec expect to encode and decode?
      *
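The two branches of the new default method are worth spelling out: heap buffers (`hasArray()` true) are decoded in place from the backing array, leaving the buffer's position untouched, while direct buffers are copied out first, which advances the position. That asymmetry is why the Javadoc deliberately says the position "may or may not be modified". A standalone demo of both branches (not part of this PR):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class BufferBranchDemo {
    public static void main(String[] args) {
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Heap buffer: hasArray() is true, so the decoder reads the backing array
        // directly and the buffer's position is left untouched.
        final ByteBuffer heap = ByteBuffer.wrap(payload);
        System.out.println(heap.hasArray()); // true
        System.out.println(new String(heap.array(), heap.arrayOffset() + heap.position(),
                heap.remaining(), StandardCharsets.UTF_8)); // hello

        // Direct buffer: no accessible array, so the default method copies the
        // remaining bytes out, advancing the buffer's position in the process.
        final ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
        direct.put(payload);
        direct.flip();
        System.out.println(direct.hasArray()); // false
        final byte[] copy = new byte[direct.remaining()];
        direct.get(copy);
        System.out.println(new String(copy, StandardCharsets.UTF_8)); // hello
    }
}
```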
Util/src/main/java/io/deephaven/util/codec/SimpleByteArrayCodec.java
@@ -49,7 +49,9 @@ public byte[] encode(@Nullable final byte[] input) {
         if (input == null) {
             throw new IllegalArgumentException(SimpleByteArrayCodec.class.getSimpleName() + " cannot encode nulls");
         }
-        return input;
+        final byte[] output = new byte[input.length];
+        System.arraycopy(input, 0, output, 0, input.length);
+        return output;
     }
 
     @Override
@@ -73,9 +75,6 @@ public byte[] decode(@NotNull final byte[] input, final int offset, final int length) {
         if (input.length == 0) {
             return CollectionUtil.ZERO_LENGTH_BYTE_ARRAY;
         }
-        if (offset == 0 && length == input.length) {
-            return input;
-        }
         final byte[] output = new byte[length];
         System.arraycopy(input, offset, output, 0, length);
         return output;
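Both changes here trade an allocation for safety: `encode` no longer returns the caller's array, and `decode` no longer returns the input array when the requested window happens to cover it, so neither side can mutate the other's bytes afterward. A minimal sketch of the aliasing hazard being removed (construction is hypothetical; adapt to the codec's actual constructors):

```java
import io.deephaven.util.codec.SimpleByteArrayCodec;

public final class AliasingDemo {
    public static void main(String[] args) {
        // Constructor shape assumed for illustration.
        final SimpleByteArrayCodec codec = new SimpleByteArrayCodec();

        final byte[] original = {1, 2, 3};
        final byte[] encoded = codec.encode(original);

        original[0] = 42;
        // Before this change, encode returned `original` itself, so this printed 42;
        // with the defensive copy it prints 1.
        System.out.println(encoded[0]);
    }
}
```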
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java
@@ -71,9 +71,10 @@ interface ColumnPageReaderIterator {
     }
 
     /**
+     * @param pageMaterializerFactory The factory to use for constructing page materializers.
      * @return An iterator over individual parquet pages.
      */
-    ColumnPageReaderIterator getPageIterator() throws IOException;
+    ColumnPageReaderIterator getPageIterator(PageMaterializerFactory pageMaterializerFactory) throws IOException;
 
     interface ColumnPageDirectAccessor {
         /**
@@ -86,9 +87,10 @@ interface ColumnPageDirectAccessor {
     }
 
     /**
+     * @param pageMaterializerFactory The factory to use for constructing page materializers.
      * @return An accessor for individual parquet pages which uses the provided offset index.
      */
-    ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex);
+    ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex, PageMaterializerFactory pageMaterializerFactory);
 
     /**
      * @return Whether this column chunk uses a dictionary-based encoding on every page.
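With these signatures, the caller chooses the materializer factory instead of the chunk reader deriving one from the column's Parquet primitive type (see the field removed from ColumnChunkReaderImpl below). A hypothetical call-site sketch, with names and package locations assumed:

```java
import java.io.IOException;

import io.deephaven.parquet.base.ColumnChunkReader;
import io.deephaven.parquet.base.PageMaterializerFactory;

final class PageIterationSketch {
    // The caller now owns factory selection, so the same chunk could in principle be
    // materialized differently depending on how the engine wants to read the column.
    static void drainPages(final ColumnChunkReader reader, final PageMaterializerFactory factory)
            throws IOException {
        final ColumnChunkReader.ColumnPageReaderIterator pages = reader.getPageIterator(factory);
        // ... iterate, materializing each page through `factory` ...
    }
}
```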
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -48,7 +48,6 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
     private final OffsetIndexReader offsetIndexReader;
     private final List<Type> fieldTypes;
     private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
-    private final PageMaterializerFactory pageMaterializerFactory;
     private final URI columnChunkURI;
     /**
      * Number of rows in the row group of this column chunk.
@@ -81,7 +80,6 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
         }
         this.fieldTypes = fieldTypes;
         this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
-        this.pageMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType());
         this.numRows = numRows;
         this.version = version;
         if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
@@ -130,16 +128,18 @@ public OffsetIndex getOffsetIndex(final SeekableChannelContext context) {
     }
 
     @Override
-    public ColumnPageReaderIterator getPageIterator() {
-        return new ColumnPageReaderIteratorImpl();
+    public ColumnPageReaderIterator getPageIterator(final PageMaterializerFactory pageMaterializerFactory) {
+        return new ColumnPageReaderIteratorImpl(pageMaterializerFactory);
     }
 
     @Override
-    public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
+    public ColumnPageDirectAccessor getPageAccessor(
+            final OffsetIndex offsetIndex,
+            final PageMaterializerFactory pageMaterializerFactory) {
         if (offsetIndex == null) {
             throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
         }
-        return new ColumnPageDirectAccessorImpl(offsetIndex);
+        return new ColumnPageDirectAccessorImpl(offsetIndex, pageMaterializerFactory);
     }
 
     @Override
@@ -247,10 +247,12 @@ private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelContext
     private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
         private long nextHeaderOffset;
         private long remainingValues;
+        PageMaterializerFactory pageMaterializerFactory;
 
-        ColumnPageReaderIteratorImpl() {
+        ColumnPageReaderIteratorImpl(final PageMaterializerFactory pageMaterializerFactory) {
             this.remainingValues = columnChunk.meta_data.getNum_values();
             this.nextHeaderOffset = columnChunk.meta_data.getData_page_offset();
+            this.pageMaterializerFactory = pageMaterializerFactory;
         }
 
         @Override
@@ -335,9 +337,12 @@ private static int getNumValues(PageHeader pageHeader) {
     private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {
 
         private final OffsetIndex offsetIndex;
+        private final PageMaterializerFactory pageMaterializerFactory;
 
-        ColumnPageDirectAccessorImpl(final OffsetIndex offsetIndex) {
+        ColumnPageDirectAccessorImpl(@NotNull final OffsetIndex offsetIndex,
+                @NotNull final PageMaterializerFactory pageMaterializerFactory) {
             this.offsetIndex = offsetIndex;
+            this.pageMaterializerFactory = pageMaterializerFactory;
         }
 
         @Override
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java
@@ -345,7 +345,7 @@ private IntBuffer readKeysFromPageCommon(
             final RunLengthBitPackingHybridBufferDecoder rlDecoder,
             final RunLengthBitPackingHybridBufferDecoder dlDecoder,
             final ValuesReader dataReader) throws IOException {
-        final Object result = materialize(IntMaterializer.Factory, dlDecoder, rlDecoder, dataReader, nullPlaceholder);
+        final Object result = materialize(IntMaterializer.FACTORY, dlDecoder, rlDecoder, dataReader, nullPlaceholder);
         if (result instanceof DataWithOffsets) {
             keyDest.put((int[]) ((DataWithOffsets) result).materializeResult);
             return ((DataWithOffsets) result).offsets;
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PageMaterializer.java
@@ -3,148 +3,8 @@
 //
 package io.deephaven.parquet.base;
 
-import io.deephaven.parquet.base.materializers.*;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.PrimitiveType;
-import org.jetbrains.annotations.NotNull;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-
 public interface PageMaterializer {
 
-    /**
-     * Get the internal type used by Deephaven to represent a Parquet
-     * {@link LogicalTypeAnnotation.DecimalLogicalTypeAnnotation Decimal} logical type
-     */
-    static Class<?> resolveDecimalLogicalType(
-            final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
-        // This pair of values (precision=1, scale=0) is set at write time as a marker so that we can recover
-        // the fact that the type is a BigInteger, not a BigDecimal when the files are read.
-        if (decimalLogicalType.getPrecision() == 1 && decimalLogicalType.getScale() == 0) {
-            return BigInteger.class;
-        }
-        return BigDecimal.class;
-    }
-
-    static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primitiveType) {
-        final PrimitiveType.PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName();
-        final LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation();
-        switch (primitiveTypeName) {
-            case INT32:
-                if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType =
-                            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalTypeAnnotation;
-                    if (intLogicalType.isSigned()) {
-                        switch (intLogicalType.getBitWidth()) {
-                            case 8:
-                                return ByteMaterializer.Factory;
-                            case 16:
-                                return ShortMaterializer.Factory;
-                            case 32:
-                                return IntMaterializer.Factory;
-                        }
-                    } else {
-                        switch (intLogicalType.getBitWidth()) {
-                            case 8:
-                            case 16:
-                                return CharMaterializer.Factory;
-                            case 32:
-                                return LongFromUnsignedIntMaterializer.Factory;
-                        }
-                    }
-                } else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
-                    return LocalDateMaterializer.Factory;
-                } else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType =
-                            (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalTypeAnnotation;
-                    if (timeLogicalType.getUnit() != LogicalTypeAnnotation.TimeUnit.MILLIS) {
-                        throw new IllegalArgumentException(
-                                "Expected unit type to be MILLIS, found " + timeLogicalType.getUnit());
-                    }
-                    // isAdjustedToUTC parameter is ignored while reading LocalTime from Parquet files
-                    return LocalTimeFromMillisMaterializer.Factory;
-                } else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
-                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
-                    return new BigDecimalFromIntMaterializer.Factory(decimalLogicalType.getScale());
-                }
-                return IntMaterializer.Factory;
-            case INT64:
-                if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType =
-                            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
-                    if (timestampLogicalType.isAdjustedToUTC()) {
-                        // The column will store nanoseconds elapsed since epoch as long values
-                        switch (timestampLogicalType.getUnit()) {
-                            case MILLIS:
-                                return InstantNanosFromMillisMaterializer.Factory;
-                            case MICROS:
-                                return InstantNanosFromMicrosMaterializer.Factory;
-                            case NANOS:
-                                return LongMaterializer.Factory;
-                        }
-                    } else {
-                        // The column will be stored as LocalDateTime values
-                        // Ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#local-semantics-timestamps-not-normalized-to-utc
-                        switch (timestampLogicalType.getUnit()) {
-                            case MILLIS:
-                                return LocalDateTimeFromMillisMaterializer.Factory;
-                            case MICROS:
-                                return LocalDateTimeFromMicrosMaterializer.Factory;
-                            case NANOS:
-                                return LocalDateTimeFromNanosMaterializer.Factory;
-                        }
-                    }
-                } else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType =
-                            (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalTypeAnnotation;
-                    // isAdjustedToUTC parameter is ignored while reading LocalTime from Parquet files
-                    switch (timeLogicalType.getUnit()) {
-                        case MICROS:
-                            return LocalTimeFromMicrosMaterializer.Factory;
-                        case NANOS:
-                            return LocalTimeFromNanosMaterializer.Factory;
-                        default:
-                            throw new IllegalArgumentException("Unsupported unit=" + timeLogicalType.getUnit());
-                    }
-                } else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
-                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
-                    return new BigDecimalFromLongMaterializer.Factory(decimalLogicalType.getScale());
-                }
-                return LongMaterializer.Factory;
-            case INT96:
-                return InstantFromInt96Materializer.Factory;
-            case FLOAT:
-                return FloatMaterializer.Factory;
-            case DOUBLE:
-                return DoubleMaterializer.Factory;
-            case BOOLEAN:
-                return BoolMaterializer.Factory;
-            case BINARY:
-                if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
-                    return StringMaterializer.Factory;
-                }
-            case FIXED_LEN_BYTE_ARRAY: // fall through
-                if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
-                    final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
-                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
-                    final int encodedSizeInBytes = primitiveTypeName == BINARY ? -1 : primitiveType.getTypeLength();
-                    if (resolveDecimalLogicalType(decimalLogicalType) == BigInteger.class) {
-                        return new BigIntegerMaterializer.Factory(new BigIntegerParquetBytesCodec(encodedSizeInBytes));
-                    }
-                    return new BigDecimalFromBytesMaterializer.Factory(new BigDecimalParquetBytesCodec(
-                            decimalLogicalType.getPrecision(), decimalLogicalType.getScale(), encodedSizeInBytes));
-                }
-                return BlobMaterializer.Factory;
-            default:
-                throw new RuntimeException("Unexpected type name:" + primitiveTypeName);
-        }
-    }
-
     void fillNulls(int startIndex, int endIndex);
 
     void fillValues(int startIndex, int endIndex);
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PageMaterializerFactory.java
@@ -9,4 +9,16 @@ public interface PageMaterializerFactory {
     PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues);
 
     PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues);
+
+    PageMaterializerFactory NULL_FACTORY = new PageMaterializerFactory() {
+        @Override
+        public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
+            throw new UnsupportedOperationException("Does not support materializing pages");
+        }
+
+        @Override
+        public PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues) {
+            throw new UnsupportedOperationException("Does not support materializing pages");
+        }
+    };
 }
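`NULL_FACTORY` is a null-object implementation: it satisfies the factory parameter now required by `getPageIterator`/`getPageAccessor` while failing fast if anything actually tries to materialize. A usage sketch under that assumption (intent and package locations inferred, not stated in the diff):

```java
import java.io.IOException;

import io.deephaven.parquet.base.ColumnChunkReader;
import io.deephaven.parquet.base.PageMaterializerFactory;

final class NullFactorySketch {
    // A caller that only needs page structure (headers, row counts, offsets) can pass
    // NULL_FACTORY; any accidental materialization throws UnsupportedOperationException.
    static void walkPageMetadata(final ColumnChunkReader reader) throws IOException {
        final ColumnChunkReader.ColumnPageReaderIterator pages =
                reader.getPageIterator(PageMaterializerFactory.NULL_FACTORY);
        // ... inspect pages without decoding any values ...
    }
}
```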
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/materializers/BigDecimalFromBytesMaterializer.java
@@ -49,8 +49,7 @@ private BigDecimalFromBytesMaterializer(ValuesReader dataReader, BigDecimal nullValue, int numValues) {
     @Override
     public void fillValues(int startIndex, int endIndex) {
         for (int ii = startIndex; ii < endIndex; ii++) {
-            final byte[] bytes = dataReader.readBytes().getBytes();
-            data[ii] = codec.decode(bytes, 0, bytes.length);
+            data[ii] = codec.decode(dataReader.readBytes().toByteBuffer());
         }
     }
 }
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/materializers/BigIntegerMaterializer.java
@@ -53,8 +53,7 @@ private BigIntegerMaterializer(ValuesReader dataReader, BigInteger nullValue, int numValues) {
     @Override
     public void fillValues(int startIndex, int endIndex) {
         for (int ii = startIndex; ii < endIndex; ii++) {
-            final byte[] bytes = dataReader.readBytes().getBytes();
-            data[ii] = codec.decode(bytes, 0, bytes.length);
+            data[ii] = codec.decode(dataReader.readBytes().toByteBuffer());
         }
     }
 }
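Both materializer changes (here and in BigDecimalFromBytesMaterializer above) are the payoff of the new `ObjectDecoder#decode(ByteBuffer)` overload: `Binary.getBytes()` may allocate and copy the value's backing bytes, whereas `Binary.toByteBuffer()` can expose them as a view that the decoder's heap-buffer fast path reads in place. A sketch of the before/after shape (illustrative, not Deephaven code):

```java
import io.deephaven.util.codec.ObjectDecoder;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;

final class DecodePathSketch {
    static <T> T decodeNext(final ValuesReader dataReader, final ObjectDecoder<T> codec) {
        final Binary value = dataReader.readBytes();

        // Old path: getBytes() may copy the backing bytes just to hand them to the codec.
        // final byte[] bytes = value.getBytes();
        // return codec.decode(bytes, 0, bytes.length);

        // New path: toByteBuffer() can return a view of the backing storage; for
        // heap-backed buffers the ObjectDecoder default decodes in place.
        return codec.decode(value.toByteBuffer());
    }
}
```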
extensions/parquet/base/src/main/java/io/deephaven/parquet/base/materializers/BlobMaterializer.java
@@ -8,9 +8,12 @@
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
 
+/**
+ * Materializer for binary data.
+ */
 public class BlobMaterializer extends ObjectMaterializerBase<Binary> implements PageMaterializer {
 
-    public static final PageMaterializerFactory Factory = new PageMaterializerFactory() {
+    public static final PageMaterializerFactory FACTORY = new PageMaterializerFactory() {
         @Override
         public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
             return new BlobMaterializer(dataReader, (Binary) nullValue, numValues);