Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize PLAIN values decoders in parquet reader #15308

Merged
merged 7 commits into from
Dec 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ public static int readUleb128Int(SimpleSliceInputStream input)
return value | inputByte << 28;
}

/**
* Propagate the sign bit in values that are shorter than 8 bytes.
* <p>
* When the value of less than 8 bytes in put into a long variable, the padding bytes on the
* left side of the number should be all zeros for a positive number or all ones for negatives.
* This method does this padding using signed bit shift operator without branches.
*
* @param value Value to trim
* @param bitsToPad Number of bits to pad
* @return Value with correct padding
*/
public static long propagateSignBit(long value, int bitsToPad)
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
{
return value << bitsToPad >> bitsToPad;
}

/**
* Method simulates a cast from boolean to byte value. Despite using
* a ternary (?) operator, the just-in-time compiler usually figures out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public static void checkBytesFitInShortDecimal(byte[] bytes, int offset, int len
if (bytes[i] != expectedValue) {
throw new TrinoException(NOT_SUPPORTED, format(
"Could not read unscaled value %s into %s from column %s",
new BigInteger(bytes),
new BigInteger(bytes, offset, length + Long.BYTES),
trinoType,
descriptor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@
package io.trino.parquet.dictionary;

import io.trino.parquet.DictionaryPage;
import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;

import java.io.IOException;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.decoders.ValueDecoder;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.parquet.ParquetReaderUtils.toInputStream;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntPlainValueDecoder;

public class IntegerDictionary
implements Dictionary
{
private final int[] content;

public IntegerDictionary(DictionaryPage dictionaryPage)
throws IOException
{
content = new int[dictionaryPage.getDictionarySize()];
IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
intReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage));
for (int i = 0; i < content.length; i++) {
content[i] = intReader.readInteger();
}
ValueDecoder<int[]> intReader = new IntPlainValueDecoder();
intReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice()));
intReader.read(content, 0, dictionaryPage.getDictionarySize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@
package io.trino.parquet.dictionary;

import io.trino.parquet.DictionaryPage;
import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;

import java.io.IOException;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.decoders.ValueDecoder;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.parquet.ParquetReaderUtils.toInputStream;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.LongPlainValueDecoder;

public class LongDictionary
implements Dictionary
{
private final long[] content;

public LongDictionary(DictionaryPage dictionaryPage)
throws IOException
{
content = new long[dictionaryPage.getDictionarySize()];
LongPlainValuesReader longReader = new LongPlainValuesReader();
longReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage));
for (int i = 0; i < content.length; i++) {
content[i] = longReader.readLong();
}
ValueDecoder<long[]> longReader = new LongPlainValueDecoder();
longReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice()));
longReader.read(content, 0, dictionaryPage.getDictionarySize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package io.trino.parquet.reader;

import io.airlift.slice.Slice;
import io.airlift.slice.UnsafeSlice;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -47,6 +49,20 @@ public byte readByte()
return slice.getByte(offset++);
}

public short readShort()
{
short value = slice.getShort(offset);
offset += Short.BYTES;
return value;
}

public int readInt()
{
int value = slice.getInt(offset);
offset += Integer.BYTES;
return value;
}

public long readLong()
{
long value = slice.getLong(offset);
Expand All @@ -61,6 +77,12 @@ public byte[] readBytes()
return bytes;
}

public void readBytes(Slice destination, int destinationIndex, int length)
{
slice.getBytes(offset, destination, destinationIndex, length);
offset += length;
}

public void skip(int n)
{
offset += n;
Expand Down Expand Up @@ -88,4 +110,58 @@ public int getByteArrayOffset()
{
return offset + slice.byteArrayOffset();
}

public void ensureBytesAvailable(int bytes)
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
{
checkPositionIndexes(offset, offset + bytes, slice.length());
}

/**
* Always check if needed data is available with ensureBytesAvailable method.
* Failing to do so may result in instant JVM crash.
*/
public int readIntUnsafe()
{
int value = UnsafeSlice.getIntUnchecked(slice, offset);
offset += Integer.BYTES;
return value;
}

/**
* Always check if needed data is available with ensureBytesAvailable method.
* Failing to do so may result in instant JVM crash.
*/
public long readLongUnsafe()
{
long value = UnsafeSlice.getLongUnchecked(slice, offset);
offset += Long.BYTES;
return value;
}

/**
* Always check if needed data is available with ensureBytesAvailable method.
* Failing to do so may result in instant JVM crash.
*/
public byte getByteUnsafe(int index)
{
return UnsafeSlice.getByteUnchecked(slice, offset + index);
}

/**
* Always check if needed data is available with ensureBytesAvailable method.
* Failing to do so may result in instant JVM crash.
*/
public int getIntUnsafe(int index)
{
return UnsafeSlice.getIntUnchecked(slice, offset + index);
}

/**
* Always check if needed data is available with ensureBytesAvailable method.
* Failing to do so may result in instant JVM crash.
*/
public long getLongUnsafe(int index)
{
return UnsafeSlice.getLongUnchecked(slice, offset + index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,68 +208,6 @@ public void skip(int n)
}
}

public static final class DoubleApacheParquetValueDecoder
implements ValueDecoder<long[]>
{
private final ValuesReader delegate;

public DoubleApacheParquetValueDecoder(ValuesReader delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public void init(SimpleSliceInputStream input)
{
initialize(input, delegate);
}

@Override
public void read(long[] values, int offset, int length)
{
for (int i = offset; i < offset + length; i++) {
values[i] = Double.doubleToLongBits(delegate.readDouble());
}
}

@Override
public void skip(int n)
{
delegate.skip(n);
}
}

public static final class FloatApacheParquetValueDecoder
implements ValueDecoder<int[]>
{
private final ValuesReader delegate;

public FloatApacheParquetValueDecoder(ValuesReader delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public void init(SimpleSliceInputStream input)
{
initialize(input, delegate);
}

@Override
public void read(int[] values, int offset, int length)
{
for (int i = offset; i < offset + length; i++) {
values[i] = Float.floatToIntBits(delegate.readFloat());
}
}

@Override
public void skip(int n)
{
delegate.skip(n);
}
}

public static final class BooleanApacheParquetValueDecoder
implements ValueDecoder<byte[]>
{
Expand Down
Loading