From 6c4669cf4b9e07057b8dee02e826b32cc5b797dd Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 28 Aug 2024 00:18:11 +0800 Subject: [PATCH 01/17] RunEndEncoded initial implement --- .../binder/ColumnBinderArrowTypeVisitor.java | 5 + .../arrow/c/BufferImportTypeVisitor.java | 6 + .../jdbc/utils/AvaticaParameterBinder.java | 6 + .../arrow/driver/jdbc/utils/ConvertUtils.java | 6 + .../src/main/codegen/data/ArrowTypes.tdd | 5 + .../org/apache/arrow/vector/TypeLayout.java | 11 + .../vector/compare/RangeEqualsVisitor.java | 9 + .../vector/compare/TypeEqualsVisitor.java | 6 + .../arrow/vector/compare/VectorVisitor.java | 3 + .../vector/complex/RunEndEncodedVector.java | 647 ++++++++++++++++++ .../org/apache/arrow/vector/types/Types.java | 19 + .../arrow/vector/util/VectorAppender.java | 6 + .../validate/ValidateVectorBufferVisitor.java | 6 + .../validate/ValidateVectorDataVisitor.java | 6 + .../validate/ValidateVectorTypeVisitor.java | 10 + .../validate/ValidateVectorVisitor.java | 6 + .../arrow/vector/TestRunEndEncodedVector.java | 134 ++++ 17 files changed, 891 insertions(+) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java index cb8e43035d33b..a3d615a7e1958 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java @@ -96,6 +96,11 @@ public ColumnBinder visit(ArrowType.Union type) { throw new UnsupportedOperationException("No column binder implemented for type " + type); } + @Override + public ColumnBinder visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException("No column binder implemented for type " + type); + } + @Override public ColumnBinder visit(ArrowType.Map type) { return new MapBinder((MapVector) vector); diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 93fef6d7ca801..1045671e51063 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -185,6 +185,12 @@ public List visit(ArrowType.Union type) { } } + + @Override + public List visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException("Importing buffers for type: " + type); + } + @Override public List visit(ArrowType.Map type) { return Arrays.asList(maybeImportBitmap(type), importOffsets(type, MapVector.OFFSET_WIDTH)); diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java index 232fa1524088b..4c2a9b865f141 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java @@ -281,5 +281,11 @@ public Boolean visit(ArrowType.ListView type) { public Boolean visit(ArrowType.LargeListView type) { throw new UnsupportedOperationException("Binding is not yet supported for type " + type); } + + @Override + public Boolean visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException( + "No Avatica parameter binder implemented for type " + type); + } } } diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java index ea57aeb774c0a..17b0f42dc7111 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ConvertUtils.java @@ -284,5 +284,11 @@ public AvaticaParameter visit(ArrowType.LargeListView type) { throw new UnsupportedOperationException( "AvaticaParameter not yet supported for type " + type); } + + @Override + public AvaticaParameter visit(ArrowType.RunEndEncoded type) { + throw new UnsupportedOperationException( + "No Avatica parameter binder implemented for type " + type); + } } } diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd b/java/vector/src/main/codegen/data/ArrowTypes.tdd index d0e8ef1e358ea..5a0b30e47ee52 100644 --- a/java/vector/src/main/codegen/data/ArrowTypes.tdd +++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd @@ -139,6 +139,11 @@ name: "LargeListView", fields: [], complex: true + }, + { + name: "RunEndEncoded", + fields: [], + complex: true } ] } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java index 78a3cac020a8c..fa75ef04577a3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java @@ -40,6 +40,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.LargeUtf8; import org.apache.arrow.vector.types.pojo.ArrowType.Map; import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.ArrowType.Struct; import org.apache.arrow.vector.types.pojo.ArrowType.Time; import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; @@ -280,6 +281,11 @@ public TypeLayout visit(Interval type) { public TypeLayout visit(Duration type) { return newFixedWidthTypeLayout(BufferLayout.dataBuffer(64)); } + + @Override + public TypeLayout visit(RunEndEncoded type) { + return new TypeLayout(Collections.emptyList()); + } }); return layout; } @@ -444,6 +450,11 @@ public Integer visit(Interval type) { public Integer visit(Duration type) { return FIXED_WIDTH_BUFFER_COUNT; } + + @Override + public Integer visit(RunEndEncoded type) { + return 0; + } }); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java index 9aa1bffb8463e..d62bd43e5a090 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java @@ -41,6 +41,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Visitor to compare a range of values for vectors. */ @@ -261,6 +262,14 @@ protected RangeEqualsVisitor createInnerVisitor( return new RangeEqualsVisitor(leftInner, rightInner, typeComparator); } + @Override + public Boolean visit(RunEndEncodedVector left, Range range) { + if (!validate(left)) { + return false; + } + return true; // TODO + } + protected boolean compareUnionVectors(Range range) { UnionVector leftVector = (UnionVector) left; UnionVector rightVector = (UnionVector) right; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java index ce92b22ef61c9..30b2f511a0445 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/TypeEqualsVisitor.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.Field; @@ -136,6 +137,11 @@ public Boolean visit(LargeListViewVector left, Void value) { return compareField(left.getField(), right.getField()); } + @Override + public Boolean visit(RunEndEncodedVector left, Void value) { + return compareField(left.getField(), right.getField()); + } + private boolean compareField(Field leftField, Field rightField) { if (leftField == rightField) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java index e20f8cd9cfba5..a5361c5401a20 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java @@ -29,6 +29,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** @@ -71,4 +72,6 @@ default OUT visit(LargeListViewVector left, IN value) { throw new UnsupportedOperationException( "VectorVisitor for LargeListViewVector is not supported."); } + + OUT visit(RunEndEncodedVector left, IN value); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java new file mode 100644 index 0000000000000..82ad9d19b5b4e --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.complex; + +import static org.apache.arrow.util.Preconditions.checkArgument; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.util.ByteFunctionHelpers; +import org.apache.arrow.memory.util.hash.ArrowBufHasher; +import org.apache.arrow.vector.BaseIntVector; +import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.compare.VectorVisitor; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.complex.writer.FieldWriter; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.TransferPair; + +/** + * A run-end encoded vector contains only two child vectors: a run_end vector of type int and a + * values vector of any type. There are no buffers associated with the parent vector. + */ +public class RunEndEncodedVector extends BaseValueVector implements FieldVector { + + public static RunEndEncodedVector empty(String name, BufferAllocator allocator) { + return new RunEndEncodedVector( + name, allocator, FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE), null); + } + + protected final CallBack callBack; + protected Field field; + protected BaseIntVector runEndsVector; + protected FieldVector valuesVector; + protected int valueCount; + + /** + * Constructs a new instance. + * + * @param name The name of the instance. + * @param allocator The allocator to use for allocating/reallocating buffers. + * @param fieldType The type of the array that is run-end encoded. + * @param callBack A schema change callback. + */ + public RunEndEncodedVector( + String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) { + this(new Field(name, fieldType, null), allocator, callBack); + } + + /** + * Constructs a new instance. + * + * @param field The field materialized by this vector. + * @param allocator The allocator to use for allocating/reallocating buffers. + * @param callBack A schema change callback. + */ + public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack callBack) { + super(allocator); + this.field = field; + this.callBack = callBack; + this.valueCount = 0; + } + + /** ValueVector interface */ + + /** + * Allocate new buffers. ValueVector implements logic to determine how much to allocate. + * + * @throws OutOfMemoryException Thrown if no memory can be allocated. + */ + @Override + public void allocateNew() throws OutOfMemoryException { + if (!allocateNewSafe()) { + throw new OutOfMemoryException("Failure while allocating memory"); + } + } + + /** + * Allocates new buffers. ValueVector implements logic to determine how much to allocate. + * + * @return Returns true if allocation was successful. + */ + @Override + public boolean allocateNewSafe() { + initializeChildrenFromFields(field.getChildren()); + for (FieldVector v : getChildrenFromFields()) { + boolean isAllocated = v.allocateNewSafe(); + if (!isAllocated) { + v.clear(); + return false; + } + } + return true; + } + + /** + * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's + * buffer with new buffer, and release old one + */ + @Override + public void reAlloc() { + for (FieldVector v : getChildrenFromFields()) { + v.reAlloc(); + } + } + + @Override + public BufferAllocator getAllocator() { + return allocator; + } + + @Override + protected FieldReader getReaderImpl() { + return null; + } + + /** + * Set the initial record capacity. + * + * @param numRecords the initial record capacity. + */ + @Override + public void setInitialCapacity(int numRecords) { + // TODO: does it make sense? + for (FieldVector v : getChildrenFromFields()) { + v.setInitialCapacity(numRecords); + } + } + + /** + * Returns the maximum number of values that can be stored in this vector instance. + * + * @return the maximum number of values that can be stored in this vector instance. + */ + @Override + public int getValueCapacity() { + return getChildrenFromFields().stream() + .mapToInt(ValueVector::getValueCapacity) + .min() + .orElseThrow(NoSuchElementException::new); + } + + /** Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. */ + @Override + public void close() { + for (FieldVector v : getChildrenFromFields()) { + v.close(); + } + } + + /** + * Release any owned ArrowBuf and reset the ValueVector to the initial state. If the vector has + * any child vectors, they will also be cleared. + */ + @Override + public void clear() { + for (FieldVector v : getChildrenFromFields()) { + v.clear(); + } + } + + /** + * Reset the ValueVector to the initial state without releasing any owned ArrowBuf. Buffer + * capacities will remain unchanged and any previous data will be zeroed out. This includes + * buffers for data, validity, offset, etc. If the vector has any child vectors, they will also be + * reset. + */ + @Override + public void reset() { + for (FieldVector v : getChildrenFromFields()) { + v.reset(); + } + valueCount = 0; + } + + /** + * Get information about how this field is materialized. + * + * @return the field corresponding to this vector + */ + @Override + public Field getField() { + return field; + } + + @Override + public MinorType getMinorType() { + return MinorType.RUNENDENCODED; + } + + /** + * To transfer quota responsibility. + * + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(BufferAllocator allocator) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(BufferAllocator)"); + } + + /** + * To transfer quota responsibility. + * + * @param ref the name of the vector + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + return getTransferPair(ref, allocator, null); + } + + /** + * To transfer quota responsibility. + * + * @param field the Field object used by the target vector + * @param allocator the target allocator + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(Field field, BufferAllocator allocator) { + return getTransferPair(field, allocator, null); + } + + /** + * To transfer quota responsibility. + * + * @param ref the name of the vector + * @param allocator the target allocator + * @param callBack A schema change callback. + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)"); + } + + /** + * To transfer quota responsibility. + * + * @param field the Field object used by the target vector + * @param allocator the target allocator + * @param callBack A schema change callback. + * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new + * target vector of the same type. + */ + @Override + public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)"); + } + + /** + * Makes a new transfer pair used to transfer underlying buffers. + * + * @param target the target for the transfer + * @return a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to + * transfer underlying buffers into the target vector. + */ + @Override + public TransferPair makeTransferPair(ValueVector target) { + throw new UnsupportedOperationException( + "RunEndEncodedVector does not support makeTransferPair(ValueVector)"); + } + + /** + * Get a reader for this vector. + * + * @return a {@link org.apache.arrow.vector.complex.reader.FieldReader field reader} that supports + * reading values from this vector. + */ + @Override + public FieldReader getReader() { + return null; // TODO + } + + /** + * Get a writer for this vector. + * + * @return a {@link org.apache.arrow.vector.complex.writer.FieldWriter field writer} that supports + * writing values to this vector. + */ + public FieldWriter getWriter() { + return null; // TODO + } + + /** + * Get the number of bytes used by this vector. + * + * @return the number of bytes that is used by this vector instance. + */ + @Override + public int getBufferSize() { + int bufferSize = 0; + for (FieldVector v : getChildrenFromFields()) { + bufferSize += v.getBufferSize(); + } + return bufferSize; + } + + /** + * Returns the number of bytes that is used by this vector if it holds the given number of values. + * The result will be the same as if setValueCount() were called, followed by calling + * getBufferSize(), but without any of the closing side-effects that setValueCount() implies wrt + * finishing off the population of a vector. Some operations might wish to use this to determine + * how much memory has been used by a vector so far, even though it is not finished being + * populated. + * + * @param valueCount the number of values to assume this vector contains + * @return the buffer size if this vector is holding valueCount values + */ + @Override + public int getBufferSizeFor(int valueCount) { + return 0; + } + + /** + * Return the underlying buffers associated with this vector. Note that this doesn't impact the + * reference counts for this buffer so it only should be used for in-context access. Also note + * that this buffer changes regularly thus external classes shouldn't hold a reference to it + * (unless they change it). + * + * @param clear Whether to clear vector before returning; the buffers will still be refcounted; + * but the returned array will be the only reference to them + * @return The underlying {@link ArrowBuf buffers} that is used by this vector instance. + */ + @Override + public ArrowBuf[] getBuffers(boolean clear) { + return null; + } + + /** + * Gets the underlying buffer associated with validity vector. + * + * @return buffer + */ + @Override + public ArrowBuf getValidityBuffer() { + return null; + } + + /** + * Gets the underlying buffer associated with data vector. + * + * @return buffer + */ + @Override + public ArrowBuf getDataBuffer() { + return null; + } + + /** + * Gets the underlying buffer associated with offset vector. + * + * @return buffer + */ + @Override + public ArrowBuf getOffsetBuffer() { + return null; + } + + /** + * Gets the number of values. + * + * @return number of values in the vector + */ + @Override + public int getValueCount() { + return valueCount; + } + + /** Set number of values in the vector. */ + @Override + public void setValueCount(int valueCount) { + this.valueCount = valueCount; + } + + /** + * Get friendly type object from the vector. + * + * @param index index of object to get + * @return friendly type object + */ + @Override + public Object getObject(int index) { + int physicalIndex = findFirstLargerThan(runEndsVector, index); + return valuesVector.getObject(physicalIndex); + } + + /** + * Returns number of null elements in the vector. + * + * @return number of null elements + */ + @Override + public int getNullCount() { + if (valuesVector.getNullCount() != 0) { + long nullCount = 0; + // TODO: make it more efficient + for (int i = 0; i < valuesVector.getValueCount(); i++) { + if (valuesVector.isNull(i)) { + long lastEnd = i > 0 ? runEndsVector.getValueAsLong(i - 1) : 0; + long nullEunLength = runEndsVector.getValueAsLong(i) - lastEnd; + nullCount += nullEunLength; + } + } + return (int) nullCount; + } + return 0; + } + + /** + * Check whether an element in the vector is null. + * + * @param index index to check for null + * @return true if element is null + */ + @Override + public boolean isNull(int index) { + int physicalIndex = findFirstLargerThan(runEndsVector, index); + return valuesVector.isNull(physicalIndex); + } + + /** Returns hashCode of element in index with the default hasher. */ + @Override + public int hashCode(int index) { + return hashCode(index, null); + } + + /** Returns hashCode of element in index with the given hasher. */ + @Override + public int hashCode(int index, ArrowBufHasher hasher) { + int hash = 0; + for (FieldVector v : getChildrenFromFields()) { + if (index < v.getValueCount()) { + hash = ByteFunctionHelpers.combineHash(hash, v.hashCode(index, hasher)); + } + } + return hash; + } + + /** + * Accept a generic {@link VectorVisitor} and return the result. + * + * @param the output result type. + * @param the input data together with visitor. + */ + @Override + public OUT accept(VectorVisitor visitor, IN value) { + return visitor.visit(this, value); + } + + /** + * Gets the name of the vector. + * + * @return the name of the vector. + */ + @Override + public String getName() { + return this.field.getName(); + } + + @Override + public Iterator iterator() { + return Collections.unmodifiableCollection(getChildrenFromFields()).iterator(); + } + + /** FieldVector interface */ + + /** + * Initializes the child vectors to be later loaded with loadBuffers. + * + * @param children the schema containing the run_ends column first and the values column second + */ + @Override + public void initializeChildrenFromFields(List children) { + checkArgument( + children.size() == 2, + "Run-end encoded vectors must have two child Fields. Found: %s", + children.isEmpty() ? "none" : children); + checkArgument( + Arrays.asList( + MinorType.SMALLINT.getType(), MinorType.INT.getType(), MinorType.BIGINT.getType()) + .contains(children.get(0).getType()), + "The first field represents the run-end vector and must be of type int with size 16, 32, or 64 bits. Found: %s", + children.get(0).getType()); + runEndsVector = (BaseIntVector) children.get(0).createVector(allocator); + valuesVector = children.get(1).createVector(allocator); + field = new Field(field.getName(), field.getFieldType(), children); + } + + /** + * The returned list is the same size as the list passed to initializeChildrenFromFields. + * + * @return the children according to schema (empty for primitive types) + */ + @Override + public List getChildrenFromFields() { + return Arrays.asList(runEndsVector, valuesVector); + } + + /** + * Loads data in the vectors. (ownBuffers must be the same size as getFieldVectors()) + * + * @param fieldNode the fieldNode + * @param ownBuffers the buffers for this Field (own buffers only, children not included) + */ + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have any associated buffers."); + } + + /** + * Get the buffers of the fields, (same size as getFieldVectors() since it is their content). + * + * @return the buffers containing the data for this vector (ready for reading) + */ + @Override + public List getFieldBuffers() { + return null; + } + + /** + * Get the inner vectors. + * + * @return the inner vectors for this field as defined by the TypeLayout + * @deprecated This API will be removed as the current implementations no longer support inner + * vectors. + */ + @Deprecated + @Override + public List getFieldInnerVectors() { + throw new UnsupportedOperationException("There are no inner vectors. Use getFieldBuffers()."); + } + + /** + * Gets the starting address of the underlying buffer associated with validity vector. + * + * @return buffer address + */ + @Override + public long getValidityBufferAddress() { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a validity buffer."); + } + + /** + * Gets the starting address of the underlying buffer associated with data vector. + * + * @return buffer address + */ + @Override + public long getDataBufferAddress() { + throw new UnsupportedOperationException("Run-end encoded vectors do not have a data buffer."); + } + + /** + * Gets the starting address of the underlying buffer associated with offset vector. + * + * @return buffer address + */ + @Override + public long getOffsetBufferAddress() { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have an offset buffer."); + } + + /** + * Set the element at the given index to null. + * + * @param index the value to change + */ + @Override + public void setNull(int index) { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a validity buffer."); + } + + public FieldVector getRunEndsVector() { + return runEndsVector; + } + + public FieldVector getValuesVector() { + return valuesVector; + } + + static int findFirstLargerThan(BaseIntVector runEndVector, int logicalIndex) { + int target = logicalIndex + 1; + if (runEndVector == null || runEndVector.getValueCount() == 0) { + return -1; + } + + int low = 0; + int high = runEndVector.getValueCount() - 1; + int result = -1; + + while (low <= high) { + int mid = low + (high - low) / 2; + long valueAsLong = runEndVector.getValueAsLong(mid); + if (valueAsLong == target) { + return mid; + } else if (valueAsLong > target) { + result = mid; + high = mid - 1; + } else { + low = mid + 1; + } + } + + return result; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index 6b2c56de01c40..b77f4d610b026 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -73,6 +73,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; @@ -142,6 +143,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.ListView; import org.apache.arrow.vector.types.pojo.ArrowType.Map; import org.apache.arrow.vector.types.pojo.ArrowType.Null; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.ArrowType.Struct; import org.apache.arrow.vector.types.pojo.ArrowType.Time; import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp; @@ -786,6 +788,18 @@ public FieldWriter getNewFieldWriter(ValueVector vector) { .getNewFieldWriter(vector); } }, + RUNENDENCODED(RunEndEncoded.INSTANCE) { + @Override + public FieldVector getNewVector( + Field field, BufferAllocator allocator, CallBack schemaChangeCallback) { + return new RunEndEncodedVector(field, allocator, schemaChangeCallback); + } + + @Override + public FieldWriter getNewFieldWriter(ValueVector vector) { + return null; // TODO + } + }, ; private final ArrowType type; @@ -1021,6 +1035,11 @@ public MinorType visit(LargeListView type) { public MinorType visit(ExtensionType type) { return MinorType.EXTENSIONTYPE; } + + @Override + public MinorType visit(RunEndEncoded type) { + return MinorType.RUNENDENCODED; + } }); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index e703571b374eb..71a5d7ef854c4 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -37,6 +37,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility to append two vectors together. */ @@ -639,4 +640,9 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { deltaVector.getUnderlyingVector().accept(underlyingAppender, null); return targetVector; } + + @Override + public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { + return targetVector; // TODO + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index 0c9140c360d15..47e32fa1c6358 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -287,4 +288,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java index c62bff79f7710..0e2cab39d48a2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility for validating vector data. */ @@ -206,4 +207,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java index f947dcf41342f..674929606d5b8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java @@ -67,6 +67,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; @@ -478,4 +479,13 @@ public Void visit(ExtensionTypeVector vector, Void value) { validateExtensionTypeVector(vector); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + validateVectorCommon(vector, ArrowType.RunEndEncoded.class); + for (ValueVector subVector : vector.getChildrenFromFields()) { + subVector.accept(this, null); + } + return null; + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java index 5004ba488cacd..294add01c652c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.ValueVectorUtility; @@ -318,4 +319,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } + + @Override + public Void visit(RunEndEncodedVector vector, Void value) { + return null; // TODO + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java new file mode 100644 index 0000000000000..77d00c76131b9 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.complex.RunEndEncodedVector; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestRunEndEncodedVector { + + private BufferAllocator allocator; + + @BeforeEach + public void init() { + allocator = new DirtyRootAllocator(Long.MAX_VALUE, (byte) 100); + } + + @AfterEach + public void terminate() throws Exception { + allocator.close(); + } + + @Test + public void testInitializeChildrenFromFields() throws Exception { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + + try (RunEndEncodedVector reeVector = RunEndEncodedVector.empty("empty", allocator)) { + reeVector.initializeChildrenFromFields(List.of(runEndField, valueField)); + + reeVector.validate(); + } + } + + /** Create REE vector with constant value. */ + @Test + public void testConstantValueVector() throws Exception { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + final Field runEndEncodedField = + new Field( + "constant", + FieldType.notNullable(RunEndEncoded.INSTANCE), + List.of(runEndField, valueField)); + + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(runEndEncodedField, allocator, null)) { + int runCount = 1; + int logicalValueCount = 100; + + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); + ((BigIntVector) reeVector.getValuesVector()).set(0, 65536); + ((IntVector) reeVector.getRunEndsVector()).set(0, logicalValueCount); + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(logicalValueCount); + + assertEquals(logicalValueCount, reeVector.getValueCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertEquals(65536L, reeVector.getObject(i)); + } + } + } + + /** Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. */ + @Test + public void testBasicRunEndEncodedVector() throws Exception { + + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + final Field runEndEncodedField = + new Field( + "ree", FieldType.notNullable(RunEndEncoded.INSTANCE), List.of(runEndField, valueField)); + + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(runEndEncodedField, allocator, null)) { + int runCount = 5; + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); + int end = 0; + for (int i = 1; i <= runCount; i++) { + end += i; + ((BigIntVector) reeVector.getValuesVector()).set(i - 1, i); + ((IntVector) reeVector.getRunEndsVector()).set(i - 1, end); + } + + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(end); + + assertEquals(15, reeVector.getValueCount()); + int index = 0; + for (int i = 1; i < runCount + 1; i++) { + for (int j = 0; j < i; j++) { + assertEquals((long) i, reeVector.getObject(index)); + index++; + } + } + } + } +} From 2d6eb8906307e09248b786b3c691bd639f1ae9c1 Mon Sep 17 00:00:00 2001 From: ViggoC Date: Sun, 1 Sep 2024 13:16:33 +0000 Subject: [PATCH 02/17] rebase --- .../java/org/apache/arrow/vector/extension/OpaqueType.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java index f4f06dad2a424..ca56214fdac77 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java @@ -394,5 +394,10 @@ public FieldVector visit(ListView type) { public FieldVector visit(LargeListView type) { throw unsupported(type); } + + @Override + public FieldVector visit(RunEndEncoded type) { + throw unsupported(type); + } } } From 16cb751861cc5101ba109fddf40315589a125218 Mon Sep 17 00:00:00 2001 From: ViggoC Date: Sun, 1 Sep 2024 15:30:41 +0000 Subject: [PATCH 03/17] fix test --- java/vector/src/main/codegen/templates/UnionReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/vector/src/main/codegen/templates/UnionReader.java b/java/vector/src/main/codegen/templates/UnionReader.java index d2b2f4bb70975..68e30ef48846b 100644 --- a/java/vector/src/main/codegen/templates/UnionReader.java +++ b/java/vector/src/main/codegen/templates/UnionReader.java @@ -39,7 +39,7 @@ @SuppressWarnings("unused") public class UnionReader extends AbstractFieldReader { - private static final int NUM_SUPPORTED_TYPES = 50; + private static final int NUM_SUPPORTED_TYPES = 51; private BaseReader[] readers = new BaseReader[NUM_SUPPORTED_TYPES]; public UnionVector data; From b5208dc9d19e7e355363b4bc318a3744d540a18b Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Mon, 9 Sep 2024 23:54:50 +0800 Subject: [PATCH 04/17] rename getPhysicalIndex --- .../vector/complex/RunEndEncodedVector.java | 93 +++++++++---------- .../arrow/vector/TestRunEndEncodedVector.java | 6 +- 2 files changed, 47 insertions(+), 52 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 82ad9d19b5b4e..d074d0ce2245b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.arrow.vector.complex; import static org.apache.arrow.util.Preconditions.checkArgument; @@ -64,10 +65,10 @@ public static RunEndEncodedVector empty(String name, BufferAllocator allocator) /** * Constructs a new instance. * - * @param name The name of the instance. + * @param name The name of the instance. * @param allocator The allocator to use for allocating/reallocating buffers. * @param fieldType The type of the array that is run-end encoded. - * @param callBack A schema change callback. + * @param callBack A schema change callback. */ public RunEndEncodedVector( String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) { @@ -77,9 +78,9 @@ public RunEndEncodedVector( /** * Constructs a new instance. * - * @param field The field materialized by this vector. + * @param field The field materialized by this vector. * @param allocator The allocator to use for allocating/reallocating buffers. - * @param callBack A schema change callback. + * @param callBack A schema change callback. */ public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack callBack) { super(allocator); @@ -148,10 +149,6 @@ protected FieldReader getReaderImpl() { */ @Override public void setInitialCapacity(int numRecords) { - // TODO: does it make sense? - for (FieldVector v : getChildrenFromFields()) { - v.setInitialCapacity(numRecords); - } } /** @@ -167,7 +164,9 @@ public int getValueCapacity() { .orElseThrow(NoSuchElementException::new); } - /** Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. */ + /** + * Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. + */ @Override public void close() { for (FieldVector v : getChildrenFromFields()) { @@ -220,7 +219,7 @@ public MinorType getMinorType() { * * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(BufferAllocator allocator) { @@ -231,10 +230,10 @@ public TransferPair getTransferPair(BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param ref the name of the vector + * @param ref the name of the vector * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { @@ -244,10 +243,10 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param field the Field object used by the target vector + * @param field the Field object used by the target vector * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator) { @@ -257,11 +256,11 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param ref the name of the vector + * @param ref the name of the vector * @param allocator the target allocator - * @param callBack A schema change callback. + * @param callBack A schema change callback. * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { @@ -272,11 +271,11 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB /** * To transfer quota responsibility. * - * @param field the Field object used by the target vector + * @param field the Field object used by the target vector * @param allocator the target allocator - * @param callBack A schema change callback. + * @param callBack A schema change callback. * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { @@ -289,7 +288,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call * * @param target the target for the transfer * @return a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to - * transfer underlying buffers into the target vector. + * transfer underlying buffers into the target vector. */ @Override public TransferPair makeTransferPair(ValueVector target) { @@ -301,7 +300,7 @@ public TransferPair makeTransferPair(ValueVector target) { * Get a reader for this vector. * * @return a {@link org.apache.arrow.vector.complex.reader.FieldReader field reader} that supports - * reading values from this vector. + * reading values from this vector. */ @Override public FieldReader getReader() { @@ -312,7 +311,7 @@ public FieldReader getReader() { * Get a writer for this vector. * * @return a {@link org.apache.arrow.vector.complex.writer.FieldWriter field writer} that supports - * writing values to this vector. + * writing values to this vector. */ public FieldWriter getWriter() { return null; // TODO @@ -355,7 +354,7 @@ public int getBufferSizeFor(int valueCount) { * (unless they change it). * * @param clear Whether to clear vector before returning; the buffers will still be refcounted; - * but the returned array will be the only reference to them + * but the returned array will be the only reference to them * @return The underlying {@link ArrowBuf buffers} that is used by this vector instance. */ @Override @@ -403,7 +402,9 @@ public int getValueCount() { return valueCount; } - /** Set number of values in the vector. */ + /** + * Set number of values in the vector. + */ @Override public void setValueCount(int valueCount) { this.valueCount = valueCount; @@ -417,7 +418,7 @@ public void setValueCount(int valueCount) { */ @Override public Object getObject(int index) { - int physicalIndex = findFirstLargerThan(runEndsVector, index); + int physicalIndex = getPhysicalIndex(runEndsVector, index); return valuesVector.getObject(physicalIndex); } @@ -428,18 +429,7 @@ public Object getObject(int index) { */ @Override public int getNullCount() { - if (valuesVector.getNullCount() != 0) { - long nullCount = 0; - // TODO: make it more efficient - for (int i = 0; i < valuesVector.getValueCount(); i++) { - if (valuesVector.isNull(i)) { - long lastEnd = i > 0 ? runEndsVector.getValueAsLong(i - 1) : 0; - long nullEunLength = runEndsVector.getValueAsLong(i) - lastEnd; - nullCount += nullEunLength; - } - } - return (int) nullCount; - } + // Null count must is always 0 for run-end encoded array return 0; } @@ -451,17 +441,21 @@ public int getNullCount() { */ @Override public boolean isNull(int index) { - int physicalIndex = findFirstLargerThan(runEndsVector, index); + int physicalIndex = getPhysicalIndex(runEndsVector, index); return valuesVector.isNull(physicalIndex); } - /** Returns hashCode of element in index with the default hasher. */ + /** + * Returns hashCode of element in index with the default hasher. + */ @Override public int hashCode(int index) { return hashCode(index, null); } - /** Returns hashCode of element in index with the given hasher. */ + /** + * Returns hashCode of element in index with the given hasher. + */ @Override public int hashCode(int index, ArrowBufHasher hasher) { int hash = 0; @@ -477,7 +471,7 @@ public int hashCode(int index, ArrowBufHasher hasher) { * Accept a generic {@link VectorVisitor} and return the result. * * @param the output result type. - * @param the input data together with visitor. + * @param the input data together with visitor. */ @Override public OUT accept(VectorVisitor visitor, IN value) { @@ -536,7 +530,7 @@ public List getChildrenFromFields() { /** * Loads data in the vectors. (ownBuffers must be the same size as getFieldVectors()) * - * @param fieldNode the fieldNode + * @param fieldNode the fieldNode * @param ownBuffers the buffers for this Field (own buffers only, children not included) */ @Override @@ -560,7 +554,7 @@ public List getFieldBuffers() { * * @return the inner vectors for this field as defined by the TypeLayout * @deprecated This API will be removed as the current implementations no longer support inner - * vectors. + * vectors. */ @Deprecated @Override @@ -619,8 +613,11 @@ public FieldVector getValuesVector() { return valuesVector; } - static int findFirstLargerThan(BaseIntVector runEndVector, int logicalIndex) { - int target = logicalIndex + 1; + /** + * The physical index is the index of the first value that is larger than logical index. e.g. if + * run_ends is [1,2,3], the physical index of logical index from 0 to 5 is [0, 1, 1, 2, 2, 2] + */ + static int getPhysicalIndex(BaseIntVector runEndVector, int logicalIndex) { if (runEndVector == null || runEndVector.getValueCount() == 0) { return -1; } @@ -632,9 +629,7 @@ static int findFirstLargerThan(BaseIntVector runEndVector, int logicalIndex) { while (low <= high) { int mid = low + (high - low) / 2; long valueAsLong = runEndVector.getValueAsLong(mid); - if (valueAsLong == target) { - return mid; - } else if (valueAsLong > target) { + if (valueAsLong > logicalIndex) { result = mid; high = mid - 1; } else { diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index 77d00c76131b9..aee776841b084 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -44,7 +44,7 @@ public void terminate() throws Exception { } @Test - public void testInitializeChildrenFromFields() throws Exception { + public void testInitializeChildrenFromFields() { final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); @@ -60,7 +60,7 @@ public void testInitializeChildrenFromFields() throws Exception { /** Create REE vector with constant value. */ @Test - public void testConstantValueVector() throws Exception { + public void testConstantValueVector() { final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); @@ -94,7 +94,7 @@ public void testConstantValueVector() throws Exception { /** Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. */ @Test - public void testBasicRunEndEncodedVector() throws Exception { + public void testBasicRunEndEncodedVector() { final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); From 5c86cbf56982913d30db7469a5ab3c48008f4bf1 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Tue, 10 Sep 2024 13:40:12 +0800 Subject: [PATCH 05/17] test index out of bound --- .../vector/complex/RunEndEncodedVector.java | 79 ++++++++++--------- .../arrow/vector/TestRunEndEncodedVector.java | 9 ++- 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index d074d0ce2245b..4dc6f6c03e513 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -65,10 +65,10 @@ public static RunEndEncodedVector empty(String name, BufferAllocator allocator) /** * Constructs a new instance. * - * @param name The name of the instance. + * @param name The name of the instance. * @param allocator The allocator to use for allocating/reallocating buffers. * @param fieldType The type of the array that is run-end encoded. - * @param callBack A schema change callback. + * @param callBack A schema change callback. */ public RunEndEncodedVector( String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) { @@ -78,9 +78,9 @@ public RunEndEncodedVector( /** * Constructs a new instance. * - * @param field The field materialized by this vector. + * @param field The field materialized by this vector. * @param allocator The allocator to use for allocating/reallocating buffers. - * @param callBack A schema change callback. + * @param callBack A schema change callback. */ public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack callBack) { super(allocator); @@ -148,8 +148,7 @@ protected FieldReader getReaderImpl() { * @param numRecords the initial record capacity. */ @Override - public void setInitialCapacity(int numRecords) { - } + public void setInitialCapacity(int numRecords) {} /** * Returns the maximum number of values that can be stored in this vector instance. @@ -164,9 +163,7 @@ public int getValueCapacity() { .orElseThrow(NoSuchElementException::new); } - /** - * Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. - */ + /** Alternative to clear(). Allows use as an AutoCloseable in try-with-resources. */ @Override public void close() { for (FieldVector v : getChildrenFromFields()) { @@ -219,7 +216,7 @@ public MinorType getMinorType() { * * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(BufferAllocator allocator) { @@ -230,10 +227,10 @@ public TransferPair getTransferPair(BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param ref the name of the vector + * @param ref the name of the vector * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { @@ -243,10 +240,10 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param field the Field object used by the target vector + * @param field the Field object used by the target vector * @param allocator the target allocator * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator) { @@ -256,11 +253,11 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) { /** * To transfer quota responsibility. * - * @param ref the name of the vector + * @param ref the name of the vector * @param allocator the target allocator - * @param callBack A schema change callback. + * @param callBack A schema change callback. * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { @@ -271,11 +268,11 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB /** * To transfer quota responsibility. * - * @param field the Field object used by the target vector + * @param field the Field object used by the target vector * @param allocator the target allocator - * @param callBack A schema change callback. + * @param callBack A schema change callback. * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. + * target vector of the same type. */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { @@ -288,7 +285,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call * * @param target the target for the transfer * @return a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to - * transfer underlying buffers into the target vector. + * transfer underlying buffers into the target vector. */ @Override public TransferPair makeTransferPair(ValueVector target) { @@ -300,7 +297,7 @@ public TransferPair makeTransferPair(ValueVector target) { * Get a reader for this vector. * * @return a {@link org.apache.arrow.vector.complex.reader.FieldReader field reader} that supports - * reading values from this vector. + * reading values from this vector. */ @Override public FieldReader getReader() { @@ -311,7 +308,7 @@ public FieldReader getReader() { * Get a writer for this vector. * * @return a {@link org.apache.arrow.vector.complex.writer.FieldWriter field writer} that supports - * writing values to this vector. + * writing values to this vector. */ public FieldWriter getWriter() { return null; // TODO @@ -354,7 +351,7 @@ public int getBufferSizeFor(int valueCount) { * (unless they change it). * * @param clear Whether to clear vector before returning; the buffers will still be refcounted; - * but the returned array will be the only reference to them + * but the returned array will be the only reference to them * @return The underlying {@link ArrowBuf buffers} that is used by this vector instance. */ @Override @@ -402,9 +399,7 @@ public int getValueCount() { return valueCount; } - /** - * Set number of values in the vector. - */ + /** Set number of values in the vector. */ @Override public void setValueCount(int valueCount) { this.valueCount = valueCount; @@ -418,7 +413,8 @@ public void setValueCount(int valueCount) { */ @Override public Object getObject(int index) { - int physicalIndex = getPhysicalIndex(runEndsVector, index); + checkIndex(index); + int physicalIndex = getPhysicalIndex(index); return valuesVector.getObject(physicalIndex); } @@ -445,17 +441,13 @@ public boolean isNull(int index) { return valuesVector.isNull(physicalIndex); } - /** - * Returns hashCode of element in index with the default hasher. - */ + /** Returns hashCode of element in index with the default hasher. */ @Override public int hashCode(int index) { return hashCode(index, null); } - /** - * Returns hashCode of element in index with the given hasher. - */ + /** Returns hashCode of element in index with the given hasher. */ @Override public int hashCode(int index, ArrowBufHasher hasher) { int hash = 0; @@ -471,7 +463,7 @@ public int hashCode(int index, ArrowBufHasher hasher) { * Accept a generic {@link VectorVisitor} and return the result. * * @param the output result type. - * @param the input data together with visitor. + * @param the input data together with visitor. */ @Override public OUT accept(VectorVisitor visitor, IN value) { @@ -510,7 +502,8 @@ public void initializeChildrenFromFields(List children) { Arrays.asList( MinorType.SMALLINT.getType(), MinorType.INT.getType(), MinorType.BIGINT.getType()) .contains(children.get(0).getType()), - "The first field represents the run-end vector and must be of type int with size 16, 32, or 64 bits. Found: %s", + "The first field represents the run-end vector and must be of type int " + + "with size 16, 32, or 64 bits. Found: %s", children.get(0).getType()); runEndsVector = (BaseIntVector) children.get(0).createVector(allocator); valuesVector = children.get(1).createVector(allocator); @@ -530,7 +523,7 @@ public List getChildrenFromFields() { /** * Loads data in the vectors. (ownBuffers must be the same size as getFieldVectors()) * - * @param fieldNode the fieldNode + * @param fieldNode the fieldNode * @param ownBuffers the buffers for this Field (own buffers only, children not included) */ @Override @@ -554,7 +547,7 @@ public List getFieldBuffers() { * * @return the inner vectors for this field as defined by the TypeLayout * @deprecated This API will be removed as the current implementations no longer support inner - * vectors. + * vectors. */ @Deprecated @Override @@ -613,10 +606,20 @@ public FieldVector getValuesVector() { return valuesVector; } + private void checkIndex(int logicalIndex) { + if (logicalIndex < 0 || logicalIndex >= valueCount) { + throw new IndexOutOfBoundsException(); + } + } + /** * The physical index is the index of the first value that is larger than logical index. e.g. if * run_ends is [1,2,3], the physical index of logical index from 0 to 5 is [0, 1, 1, 2, 2, 2] */ + public int getPhysicalIndex(int logicalIndex) { + return getPhysicalIndex(runEndsVector, logicalIndex); + } + static int getPhysicalIndex(BaseIntVector runEndVector, int logicalIndex) { if (runEndVector == null || runEndVector.getValueCount() == 0) { return -1; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index aee776841b084..d37c2c495c610 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -17,6 +17,7 @@ package org.apache.arrow.vector; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.List; import org.apache.arrow.memory.BufferAllocator; @@ -117,9 +118,11 @@ public void testBasicRunEndEncodedVector() { ((IntVector) reeVector.getRunEndsVector()).set(i - 1, end); } + final int logicalValueCount = end; + reeVector.getValuesVector().setValueCount(runCount); reeVector.getRunEndsVector().setValueCount(runCount); - reeVector.setValueCount(end); + reeVector.setValueCount(logicalValueCount); assertEquals(15, reeVector.getValueCount()); int index = 0; @@ -129,6 +132,10 @@ public void testBasicRunEndEncodedVector() { index++; } } + + // test index out of bound + assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(-1)); + assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(logicalValueCount)); } } } From d12977e22b26f4ef88039aeca2cf55d869d6f87d Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 11 Sep 2024 13:26:56 +0800 Subject: [PATCH 06/17] range compare --- .../vector/compare/RangeEqualsVisitor.java | 58 ++++++- .../vector/complex/RunEndEncodedVector.java | 7 +- .../arrow/vector/TestRunEndEncodedVector.java | 157 ++++++++++++------ 3 files changed, 166 insertions(+), 56 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java index d62bd43e5a090..4944473426268 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java @@ -29,6 +29,7 @@ import org.apache.arrow.vector.BaseVariableWidthViewVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.ExtensionTypeVector; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.NullVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.complex.BaseLargeRepeatedValueViewVector; @@ -46,6 +47,7 @@ /** Visitor to compare a range of values for vectors. */ public class RangeEqualsVisitor implements VectorVisitor { + private ValueVector left; private ValueVector right; @@ -226,6 +228,14 @@ public Boolean visit(NullVector left, Range range) { return true; } + @Override + public Boolean visit(RunEndEncodedVector left, Range range) { + if (!validate(left)) { + return false; + } + return compareRunEndEncodedVectors(range); + } + @Override public Boolean visit(ExtensionTypeVector left, Range range) { if (!(right instanceof ExtensionTypeVector) || !validate(left)) { @@ -255,6 +265,46 @@ public Boolean visit(LargeListViewVector left, Range range) { return compareLargeListViewVectors(range); } + protected boolean compareRunEndEncodedVectors(Range range) { + RunEndEncodedVector leftVector = (RunEndEncodedVector) left; + RunEndEncodedVector rightVector = (RunEndEncodedVector) right; + + final int leftRangeEnd = range.getLeftStart() + range.getLength(); + final int rightRangeEnd = range.getRightStart() + range.getLength(); + + FieldVector leftValuesVector = leftVector.getValuesVector(); + FieldVector rightValuesVector = rightVector.getValuesVector(); + + RangeEqualsVisitor innerVisitor = createInnerVisitor(leftValuesVector, rightValuesVector, null); + + int leftLogicalIndex = range.getLeftStart(); + int rightLogicalIndex = range.getRightStart(); + + while (leftLogicalIndex < leftRangeEnd) { + int leftPhysicalIndex = leftVector.getPhysicalIndex(leftLogicalIndex); + int rightPhysicalIndex = rightVector.getPhysicalIndex(rightLogicalIndex); + if (leftValuesVector.accept( + innerVisitor, new Range(leftPhysicalIndex, rightPhysicalIndex, 1))) { + int leftRunEnd = leftVector.getRunEnd(leftLogicalIndex); + int rightRunEnd = rightVector.getRunEnd(rightLogicalIndex); + + int leftRunLength = Math.min(leftRunEnd, leftRangeEnd) - leftLogicalIndex; + int rightRunLength = Math.min(rightRunEnd, rightRangeEnd) - rightLogicalIndex; + + if (leftRunLength != rightRunLength) { + return false; + } else { + leftLogicalIndex = leftRunEnd; + rightLogicalIndex = rightRunEnd; + } + } else { + return false; + } + } + + return true; + } + protected RangeEqualsVisitor createInnerVisitor( ValueVector leftInner, ValueVector rightInner, @@ -262,14 +312,6 @@ protected RangeEqualsVisitor createInnerVisitor( return new RangeEqualsVisitor(leftInner, rightInner, typeComparator); } - @Override - public Boolean visit(RunEndEncodedVector left, Range range) { - if (!validate(left)) { - return false; - } - return true; // TODO - } - protected boolean compareUnionVectors(Range range) { UnionVector leftVector = (UnionVector) left; UnionVector rightVector = (UnionVector) right; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 4dc6f6c03e513..7799b0f13eb28 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.arrow.vector.complex; import static org.apache.arrow.util.Preconditions.checkArgument; @@ -418,6 +417,12 @@ public Object getObject(int index) { return valuesVector.getObject(physicalIndex); } + public int getRunEnd(int index) { + checkIndex(index); + int physicalIndex = getPhysicalIndex(index); + return (int) runEndsVector.getValueAsLong(physicalIndex); + } + /** * Returns number of null elements in the vector. * diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index d37c2c495c610..413743559ba70 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -17,10 +17,14 @@ package org.apache.arrow.vector; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.compare.Range; +import org.apache.arrow.vector.compare.RangeEqualsVisitor; import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; @@ -48,13 +52,11 @@ public void terminate() throws Exception { public void testInitializeChildrenFromFields() { final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); - final Field valueField = new Field("value", valueType, null); final Field runEndField = new Field("ree", runEndType, null); try (RunEndEncodedVector reeVector = RunEndEncodedVector.empty("empty", allocator)) { reeVector.initializeChildrenFromFields(List.of(runEndField, valueField)); - reeVector.validate(); } } @@ -62,67 +64,40 @@ public void testInitializeChildrenFromFields() { /** Create REE vector with constant value. */ @Test public void testConstantValueVector() { - final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); - final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); - - final Field valueField = new Field("value", valueType, null); - final Field runEndField = new Field("ree", runEndType, null); - final Field runEndEncodedField = - new Field( - "constant", - FieldType.notNullable(RunEndEncoded.INSTANCE), - List.of(runEndField, valueField)); + final Field runEndEncodedField = createBigIntRunEndEncodedField("constant"); try (RunEndEncodedVector reeVector = new RunEndEncodedVector(runEndEncodedField, allocator, null)) { - int runCount = 1; int logicalValueCount = 100; - - reeVector.allocateNew(); - reeVector.setInitialCapacity(runCount); - ((BigIntVector) reeVector.getValuesVector()).set(0, 65536); - ((IntVector) reeVector.getRunEndsVector()).set(0, logicalValueCount); - reeVector.getValuesVector().setValueCount(runCount); - reeVector.getRunEndsVector().setValueCount(runCount); - reeVector.setValueCount(logicalValueCount); - + int value = 65536; + setConstantVector(reeVector, value, logicalValueCount); assertEquals(logicalValueCount, reeVector.getValueCount()); for (int i = 0; i < logicalValueCount; i++) { - assertEquals(65536L, reeVector.getObject(i)); + assertEquals(value, reeVector.getObject(i)); } } } + private static void setConstantVector( + RunEndEncodedVector constantVector, int value, int logicalValueCount) { + int runCount = 1; + constantVector.allocateNew(); + ((BigIntVector) constantVector.getValuesVector()).set(0, value); + ((IntVector) constantVector.getRunEndsVector()).set(0, logicalValueCount); + constantVector.getValuesVector().setValueCount(runCount); + constantVector.getRunEndsVector().setValueCount(runCount); + constantVector.setValueCount(logicalValueCount); + } + /** Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. */ @Test public void testBasicRunEndEncodedVector() { - final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); - final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); - - final Field valueField = new Field("value", valueType, null); - final Field runEndField = new Field("ree", runEndType, null); - final Field runEndEncodedField = - new Field( - "ree", FieldType.notNullable(RunEndEncoded.INSTANCE), List.of(runEndField, valueField)); - try (RunEndEncodedVector reeVector = - new RunEndEncodedVector(runEndEncodedField, allocator, null)) { - int runCount = 5; - reeVector.allocateNew(); - reeVector.setInitialCapacity(runCount); - int end = 0; - for (int i = 1; i <= runCount; i++) { - end += i; - ((BigIntVector) reeVector.getValuesVector()).set(i - 1, i); - ((IntVector) reeVector.getRunEndsVector()).set(i - 1, end); - } - - final int logicalValueCount = end; + new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null)) { - reeVector.getValuesVector().setValueCount(runCount); - reeVector.getRunEndsVector().setValueCount(runCount); - reeVector.setValueCount(logicalValueCount); + int runCount = 5; + final int logicalValueCount = setAccumulateVector(reeVector, runCount, 1); assertEquals(15, reeVector.getValueCount()); int index = 0; @@ -138,4 +113,92 @@ public void testBasicRunEndEncodedVector() { assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(logicalValueCount)); } } + + private static int setAccumulateVector( + RunEndEncodedVector reeVector, int runCount, int startRunValue) { + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); + int end = 0; + int runValue = startRunValue; + for (int i = 0; i < runCount; i++) { + end += runValue; + ((BigIntVector) reeVector.getValuesVector()).set(i, runValue); + ((IntVector) reeVector.getRunEndsVector()).set(i, end); + runValue++; + } + + final int logicalValueCount = end; + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(logicalValueCount); + return logicalValueCount; + } + + @Test + public void testRangeCompare() { + // test compare same constant vector + RunEndEncodedVector constantVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("constant"), allocator, null); + int logicalValueCount = 15; + + setConstantVector(constantVector, 1, logicalValueCount); + + assertTrue( + constantVector.accept( + new RangeEqualsVisitor(constantVector, constantVector), + new Range(0, 0, logicalValueCount))); + assertTrue( + constantVector.accept( + new RangeEqualsVisitor(constantVector, constantVector), new Range(1, 1, 14))); + assertTrue( + constantVector.accept( + new RangeEqualsVisitor(constantVector, constantVector), new Range(1, 2, 13))); + assertFalse( + constantVector.accept( + new RangeEqualsVisitor(constantVector, constantVector), new Range(1, 10, 10))); + assertFalse( + constantVector.accept( + new RangeEqualsVisitor(constantVector, constantVector), new Range(10, 1, 10))); + + RunEndEncodedVector reeVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null); + setAccumulateVector(reeVector, 5, 1); + + assertTrue( + reeVector.accept( + new RangeEqualsVisitor(reeVector, reeVector), new Range(0, 0, logicalValueCount))); + assertTrue( + reeVector.accept( + new RangeEqualsVisitor(reeVector, reeVector), new Range(2, 2, logicalValueCount - 2))); + assertFalse( + reeVector.accept( + new RangeEqualsVisitor(reeVector, reeVector), new Range(1, 2, logicalValueCount - 2))); + + assertFalse( + reeVector.accept( + new RangeEqualsVisitor(reeVector, constantVector), new Range(0, 0, logicalValueCount))); + + RunEndEncodedVector reeVector2 = + new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null); + setAccumulateVector(reeVector2, 4, 2); + + assertTrue( + reeVector.accept( + new RangeEqualsVisitor(reeVector, reeVector2), new Range(1, 0, logicalValueCount - 1))); + + constantVector.close(); + reeVector.close(); + reeVector2.close(); + } + + private static Field createBigIntRunEndEncodedField(String fieldName) { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + + return new Field( + fieldName, FieldType.notNullable(RunEndEncoded.INSTANCE), List.of(runEndField, valueField)); + } } From 42cd1eaff54361c21137eafc5825e49d1a26b73f Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 11 Sep 2024 14:22:18 +0800 Subject: [PATCH 07/17] clean code --- .../vector/complex/RunEndEncodedVector.java | 2 +- .../arrow/vector/TestRunEndEncodedVector.java | 113 +++++++++++------- 2 files changed, 73 insertions(+), 42 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 7799b0f13eb28..a71aa4adeaa79 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -430,7 +430,7 @@ public int getRunEnd(int index) { */ @Override public int getNullCount() { - // Null count must is always 0 for run-end encoded array + // Null count is always 0 for run-end encoded array return 0; } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index 413743559ba70..67900d6d73257 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -18,10 +18,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.function.Function; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.compare.Range; import org.apache.arrow.vector.compare.RangeEqualsVisitor; @@ -65,45 +67,58 @@ public void testInitializeChildrenFromFields() { @Test public void testConstantValueVector() { final Field runEndEncodedField = createBigIntRunEndEncodedField("constant"); + int logicalValueCount = 100; + // constant vector try (RunEndEncodedVector reeVector = new RunEndEncodedVector(runEndEncodedField, allocator, null)) { - int logicalValueCount = 100; - int value = 65536; + Long value = 65536L; setConstantVector(reeVector, value, logicalValueCount); assertEquals(logicalValueCount, reeVector.getValueCount()); for (int i = 0; i < logicalValueCount; i++) { assertEquals(value, reeVector.getObject(i)); } } - } - private static void setConstantVector( - RunEndEncodedVector constantVector, int value, int logicalValueCount) { - int runCount = 1; - constantVector.allocateNew(); - ((BigIntVector) constantVector.getValuesVector()).set(0, value); - ((IntVector) constantVector.getRunEndsVector()).set(0, logicalValueCount); - constantVector.getValuesVector().setValueCount(runCount); - constantVector.getRunEndsVector().setValueCount(runCount); - constantVector.setValueCount(logicalValueCount); + // constant null vector + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(runEndEncodedField, allocator, null)) { + setConstantVector(reeVector, null, logicalValueCount); + assertEquals(logicalValueCount, reeVector.getValueCount()); + // Null count is always 0 for run-end encoded array + assertEquals(0, reeVector.getNullCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertTrue(reeVector.isNull(i)); + assertNull(reeVector.getObject(i)); + } + } } - /** Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. */ @Test public void testBasicRunEndEncodedVector() { - try (RunEndEncodedVector reeVector = new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null)) { + // Create REE vector representing: + // [null, 2, 2, null, null, null, 4, 4, 4, 4, null, null, null, null, null]. int runCount = 5; - final int logicalValueCount = setAccumulateVector(reeVector, runCount, 1); + final int logicalValueCount = + setBasicVector( + reeVector, + runCount, + i -> i % 2 == 0 ? null : i + 1, + i -> i + 1); assertEquals(15, reeVector.getValueCount()); int index = 0; - for (int i = 1; i < runCount + 1; i++) { - for (int j = 0; j < i; j++) { - assertEquals((long) i, reeVector.getObject(index)); + for (int run = 0; run < runCount; run++) { + long expectedRunValue = (long) run + 1; + for (int j = 0; j <= run; j++) { + if (run % 2 == 0) { + assertNull(reeVector.getObject(index)); + } else { + assertEquals(expectedRunValue, reeVector.getObject(index)); + } index++; } } @@ -114,26 +129,6 @@ public void testBasicRunEndEncodedVector() { } } - private static int setAccumulateVector( - RunEndEncodedVector reeVector, int runCount, int startRunValue) { - reeVector.allocateNew(); - reeVector.setInitialCapacity(runCount); - int end = 0; - int runValue = startRunValue; - for (int i = 0; i < runCount; i++) { - end += runValue; - ((BigIntVector) reeVector.getValuesVector()).set(i, runValue); - ((IntVector) reeVector.getRunEndsVector()).set(i, end); - runValue++; - } - - final int logicalValueCount = end; - reeVector.getValuesVector().setValueCount(runCount); - reeVector.getRunEndsVector().setValueCount(runCount); - reeVector.setValueCount(logicalValueCount); - return logicalValueCount; - } - @Test public void testRangeCompare() { // test compare same constant vector @@ -141,7 +136,7 @@ public void testRangeCompare() { new RunEndEncodedVector(createBigIntRunEndEncodedField("constant"), allocator, null); int logicalValueCount = 15; - setConstantVector(constantVector, 1, logicalValueCount); + setConstantVector(constantVector, 1L, logicalValueCount); assertTrue( constantVector.accept( @@ -160,9 +155,10 @@ public void testRangeCompare() { constantVector.accept( new RangeEqualsVisitor(constantVector, constantVector), new Range(10, 1, 10))); + // Create REE vector representing: [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. RunEndEncodedVector reeVector = new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null); - setAccumulateVector(reeVector, 5, 1); + setBasicVector(reeVector, 5, i -> i + 1, i -> i + 1); assertTrue( reeVector.accept( @@ -178,9 +174,10 @@ public void testRangeCompare() { reeVector.accept( new RangeEqualsVisitor(reeVector, constantVector), new Range(0, 0, logicalValueCount))); + // Create REE vector representing: [2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]. RunEndEncodedVector reeVector2 = new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null); - setAccumulateVector(reeVector2, 4, 2); + setBasicVector(reeVector2, 4, i -> i + 2, i -> i + 2); assertTrue( reeVector.accept( @@ -201,4 +198,38 @@ private static Field createBigIntRunEndEncodedField(String fieldName) { return new Field( fieldName, FieldType.notNullable(RunEndEncoded.INSTANCE), List.of(runEndField, valueField)); } + + private static void setConstantVector( + RunEndEncodedVector constantVector, Long value, long logicalValueCount) { + setBasicVector(constantVector, 1, i -> value, i -> logicalValueCount); + } + + private static int setBasicVector( + RunEndEncodedVector reeVector, + int runCount, + Function runValueSupplier, + Function runLengthSupplier) { + reeVector.allocateNew(); + reeVector.setInitialCapacity(runCount); + int end = 0; + for (int i = 0; i < runCount; i++) { + Long runValue = runValueSupplier.apply((long) i); + if (runValue == null) { + reeVector.getValuesVector().setNull(i); + } else { + ((BigIntVector) reeVector.getValuesVector()).set(i, runValue); + } + + Long runLength = runLengthSupplier.apply((long) i); + assert runLength != null && runLength > 0; + end += runLength; + ((IntVector) reeVector.getRunEndsVector()).set(i, end); + } + + final int logicalValueCount = end; + reeVector.getValuesVector().setValueCount(runCount); + reeVector.getRunEndsVector().setValueCount(runCount); + reeVector.setValueCount(logicalValueCount); + return logicalValueCount; + } } From 4540eddce00685cde4ef604b3df95f16609200f0 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 11 Sep 2024 23:55:38 +0800 Subject: [PATCH 08/17] implement ValidateVectorBufferVisitor --- .../vector/complex/RunEndEncodedVector.java | 32 +++++++++++++--- .../arrow/vector/util/VectorAppender.java | 3 +- .../validate/ValidateVectorBufferVisitor.java | 31 ++++++++++++++- .../validate/ValidateVectorTypeVisitor.java | 4 +- .../arrow/vector/TestRunEndEncodedVector.java | 6 +-- .../vector/validate/TestValidateVector.java | 38 +++++++++++++++++++ 6 files changed, 100 insertions(+), 14 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index a71aa4adeaa79..4bf58440665ca 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -33,6 +33,7 @@ import org.apache.arrow.vector.BufferBacked; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.compare.VectorVisitor; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.FieldWriter; @@ -49,6 +50,8 @@ * values vector of any type. There are no buffers associated with the parent vector. */ public class RunEndEncodedVector extends BaseValueVector implements FieldVector { + public static final FieldVector DEFAULT_VALUE_VECTOR = ZeroVector.INSTANCE; + public static final FieldVector DEFAULT_RUN_END_VECTOR = ZeroVector.INSTANCE; public static RunEndEncodedVector empty(String name, BufferAllocator allocator) { return new RunEndEncodedVector( @@ -57,7 +60,7 @@ public static RunEndEncodedVector empty(String name, BufferAllocator allocator) protected final CallBack callBack; protected Field field; - protected BaseIntVector runEndsVector; + protected FieldVector runEndsVector; protected FieldVector valuesVector; protected int valueCount; @@ -82,10 +85,21 @@ public RunEndEncodedVector( * @param callBack A schema change callback. */ public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack callBack) { + this(field, allocator, DEFAULT_RUN_END_VECTOR, DEFAULT_VALUE_VECTOR, callBack); + } + + public RunEndEncodedVector( + Field field, + BufferAllocator allocator, + FieldVector runEndsVector, + FieldVector valuesVector, + CallBack callBack) { super(allocator); this.field = field; this.callBack = callBack; this.valueCount = 0; + this.runEndsVector = runEndsVector; + this.valuesVector = valuesVector; } /** ValueVector interface */ @@ -157,7 +171,7 @@ public void setInitialCapacity(int numRecords) {} @Override public int getValueCapacity() { return getChildrenFromFields().stream() - .mapToInt(ValueVector::getValueCapacity) + .mapToInt(item -> item != null ? item.getValueCapacity() : 0) .min() .orElseThrow(NoSuchElementException::new); } @@ -417,10 +431,16 @@ public Object getObject(int index) { return valuesVector.getObject(physicalIndex); } + /** + * Get the run end of giving index. + * + * @param index index of the run end to get + * @return the run end of giving index + */ public int getRunEnd(int index) { checkIndex(index); int physicalIndex = getPhysicalIndex(index); - return (int) runEndsVector.getValueAsLong(physicalIndex); + return (int) ((BaseIntVector) runEndsVector).getValueAsLong(physicalIndex); } /** @@ -544,7 +564,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers */ @Override public List getFieldBuffers() { - return null; + return List.of(); } /** @@ -625,7 +645,7 @@ public int getPhysicalIndex(int logicalIndex) { return getPhysicalIndex(runEndsVector, logicalIndex); } - static int getPhysicalIndex(BaseIntVector runEndVector, int logicalIndex) { + static int getPhysicalIndex(FieldVector runEndVector, int logicalIndex) { if (runEndVector == null || runEndVector.getValueCount() == 0) { return -1; } @@ -636,7 +656,7 @@ static int getPhysicalIndex(BaseIntVector runEndVector, int logicalIndex) { while (low <= high) { int mid = low + (high - low) / 2; - long valueAsLong = runEndVector.getValueAsLong(mid); + long valueAsLong = ((BaseIntVector) runEndVector).getValueAsLong(mid); if (valueAsLong > logicalIndex) { result = mid; high = mid - 1; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index 71a5d7ef854c4..f9cb7bf907076 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -643,6 +643,7 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { @Override public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { - return targetVector; // TODO + throw new UnsupportedOperationException( + "No VectorAppender implemented for RunEndEncodedVector"); // TODO } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index 47e32fa1c6358..82c23c1a70c16 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -20,6 +20,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BaseLargeVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BaseVariableWidthViewVector; @@ -291,6 +292,34 @@ public Void visit(ExtensionTypeVector vector, Void value) { @Override public Void visit(RunEndEncodedVector vector, Void value) { - return null; // TODO + validateVectorCommon(vector); + int valueCount = vector.getValueCount(); + FieldVector runEndsVector = vector.getRunEndsVector(); + + if (runEndsVector != null) { + validateOrThrow( + runEndsVector.getNullCount() == 0, "Run ends vector cannot contain null values"); + runEndsVector.accept(this, null); + + int runCount = runEndsVector.getValueCount(); + if (runCount == 0) { + validateOrThrow( + valueCount == 0, "Run end vector do not have does not contain enough elements."); + } else if (runCount > 0) { + double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1); + validateOrThrow( + valueCount == lastEnd, + "Vector logic length not equal to the last end in run ends vector. Logical length %s, last end %s", + valueCount, + lastEnd); + } + } + + FieldVector valuesVector = vector.getValuesVector(); + if (valuesVector != null) { + valuesVector.accept(this, null); + } + + return null; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java index 674929606d5b8..daad41dbdc2ce 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java @@ -484,7 +484,9 @@ public Void visit(ExtensionTypeVector vector, Void value) { public Void visit(RunEndEncodedVector vector, Void value) { validateVectorCommon(vector, ArrowType.RunEndEncoded.class); for (ValueVector subVector : vector.getChildrenFromFields()) { - subVector.accept(this, null); + if (subVector != null) { + subVector.accept(this, null); + } } return null; } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index 67900d6d73257..3f4be2e52ce56 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -103,11 +103,7 @@ public void testBasicRunEndEncodedVector() { // [null, 2, 2, null, null, null, 4, 4, 4, 4, null, null, null, null, null]. int runCount = 5; final int logicalValueCount = - setBasicVector( - reeVector, - runCount, - i -> i % 2 == 0 ? null : i + 1, - i -> i + 1); + setBasicVector(reeVector, runCount, i -> i % 2 == 0 ? null : i + 1, i -> i + 1); assertEquals(15, reeVector.getValueCount()); int index = 0; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java b/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java index 60c4c3a9bc6d2..1e28f142cae57 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.util.Arrays; +import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; @@ -33,6 +34,7 @@ import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.NullableStructWriter; @@ -40,6 +42,7 @@ import org.apache.arrow.vector.holders.NullableFloat8Holder; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.junit.jupiter.api.AfterEach; @@ -265,6 +268,41 @@ public void testBaseFixedWidthVectorInstanceMethod() { } } + @Test + public void testRunEndEncodedVector() { + final FieldType valueType = FieldType.notNullable(Types.MinorType.BIGINT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + + try (RunEndEncodedVector vector = + new RunEndEncodedVector( + new Field( + "ree", + FieldType.notNullable(RunEndEncoded.INSTANCE), + List.of(runEndField, valueField)), + allocator, + null)) { + vector.validate(); + + int runCount = 1; + vector.allocateNew(); + ((BigIntVector) vector.getValuesVector()).set(0, 1); + ((IntVector) vector.getRunEndsVector()).set(0, 10); + vector.getValuesVector().setValueCount(runCount); + vector.getRunEndsVector().setValueCount(runCount); + vector.setValueCount(10); + + vector.validate(); + + vector.getRunEndsVector().setValueCount(0); + ValidateUtil.ValidateException e = + assertThrows(ValidateUtil.ValidateException.class, () -> vector.validate()); + assertTrue(e.getMessage().contains("do not have does not contain enough elements.")); + } + } + private void writeStructVector(NullableStructWriter writer, int value1, long value2) { writer.start(); writer.integer("f0").writeInt(value1); From bf78841105d01733f0117317fbb925a6afb003af Mon Sep 17 00:00:00 2001 From: ViggoC Date: Thu, 12 Sep 2024 23:38:14 +0800 Subject: [PATCH 09/17] Update error message Co-authored-by: David Li --- .../arrow/vector/validate/ValidateVectorBufferVisitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index 82c23c1a70c16..a5069a372987c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -304,7 +304,7 @@ public Void visit(RunEndEncodedVector vector, Void value) { int runCount = runEndsVector.getValueCount(); if (runCount == 0) { validateOrThrow( - valueCount == 0, "Run end vector do not have does not contain enough elements."); + valueCount == 0, "Run end vector does not contain enough elements"); } else if (runCount > 0) { double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1); validateOrThrow( From 6969394c8a945eb6b6620d1b886766217bf2cecc Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Thu, 12 Sep 2024 23:05:29 +0800 Subject: [PATCH 10/17] add java doc --- .../arrow/vector/complex/RunEndEncodedVector.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 4bf58440665ca..1668dd5c98bb7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -88,6 +88,17 @@ public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack call this(field, allocator, DEFAULT_RUN_END_VECTOR, DEFAULT_VALUE_VECTOR, callBack); } + + /** + * Constructs a new instance. + * + * @param field The field materialized by this vector. + * @param allocator The allocator to use for allocating/reallocating buffers. + * @param runEndsVector The vector represents run ends. Only Zero vector or type int vector with + * size 16, 32 is allowed + * @param valuesVector The vector represents values + * @param callBack A schema change callback. + */ public RunEndEncodedVector( Field field, BufferAllocator allocator, From 3f493dae8f9bf397b92c1649b11633fb549054e7 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Thu, 12 Sep 2024 23:51:38 +0800 Subject: [PATCH 11/17] fix comment --- .../arrow/vector/compare/VectorVisitor.java | 5 +++- .../vector/complex/RunEndEncodedVector.java | 24 ++++++++++--------- .../org/apache/arrow/vector/types/Types.java | 3 ++- .../arrow/vector/util/VectorAppender.java | 7 ------ .../validate/ValidateVectorBufferVisitor.java | 3 +-- .../validate/ValidateVectorDataVisitor.java | 6 ----- .../validate/ValidateVectorVisitor.java | 6 ----- .../vector/validate/TestValidateVector.java | 2 +- 8 files changed, 21 insertions(+), 35 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java index a5361c5401a20..989c57a0c93d0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/VectorVisitor.java @@ -73,5 +73,8 @@ default OUT visit(LargeListViewVector left, IN value) { "VectorVisitor for LargeListViewVector is not supported."); } - OUT visit(RunEndEncodedVector left, IN value); + default OUT visit(RunEndEncodedVector left, IN value) { + throw new UnsupportedOperationException( + "VectorVisitor for LargeListViewVector is not supported."); + }; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 1668dd5c98bb7..e03412c27423c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -88,16 +88,15 @@ public RunEndEncodedVector(Field field, BufferAllocator allocator, CallBack call this(field, allocator, DEFAULT_RUN_END_VECTOR, DEFAULT_VALUE_VECTOR, callBack); } - /** * Constructs a new instance. * - * @param field The field materialized by this vector. - * @param allocator The allocator to use for allocating/reallocating buffers. + * @param field The field materialized by this vector. + * @param allocator The allocator to use for allocating/reallocating buffers. * @param runEndsVector The vector represents run ends. Only Zero vector or type int vector with - * size 16, 32 is allowed - * @param valuesVector The vector represents values - * @param callBack A schema change callback. + * size 16, 32 is allowed + * @param valuesVector The vector represents values + * @param callBack A schema change callback. */ public RunEndEncodedVector( Field field, @@ -325,7 +324,7 @@ public TransferPair makeTransferPair(ValueVector target) { */ @Override public FieldReader getReader() { - return null; // TODO + throw new UnsupportedOperationException("Not yet implemented."); } /** @@ -335,7 +334,7 @@ public FieldReader getReader() { * writing values to this vector. */ public FieldWriter getWriter() { - return null; // TODO + throw new UnsupportedOperationException("Not yet implemented."); } /** @@ -564,8 +563,10 @@ public List getChildrenFromFields() { */ @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { - throw new UnsupportedOperationException( - "Run-end encoded vectors do not have any associated buffers."); + if (!ownBuffers.isEmpty()) { + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have any associated buffers."); + } } /** @@ -644,7 +645,8 @@ public FieldVector getValuesVector() { private void checkIndex(int logicalIndex) { if (logicalIndex < 0 || logicalIndex >= valueCount) { - throw new IndexOutOfBoundsException(); + throw new IndexOutOfBoundsException( + String.format("index: %s, expected range (0, %s)", logicalIndex, valueCount)); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index b77f4d610b026..e9b963b62c13b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -797,7 +797,8 @@ public FieldVector getNewVector( @Override public FieldWriter getNewFieldWriter(ValueVector vector) { - return null; // TODO + throw new UnsupportedOperationException( + "FieldWriter for run-end encoded vector is not implemented yet."); } }, ; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java index f9cb7bf907076..e703571b374eb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java @@ -37,7 +37,6 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; -import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility to append two vectors together. */ @@ -640,10 +639,4 @@ public ValueVector visit(ExtensionTypeVector deltaVector, Void value) { deltaVector.getUnderlyingVector().accept(underlyingAppender, null); return targetVector; } - - @Override - public ValueVector visit(RunEndEncodedVector deltaVector, Void value) { - throw new UnsupportedOperationException( - "No VectorAppender implemented for RunEndEncodedVector"); // TODO - } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index a5069a372987c..ef31b4f837344 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -303,8 +303,7 @@ public Void visit(RunEndEncodedVector vector, Void value) { int runCount = runEndsVector.getValueCount(); if (runCount == 0) { - validateOrThrow( - valueCount == 0, "Run end vector does not contain enough elements"); + validateOrThrow(valueCount == 0, "Run end vector does not contain enough elements"); } else if (runCount > 0) { double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1); validateOrThrow( diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java index 0e2cab39d48a2..c62bff79f7710 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java @@ -32,7 +32,6 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; -import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; /** Utility for validating vector data. */ @@ -207,9 +206,4 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } - - @Override - public Void visit(RunEndEncodedVector vector, Void value) { - return null; // TODO - } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java index 294add01c652c..5004ba488cacd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java @@ -32,7 +32,6 @@ import org.apache.arrow.vector.complex.LargeListVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; -import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.ValueVectorUtility; @@ -319,9 +318,4 @@ public Void visit(ExtensionTypeVector vector, Void value) { vector.getUnderlyingVector().accept(this, value); return null; } - - @Override - public Void visit(RunEndEncodedVector vector, Void value) { - return null; // TODO - } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java b/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java index 1e28f142cae57..35c15bdf538f3 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/validate/TestValidateVector.java @@ -299,7 +299,7 @@ public void testRunEndEncodedVector() { vector.getRunEndsVector().setValueCount(0); ValidateUtil.ValidateException e = assertThrows(ValidateUtil.ValidateException.class, () -> vector.validate()); - assertTrue(e.getMessage().contains("do not have does not contain enough elements.")); + assertTrue(e.getMessage().contains("Run end vector does not contain enough elements")); } } From 8818918d19b8153d9643e79eca6ef14a30e31131 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 18 Sep 2024 12:04:43 +0800 Subject: [PATCH 12/17] add todo --- .../org/apache/arrow/vector/compare/RangeEqualsVisitor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java index 9151d661ff140..abcf312c5ecfc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java @@ -282,6 +282,8 @@ protected boolean compareRunEndEncodedVectors(Range range) { int rightLogicalIndex = range.getRightStart(); while (leftLogicalIndex < leftRangeEnd) { + // TODO: implement it more efficient + // https://github.com/apache/arrow/issues/44157 int leftPhysicalIndex = leftVector.getPhysicalIndex(leftLogicalIndex); int rightPhysicalIndex = rightVector.getPhysicalIndex(rightLogicalIndex); if (leftValuesVector.accept( From 2b24d39257bbdbee8697532d766631ba570224dc Mon Sep 17 00:00:00 2001 From: ViggoC Date: Wed, 18 Sep 2024 14:04:04 +0000 Subject: [PATCH 13/17] getReaderImpl throw unsupported exception --- .../org/apache/arrow/vector/complex/RunEndEncodedVector.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index e03412c27423c..0d87fe64238c2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -162,7 +162,8 @@ public BufferAllocator getAllocator() { @Override protected FieldReader getReaderImpl() { - return null; + throw new UnsupportedOperationException("Not yet implemented."); + ; } /** From 14ba5bbff29398ab8772ac3ae2817ee744e270b4 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Fri, 20 Sep 2024 10:40:42 +0800 Subject: [PATCH 14/17] throw unsupported --- .../arrow/vector/complex/RunEndEncodedVector.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 0d87fe64238c2..47f8f013484dc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -380,7 +380,7 @@ public int getBufferSizeFor(int valueCount) { */ @Override public ArrowBuf[] getBuffers(boolean clear) { - return null; + return new ArrowBuf[0]; } /** @@ -390,7 +390,8 @@ public ArrowBuf[] getBuffers(boolean clear) { */ @Override public ArrowBuf getValidityBuffer() { - return null; + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a validity buffer."); } /** @@ -400,7 +401,8 @@ public ArrowBuf getValidityBuffer() { */ @Override public ArrowBuf getDataBuffer() { - return null; + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a data buffer."); } /** @@ -410,7 +412,8 @@ public ArrowBuf getDataBuffer() { */ @Override public ArrowBuf getOffsetBuffer() { - return null; + throw new UnsupportedOperationException( + "Run-end encoded vectors do not have a offset buffer."); } /** From aa9b48d08d665c44f19249413fc326676062fd00 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 19 Sep 2024 23:41:56 -0400 Subject: [PATCH 15/17] Update java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java --- .../org/apache/arrow/vector/complex/RunEndEncodedVector.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 47f8f013484dc..9b635dab10e03 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -163,7 +163,6 @@ public BufferAllocator getAllocator() { @Override protected FieldReader getReaderImpl() { throw new UnsupportedOperationException("Not yet implemented."); - ; } /** From f77d8416c9d4c62a9a97f9c04d44817b88bd99e2 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 20 Sep 2024 00:12:52 -0400 Subject: [PATCH 16/17] Update java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java --- .../apache/arrow/vector/complex/RunEndEncodedVector.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index 9b635dab10e03..e8de86f6e9549 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -400,8 +400,7 @@ public ArrowBuf getValidityBuffer() { */ @Override public ArrowBuf getDataBuffer() { - throw new UnsupportedOperationException( - "Run-end encoded vectors do not have a data buffer."); + throw new UnsupportedOperationException("Run-end encoded vectors do not have a data buffer."); } /** @@ -411,8 +410,7 @@ public ArrowBuf getDataBuffer() { */ @Override public ArrowBuf getOffsetBuffer() { - throw new UnsupportedOperationException( - "Run-end encoded vectors do not have a offset buffer."); + throw new UnsupportedOperationException("Run-end encoded vectors do not have a offset buffer."); } /** From 7076a19f0e8b95f9a9fdb7db203adaee039c1585 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 20 Sep 2024 00:41:49 -0400 Subject: [PATCH 17/17] Update java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java --- .../main/java/org/apache/arrow/c/BufferImportTypeVisitor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 237ee46a6e55f..150c11e41edff 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -185,7 +185,6 @@ public List visit(ArrowType.Union type) { } } - @Override public List visit(ArrowType.RunEndEncoded type) { throw new UnsupportedOperationException("Importing buffers for type: " + type);