[SPARK-20960][SQL] make ColumnVector public #20116

Closed · wants to merge 4 commits

Changes from 2 commits
@@ -31,10 +31,10 @@
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@@ -248,7 +248,10 @@ public void enableReturningBatches() {
* Advances to the next batch of rows. Returns false if there are no more.
*/
public boolean nextBatch() throws IOException {
columnarBatch.reset();
for (WritableColumnVector vector : columnVectors) {
Member: Remove the space before :

Contributor Author: This is the standard Java foreach code style.

vector.reset();
}
columnarBatch.setNumRows(0);
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();

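For orientation, the reset-then-refill pattern above is driven by callers that pull one batch at a time. A minimal consumer sketch, assuming the reader has had enableReturningBatches() called so getCurrentValue() yields a ColumnarBatch; the drain() wrapper and the long-typed first column are illustrative, not part of this PR:

import java.io.IOException;

import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchDrainSketch {
  // Pulls batches until exhausted; rowIds passed to the column accessors are
  // batch-local and 0-based, matching the ColumnVector contract.
  static long drain(VectorizedParquetRecordReader reader)
      throws IOException, InterruptedException {
    long sum = 0;
    while (reader.nextBatch()) {
      ColumnarBatch batch = (ColumnarBatch) reader.getCurrentValue();
      for (int r = 0; r < batch.numRows(); r++) {
        sum += batch.column(0).getLong(r);  // assumes column 0 holds longs
      }
    }
    return sum;
  }
}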
@@ -28,6 +28,8 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -23,6 +23,10 @@
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -23,6 +23,7 @@

import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;

@@ -586,7 +587,7 @@ public final int appendStruct(boolean isNull) {
if (isNull) {
appendNull();
for (ColumnVector c: childColumns) {
if (c.type instanceof StructType) {
if (c.dataType() instanceof StructType) {
Member: nit: Which access type will we use for ColumnVector.type? dataType() or type?
For example, OnHeapColumnVector.reserveInternal() uses type while this line uses dataType().

((WritableColumnVector) c).appendStruct(true);
} else {
((WritableColumnVector) c).appendNull();
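To see the recursive branch above in action: with a nested struct schema, appending a null struct must also append nulls to every child, recursing through struct-typed children. A minimal sketch; the schema and capacity are made up for illustration:

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class AppendStructSketch {
  static void demo() {
    // struct<inner: struct<x: int>> -- nested, so the StructType branch recurses
    StructType inner = new StructType().add("x", DataTypes.IntegerType);
    StructType schema = new StructType().add("inner", inner);
    OnHeapColumnVector col = new OnHeapColumnVector(16, schema);
    col.appendStruct(true);  // parent records the null; children are padded
    col.close();
  }
}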
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.*;
@@ -34,11 +34,7 @@ public final class ArrowColumnVector extends ColumnVector {
private ArrowColumnVector[] childColumns;

private void ensureAccessible(int index) {
int valueCount = accessor.getValueCount();
if (index < 0 || index >= valueCount) {
throw new IndexOutOfBoundsException(
String.format("index: %d, valueCount: %d", index, valueCount));
}
ensureAccessible(index, 1);
}

private void ensureAccessible(int index, int count) {
@@ -64,20 +60,12 @@ public void close() {
accessor.close();
}

//
// APIs dealing with nulls
//

@Override
public boolean isNullAt(int rowId) {
ensureAccessible(rowId);
return accessor.isNullAt(rowId);
}

//
// APIs dealing with Booleans
//

@Override
public boolean getBoolean(int rowId) {
ensureAccessible(rowId);
@@ -94,10 +82,6 @@ public boolean[] getBooleans(int rowId, int count) {
return array;
}

//
// APIs dealing with Bytes
//

@Override
public byte getByte(int rowId) {
ensureAccessible(rowId);
@@ -114,10 +98,6 @@ public byte[] getBytes(int rowId, int count) {
return array;
}

//
// APIs dealing with Shorts
//

@Override
public short getShort(int rowId) {
ensureAccessible(rowId);
@@ -134,10 +114,6 @@ public short[] getShorts(int rowId, int count) {
return array;
}

//
// APIs dealing with Ints
//

@Override
public int getInt(int rowId) {
ensureAccessible(rowId);
@@ -154,10 +130,6 @@ public int[] getInts(int rowId, int count) {
return array;
}

//
// APIs dealing with Longs
//

@Override
public long getLong(int rowId) {
ensureAccessible(rowId);
@@ -174,10 +146,6 @@ public long[] getLongs(int rowId, int count) {
return array;
}

//
// APIs dealing with floats
//

@Override
public float getFloat(int rowId) {
ensureAccessible(rowId);
@@ -194,10 +162,6 @@ public float[] getFloats(int rowId, int count) {
return array;
}

//
// APIs dealing with doubles
//

@Override
public double getDouble(int rowId) {
ensureAccessible(rowId);
@@ -214,10 +178,6 @@ public double[] getDoubles(int rowId, int count) {
return array;
}

//
// APIs dealing with Arrays
//

@Override
public int getArrayLength(int rowId) {
ensureAccessible(rowId);
@@ -230,45 +190,27 @@ public int getArrayOffset(int rowId) {
return accessor.getArrayOffset(rowId);
}

//
// APIs dealing with Decimals
//

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
ensureAccessible(rowId);
return accessor.getDecimal(rowId, precision, scale);
}

//
// APIs dealing with UTF8Strings
//

@Override
public UTF8String getUTF8String(int rowId) {
ensureAccessible(rowId);
return accessor.getUTF8String(rowId);
}

//
// APIs dealing with Binaries
//

@Override
public byte[] getBinary(int rowId) {
ensureAccessible(rowId);
return accessor.getBinary(rowId);
}

/**
* Returns the data for the underlying array.
*/
@Override
public ArrowColumnVector arrayData() { return childColumns[0]; }

/**
* Returns the ordinal's child data column.
*/
@Override
public ArrowColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }

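Since ArrowColumnVector moves to the public package here, external code can wrap Arrow data directly. A rough usage sketch, assuming the Arrow 0.8-style vector API that Spark builds against at this point; method names may differ across Arrow versions:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;

class ArrowWrapSketch {
  static void demo() {
    RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    IntVector ints = new IntVector("ints", allocator);
    ints.allocateNew(3);
    ints.set(0, 7);
    ints.set(1, 8);
    ints.set(2, 9);
    ints.setValueCount(3);

    ArrowColumnVector col = new ArrowColumnVector(ints);
    int v = col.getInt(1);  // bounds-checked by ensureAccessible above
    col.close();            // also releases the wrapped Arrow vector
    allocator.close();
  }
}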
@@ -14,32 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

/**
* This class represents in-memory values of a column and provides the main APIs to access the data.
* It supports all the types and contains get APIs as well as their batched versions. The batched
* versions are considered to be faster and preferable whenever possible.
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
* to access the data, as well as their batched versions. The batched versions are considered to be
* faster and preferable whenever possible.
*
* To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
* columns have child columns. All of the data are stored in the child columns and the parent column
* only contains nullability. In the case of Arrays, the lengths and offsets are saved in the child
* column and are encoded identically to INTs.
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in this ColumnVector.
*
* Maps are just a special case of a two field struct.
* ColumnVector supports all the data types including nested types. To handle nested types,
* ColumnVector can have children and is a tree structure. For struct type, it stores the actual
Member: nit: child -> children

Contributor Author: it's already children

Member: Sorry for my mistake.

* data of each field in the corresponding child ColumnVector, and only store null information in
Member: store -> stores

* the parent ColumnVector. For array type, it stores the actual array elements in the child
* ColumnVector, and store null information, array offsets and lengths in the parent ColumnVector.
Member: store -> stores

*
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in the current batch.
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
* memory again and again.
*
* ColumnVector is meant to maximize CPU efficiency and not storage footprint, implementations
Member: nit: not -> not to minimize

* should prefer computing efficiency over storage efficiency when design the format. Since it is
* expected to reuse the ColumnVector instance, the storage footprint is negligible.
*/
public abstract class ColumnVector implements AutoCloseable {

/**
* Returns the data type of this column.
* Returns the data type of this column vector.
*/
public final DataType dataType() { return type; }

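A small sketch of what the tree structure described in the new class comment means for reads: for a struct<a: int, b: string> column, the parent vector answers only isNullAt(), while field values come from the children. Accessor names follow the interface in this diff; the column itself would come from a concrete implementation:

import org.apache.spark.sql.vectorized.ColumnVector;

class NestedReadSketch {
  // Reads one row of a struct<a: int, b: string> column.
  static void readRow(ColumnVector structCol, int rowId) {
    if (structCol.isNullAt(rowId)) return;  // null information lives in the parent
    int a = structCol.getChildColumn(0).getInt(rowId);
    String b = structCol.getChildColumn(1).getUTF8String(rowId).toString();
  }
}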
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
@@ -23,8 +23,7 @@
import org.apache.spark.unsafe.types.UTF8String;

/**
* Array abstraction in {@link ColumnVector}. The instance of this class is intended
* to be reused, callers should copy the data out if it needs to be stored.
* Array abstraction in {@link ColumnVector}.
*/
public final class ColumnarArray extends ArrayData {
// The data for this array. This array contains elements from
@@ -33,7 +32,7 @@ public final class ColumnarArray extends ArrayData {
private final int offset;
private final int length;

ColumnarArray(ColumnVector data, int offset, int length) {
public ColumnarArray(ColumnVector data, int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
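With the constructor made public above, code outside the package can now build an array view over a slice of a vector. A sketch assuming an int element vector written in place; in real use the parent vector supplies the offset and length:

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnarArray;

class ArrayViewSketch {
  static ColumnarArray slice() {
    OnHeapColumnVector data = new OnHeapColumnVector(8, DataTypes.IntegerType);
    for (int i = 0; i < 8; i++) {
      data.putInt(i, i * i);
    }
    // View over elements [2, 5): offset 2, length 3 -> values 4, 9, 16.
    return new ColumnarArray(data, 2, 3);
  }
}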
@@ -14,26 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import java.util.*;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
import org.apache.spark.sql.types.StructType;

/**
* This class is the in memory representation of rows as they are streamed through operators. It
* is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
* each operator allocates one of these objects, the storage footprint on the task is negligible.
*
* The layout is a columnar with values encoded in their native format. Each RowBatch contains
* a horizontal partitioning of the data, split into columns.
*
* The ColumnarBatch supports either on heap or offheap modes with (mostly) the identical API.
*
* TODO:
* - There are many TODOs for the existing APIs. They should throw a not implemented exception.
* - Compaction: The batch and columns should be able to compact based on a selection vector.
* This class is a wrapper of multiple ColumnVectors and represents a logical table-like data
Member: How about?
"This class wraps multiple ColumnVectors as a row-wise table"

* structure. It provides a row-view of this batch so that Spark can access the data row by row.
Member: row-view -> row view

* Instance of it is meant to be reused during the entire data loading process.
*/
public final class ColumnarBatch {
public static final int DEFAULT_BATCH_SIZE = 4 * 1024;
Expand All @@ -57,7 +49,7 @@ public void close() {
}

/**
* Returns an iterator over the rows in this batch. This skips rows that are filtered out.
* Returns an iterator over the rows in this batch.
*/
public Iterator<InternalRow> rowIterator() {
final int maxRows = numRows;
Expand Down Expand Up @@ -87,19 +79,7 @@ public void remove() {
}

/**
* Resets the batch for writing.
*/
public void reset() {
for (int i = 0; i < numCols(); ++i) {
if (columns[i] instanceof WritableColumnVector) {
((WritableColumnVector) columns[i]).reset();
}
}
this.numRows = 0;
}

/**
* Sets the number of rows that are valid.
* Sets the number of rows that are valid in this batch.
Member: How about?
"Sets the number of rows"

*/
public void setNumRows(int numRows) {
assert(numRows <= this.capacity);
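A short sketch of the row view the new class comment describes, assuming a populated batch. The iterator may reuse the row object it hands out, so copy() anything that must outlive the loop; the sumColumn() helper is illustrative:

import java.util.Iterator;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class RowViewSketch {
  // Sums a long column by ordinal while walking the batch row by row.
  static long sumColumn(ColumnarBatch batch, int ordinal) {
    long sum = 0;
    Iterator<InternalRow> it = batch.rowIterator();
    while (it.hasNext()) {
      sum += it.next().getLong(ordinal);
    }
    return sum;
  }
}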
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -24,8 +24,7 @@
import org.apache.spark.unsafe.types.UTF8String;

/**
* Row abstraction in {@link ColumnVector}. The instance of this class is intended
* to be reused, callers should copy the data out if it needs to be stored.
* Row abstraction in {@link ColumnVector}.
*/
public final class ColumnarRow extends InternalRow {
// The data for this row.
@@ -34,7 +33,7 @@ public final class ColumnarRow extends InternalRow {
private final int rowId;
private final int numFields;

ColumnarRow(ColumnVector data, int rowId) {
public ColumnarRow(ColumnVector data, int rowId) {
assert (data.dataType() instanceof StructType);
this.data = data;
this.rowId = rowId;
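And a matching sketch for the now-public ColumnarRow constructor. Its assertion requires a struct-typed vector; the cast to WritableColumnVector for writing is illustrative (the children of an OnHeapColumnVector are writable):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarRow;

class StructRowSketch {
  static int demo() {
    StructType schema = new StructType().add("x", DataTypes.IntegerType);
    OnHeapColumnVector structCol = new OnHeapColumnVector(4, schema);
    ((WritableColumnVector) structCol.getChildColumn(0)).putInt(0, 42);

    ColumnarRow row = new ColumnarRow(structCol, 0);  // row 0 of the struct column
    int x = row.getInt(0);                            // reads field x -> 42
    structCol.close();
    return x;
  }
}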