Skip to content

Commit

Permalink
Optimizations for BigDecimal and BigInteger parquet reads (#5638)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Jun 20, 2024
1 parent 83f62db commit b900e46
Show file tree
Hide file tree
Showing 47 changed files with 659 additions and 425 deletions.
1 change: 1 addition & 0 deletions extensions/parquet/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation project(':Util')
implementation project(':engine-time')
implementation project(':Configuration')
implementation project(':DataStructures')
implementation depCommonsIo

compileOnly depAnnotations
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;
package io.deephaven.parquet.base;

import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.util.codec.ObjectCodec;
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -14,12 +15,63 @@
import java.nio.ByteBuffer;

public class BigDecimalParquetBytesCodec implements ObjectCodec<BigDecimal> {

private static final int MIN_DECIMAL_INT_PRECISION = 1;
private static final int MAX_DECIMAL_INT_PRECISION = 9;
private static final int MIN_DECIMAL_LONG_PRECISION = 1;
private static final int MAX_DECIMAL_LONG_PRECISION = 18;

private final int precision;
private final int scale;
private final int encodedSizeInBytes;
private final RoundingMode roundingMode;
private final byte[] nullBytes;

/**
 * Verify that the precision and scale are valid.
 *
 * @throws IllegalArgumentException if the provided precision and/or scale is invalid
 */
public static void verifyPrecisionAndScale(final int precision, final int scale) {
    // Select the first violated constraint, checked in order: precision positivity,
    // scale non-negativity, then scale <= precision.
    final String error;
    if (precision <= 0) {
        error = String.format("precision (=%d) should be > 0", precision);
    } else if (scale < 0) {
        error = String.format("scale (=%d) should be >= 0", scale);
    } else if (scale > precision) {
        error = String.format("scale (=%d) is greater than precision (=%d)", scale, precision);
    } else {
        return; // valid combination
    }
    throw new IllegalArgumentException(error);
}

/**
 * Verify that the precision and scale are valid for the given primitive type.
 *
 * @throws IllegalArgumentException if the provided precision and/or scale is invalid
 */
public static void verifyPrecisionAndScale(final int precision, final int scale,
        final PrimitiveType.PrimitiveTypeName primitiveType) {
    // Generic precision/scale constraints first; then the primitive-type-specific
    // precision bounds for the integer-backed encodings.
    verifyPrecisionAndScale(precision, scale);
    switch (primitiveType) {
        case INT32:
            if (precision < MIN_DECIMAL_INT_PRECISION || precision > MAX_DECIMAL_INT_PRECISION) {
                throw new IllegalArgumentException(
                        String.format(
                                "Column with decimal logical type and INT32 primitive type should have precision in " +
                                        "range [%d, %d], found column with precision %d",
                                MIN_DECIMAL_INT_PRECISION, MAX_DECIMAL_INT_PRECISION, precision));
            }
            break;
        case INT64:
            if (precision < MIN_DECIMAL_LONG_PRECISION || precision > MAX_DECIMAL_LONG_PRECISION) {
                throw new IllegalArgumentException(
                        String.format(
                                "Column with decimal logical type and INT64 primitive type should have precision in " +
                                        "range [%d, %d], found column with precision %d",
                                MIN_DECIMAL_LONG_PRECISION, MAX_DECIMAL_LONG_PRECISION, precision));
            }
            break;
        default:
            // Other primitive types (e.g. FIXED_LEN_BYTE_ARRAY, BINARY) carry no extra bound here.
            break;
    }
}

/**
*
* @param precision
Expand All @@ -31,15 +83,7 @@ public class BigDecimalParquetBytesCodec implements ObjectCodec<BigDecimal> {
*/
public BigDecimalParquetBytesCodec(final int precision, final int scale, final int encodedSizeInBytes,
final RoundingMode roundingMode) {
if (precision <= 0) {
throw new IllegalArgumentException("precision (=" + precision + ") should be > 0");
}
if (scale < 0) {
throw new IllegalArgumentException("scale (=" + scale + ") should be >= 0");
}
if (scale > precision) {
throw new IllegalArgumentException("scale (=" + scale + ") is greater than precision (=" + precision + ")");
}
verifyPrecisionAndScale(precision, scale);
this.precision = precision;
this.scale = scale;
this.encodedSizeInBytes = encodedSizeInBytes;
Expand All @@ -58,6 +102,10 @@ public BigDecimalParquetBytesCodec(final int precision, final int scale, final i
this(precision, scale, encodedSizeInBytes, RoundingMode.HALF_UP);
}

/**
 * Creates a codec with no fixed encoded byte width (an {@code encodedSizeInBytes} of -1, the
 * same sentinel used for variable-length BINARY encodings), rounding with
 * {@link RoundingMode#HALF_UP}.
 *
 * @param precision the precision of the decimal logical type
 * @param scale the scale of the decimal logical type
 */
public BigDecimalParquetBytesCodec(final int precision, final int scale) {
    this(precision, scale, -1);
}

// Given how parquet encoding works for nulls, the actual value provided for a null is irrelevant.
@Override
public boolean isNullable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;
package io.deephaven.parquet.base;

import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.util.codec.ObjectCodec;
Expand Down Expand Up @@ -34,6 +34,10 @@ public BigIntegerParquetBytesCodec(final int encodedSizeInBytes) {
}
}

/**
 * Creates a codec with no fixed encoded byte width (an {@code encodedSizeInBytes} of -1 —
 * presumably denoting variable-length encoding, matching its use for BINARY elsewhere; confirm).
 */
public BigIntegerParquetBytesCodec() {
    this(-1);
}

// Given how parquet encoding works for nulls, the actual value provided for a null is irrelevant.
@Override
public boolean isNullable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,27 @@
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;
import java.math.BigInteger;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public interface PageMaterializer {

/**
 * Get the internal type used by Deephaven to represent a Parquet
 * {@link LogicalTypeAnnotation.DecimalLogicalTypeAnnotation Decimal} logical type
 */
static Class<?> resolveDecimalLogicalType(
        final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
    // The pair (precision=1, scale=0) is stored at write time as a marker so that, when the
    // files are read back, we can recover that the column held BigInteger rather than BigDecimal.
    final boolean isBigIntegerMarker =
            decimalLogicalType.getPrecision() == 1 && decimalLogicalType.getScale() == 0;
    return isBigIntegerMarker ? BigInteger.class : BigDecimal.class;
}

static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primitiveType) {
final PrimitiveType.PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName();
final LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation();
Expand Down Expand Up @@ -47,6 +66,10 @@ static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primi
}
// isAdjustedToUTC parameter is ignored while reading LocalTime from Parquet files
return LocalTimeFromMillisMaterializer.Factory;
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
return new BigDecimalFromIntMaterializer.Factory(decimalLogicalType.getScale());
}
return IntMaterializer.Factory;
case INT64:
Expand Down Expand Up @@ -87,6 +110,10 @@ static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primi
default:
throw new IllegalArgumentException("Unsupported unit=" + timeLogicalType.getUnit());
}
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
return new BigDecimalFromLongMaterializer.Factory(decimalLogicalType.getScale());
}
return LongMaterializer.Factory;
case INT96:
Expand All @@ -102,6 +129,16 @@ static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primi
return StringMaterializer.Factory;
}
case FIXED_LEN_BYTE_ARRAY: // fall through
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
final int encodedSizeInBytes = primitiveTypeName == BINARY ? -1 : primitiveType.getTypeLength();
if (resolveDecimalLogicalType(decimalLogicalType) == BigInteger.class) {
return new BigIntegerMaterializer.Factory(new BigIntegerParquetBytesCodec(encodedSizeInBytes));
}
return new BigDecimalFromBytesMaterializer.Factory(new BigDecimalParquetBytesCodec(
decimalLogicalType.getPrecision(), decimalLogicalType.getScale(), encodedSizeInBytes));
}
return BlobMaterializer.Factory;
default:
throw new RuntimeException("Unexpected type name:" + primitiveTypeName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.base.materializers;

import io.deephaven.parquet.base.PageMaterializer;
import io.deephaven.parquet.base.PageMaterializerFactory;
import io.deephaven.util.codec.ObjectCodec;
import org.apache.parquet.column.values.ValuesReader;

import java.math.BigDecimal;

/**
 * Materializes Parquet page values into {@link BigDecimal}s by decoding each raw byte payload
 * through a caller-supplied {@link ObjectCodec}.
 */
public class BigDecimalFromBytesMaterializer extends ObjectMaterializerBase<BigDecimal> implements PageMaterializer {

    /** Builds materializers bound to a specific decimal codec, one per page. */
    public static final class Factory implements PageMaterializerFactory {

        final ObjectCodec<BigDecimal> codec;

        public Factory(ObjectCodec<BigDecimal> codec) {
            this.codec = codec;
        }

        @Override
        public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
            return new BigDecimalFromBytesMaterializer(dataReader, (BigDecimal) nullValue, numValues, codec);
        }

        @Override
        public PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues) {
            // Non-null pages simply use a null sentinel value.
            return new BigDecimalFromBytesMaterializer(dataReader, null, numValues, codec);
        }
    }

    private final ValuesReader reader;
    private final ObjectCodec<BigDecimal> decoder;

    private BigDecimalFromBytesMaterializer(ValuesReader reader, BigDecimal nullValue, int numValues,
            ObjectCodec<BigDecimal> decoder) {
        super(nullValue, new BigDecimal[numValues]);
        this.reader = reader;
        this.decoder = decoder;
    }

    /** Decodes one value per slot in [startIndex, endIndex) from the page's byte stream. */
    @Override
    public void fillValues(int startIndex, int endIndex) {
        for (int idx = startIndex; idx < endIndex; idx++) {
            final byte[] encoded = reader.readBytes().getBytes();
            data[idx] = decoder.decode(encoded, 0, encoded.length);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY
// ****** Edit BigDecimalFromLongMaterializer and run "./gradlew replicatePageMaterializers" to regenerate
//
// @formatter:off
package io.deephaven.parquet.base.materializers;

import io.deephaven.parquet.base.PageMaterializer;
import io.deephaven.parquet.base.PageMaterializerFactory;
import org.apache.parquet.column.values.ValuesReader;

import java.math.BigDecimal;

/**
 * Materializes INT32-backed Parquet decimal pages into {@link BigDecimal} values at a fixed
 * scale. NOTE(review): this class is replicated from BigDecimalFromLongMaterializer (see file
 * header) — change the template and regenerate rather than editing here.
 */
public class BigDecimalFromIntMaterializer extends ObjectMaterializerBase<BigDecimal> implements PageMaterializer {

    /** Captures the column's decimal scale and builds per-page materializers. */
    public static final class Factory implements PageMaterializerFactory {
        final int scale;

        public Factory(final int scale) {
            this.scale = scale;
        }

        @Override
        public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
            return new BigDecimalFromIntMaterializer(dataReader, (BigDecimal) nullValue, numValues, scale);
        }

        @Override
        public PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues) {
            return new BigDecimalFromIntMaterializer(dataReader, numValues, scale);
        }
    };

    private final ValuesReader dataReader;
    private final int scale;

    // Non-null-page path: delegates with a null "null value" sentinel.
    private BigDecimalFromIntMaterializer(ValuesReader dataReader, int numValues, int scale) {
        this(dataReader, null, numValues, scale);
    }

    private BigDecimalFromIntMaterializer(ValuesReader dataReader, BigDecimal nullValue, int numValues, int scale) {
        super(nullValue, new BigDecimal[numValues]);
        this.dataReader = dataReader;
        this.scale = scale;
    }

    // Each raw int is the unscaled value; BigDecimal.valueOf(unscaled, scale) applies the scale
    // without floating-point intermediaries.
    @Override
    public void fillValues(int startIndex, int endIndex) {
        for (int ii = startIndex; ii < endIndex; ii++) {
            data[ii] = BigDecimal.valueOf(dataReader.readInteger(), scale);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.base.materializers;

import io.deephaven.parquet.base.PageMaterializer;
import io.deephaven.parquet.base.PageMaterializerFactory;
import org.apache.parquet.column.values.ValuesReader;

import java.math.BigDecimal;

/**
 * Materializes INT64-backed Parquet decimal pages into {@link BigDecimal} values at a fixed
 * scale. This class is the replication template for BigDecimalFromIntMaterializer.
 */
public class BigDecimalFromLongMaterializer extends ObjectMaterializerBase<BigDecimal> implements PageMaterializer {

    /** Captures the column's decimal scale and builds per-page materializers. */
    public static final class Factory implements PageMaterializerFactory {
        final int scale;

        public Factory(final int scale) {
            this.scale = scale;
        }

        @Override
        public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
            return new BigDecimalFromLongMaterializer(dataReader, (BigDecimal) nullValue, numValues, scale);
        }

        @Override
        public PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues) {
            // Non-null pages simply use a null sentinel value.
            return new BigDecimalFromLongMaterializer(dataReader, null, numValues, scale);
        }
    }

    private final ValuesReader reader;
    private final int decimalScale;

    private BigDecimalFromLongMaterializer(ValuesReader reader, BigDecimal nullValue, int numValues,
            int decimalScale) {
        super(nullValue, new BigDecimal[numValues]);
        this.reader = reader;
        this.decimalScale = decimalScale;
    }

    // Each raw long is the unscaled value; BigDecimal.valueOf(unscaled, scale) applies the scale
    // without floating-point intermediaries.
    @Override
    public void fillValues(int startIndex, int endIndex) {
        for (int idx = startIndex; idx < endIndex; idx++) {
            data[idx] = BigDecimal.valueOf(reader.readLong(), decimalScale);
        }
    }
}
Loading

0 comments on commit b900e46

Please sign in to comment.