Skip to content

Commit

Permalink
feat: Supports UUID column (apache#395)
Browse files Browse the repository at this point in the history
* fix uuid

* address comments

---------

Co-authored-by: Huaxin Gao <huaxin.gao@apple.com>
(cherry picked from commit 7b0a7e0)
  • Loading branch information
huaxingao authored and Huaxin Gao committed May 23, 2024
1 parent ab4f297 commit 7fba7d8
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
14 changes: 11 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.CometConf;
Expand Down Expand Up @@ -199,14 +200,19 @@ public CometDecodedVector loadVector() {
currentVector.close();
}

LogicalTypeAnnotation logicalTypeAnnotation =
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
boolean isUuid =
logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

long[] addresses = Native.currentBatch(nativeHandle);

try (ArrowArray array = ArrowArray.wrap(addresses[0]);
ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider);
DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();

CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid);

// Update whether the current vector contains any null values. This is used in the following
// batch(s) to determine whether we can skip loading the native vector.
Expand All @@ -229,12 +235,14 @@ public CometDecodedVector loadVector() {
// initialized yet.
Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
CometPlainVector dictionaryVector =
new CometPlainVector(arrowDictionary.getVector(), useDecimal128);
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
dictionary = new CometDictionary(dictionaryVector);
}

currentVector =
new CometDictionaryVector(cometVector, dictionary, dictionaryProvider, useDecimal128);
new CometDictionaryVector(
cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid);

return currentVector;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,20 @@ public abstract class CometDecodedVector extends CometVector {
private int numValues;
private int validityByteCacheIndex = -1;
private byte validityByteCache;
protected boolean isUuid;

protected CometDecodedVector(ValueVector vector, Field valueField, boolean useDecimal128) {
this(vector, valueField, useDecimal128, false);
}

protected CometDecodedVector(
ValueVector vector, Field valueField, boolean useDecimal128, boolean isUuid) {
super(Utils.fromArrowField(valueField), useDecimal128);
this.valueVector = vector;
this.numNulls = valueVector.getNullCount();
this.numValues = valueVector.getValueCount();
this.hasNull = numNulls != 0;
this.isUuid = isUuid;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ public CometDictionaryVector(
CometDictionary values,
DictionaryProvider provider,
boolean useDecimal128) {
this(indices, values, provider, useDecimal128, false);
this(indices, values, provider, useDecimal128, false, false);
}

public CometDictionaryVector(
CometPlainVector indices,
CometDictionary values,
DictionaryProvider provider,
boolean useDecimal128,
boolean isAlias) {
super(indices.valueVector, values.getValueVector().getField(), useDecimal128);
boolean isAlias,
boolean isUuid) {
super(indices.valueVector, values.getValueVector().getField(), useDecimal128, isUuid);
Preconditions.checkArgument(
indices.valueVector instanceof IntVector, "'indices' should be a IntVector");
this.values = values;
Expand Down Expand Up @@ -130,6 +131,6 @@ public CometVector slice(int offset, int length) {
// Set the alias flag to true so that the sliced vector will not close the dictionary vector.
// Otherwise, if the dictionary is closed, the sliced vector will not be able to access the
// dictionary.
return new CometDictionaryVector(sliced, values, provider, useDecimal128, true);
return new CometDictionaryVector(sliced, values, provider, useDecimal128, true, isUuid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ public class CometPlainVector extends CometDecodedVector {
private int booleanByteCacheIndex = -1;

public CometPlainVector(ValueVector vector, boolean useDecimal128) {
super(vector, vector.getField(), useDecimal128);
this(vector, useDecimal128, false);
}

public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) {
super(vector, vector.getField(), useDecimal128, isUuid);
// NullType doesn't have data buffer.
if (vector instanceof NullVector) {
this.valueBufferAddress = -1;
Expand Down Expand Up @@ -111,11 +115,10 @@ public UTF8String getUTF8String(int rowId) {
byte[] result = new byte[length];
Platform.copyMemory(
null, valueBufferAddress + offset, result, Platform.BYTE_ARRAY_OFFSET, length);
// FIXME Replace with Iceberg support when it is available
if (length == 16) {
return UTF8String.fromString(convertToUuid(result).toString());
} else {
if (!isUuid) {
return UTF8String.fromBytes(result);
} else {
return UTF8String.fromString(convertToUuid(result).toString());
}
}
}
Expand Down

0 comments on commit 7fba7d8

Please sign in to comment.