[SPARK-20960][SQL] make ColumnVector public
## What changes were proposed in this pull request?

Move `ColumnVector` and related classes to `org.apache.spark.sql.vectorized`, and improve the documentation.
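
For downstream code, the visible effect is only a change of package in the imports. A minimal sketch of a hypothetical consumer (not part of this patch), reflecting the import changes in the diff below:

```java
// Before this patch the classes lived in an internal package:
// import org.apache.spark.sql.execution.vectorized.ColumnVector;
// import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

// After this patch they are public API under org.apache.spark.sql.vectorized:
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
```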

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20116 from cloud-fan/column-vector.

(cherry picked from commit b297029)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and gatorsmile committed Jan 3, 2018
1 parent 79f7263 commit a51212b
Showing 20 changed files with 63 additions and 125 deletions.
@@ -31,10 +31,10 @@
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@@ -248,7 +248,10 @@ public void enableReturningBatches() {
* Advances to the next batch of rows. Returns false if there are no more.
*/
public boolean nextBatch() throws IOException {
columnarBatch.reset();
for (WritableColumnVector vector : columnVectors) {
vector.reset();
}
columnarBatch.setNumRows(0);
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();

@@ -28,6 +28,8 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -23,6 +23,10 @@
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -23,6 +23,7 @@

import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;

@@ -585,11 +586,11 @@ public final int appendArray(int length) {
public final int appendStruct(boolean isNull) {
if (isNull) {
appendNull();
for (ColumnVector c: childColumns) {
for (WritableColumnVector c: childColumns) {
if (c.type instanceof StructType) {
((WritableColumnVector) c).appendStruct(true);
c.appendStruct(true);
} else {
((WritableColumnVector) c).appendNull();
c.appendNull();
}
}
} else {
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.*;
@@ -34,11 +34,7 @@ public final class ArrowColumnVector extends ColumnVector {
private ArrowColumnVector[] childColumns;

private void ensureAccessible(int index) {
int valueCount = accessor.getValueCount();
if (index < 0 || index >= valueCount) {
throw new IndexOutOfBoundsException(
String.format("index: %d, valueCount: %d", index, valueCount));
}
ensureAccessible(index, 1);
}

private void ensureAccessible(int index, int count) {
@@ -64,20 +60,12 @@ public void close() {
accessor.close();
}

//
// APIs dealing with nulls
//

@Override
public boolean isNullAt(int rowId) {
ensureAccessible(rowId);
return accessor.isNullAt(rowId);
}

//
// APIs dealing with Booleans
//

@Override
public boolean getBoolean(int rowId) {
ensureAccessible(rowId);
@@ -94,10 +82,6 @@ public boolean[] getBooleans(int rowId, int count) {
return array;
}

//
// APIs dealing with Bytes
//

@Override
public byte getByte(int rowId) {
ensureAccessible(rowId);
@@ -114,10 +98,6 @@ public byte[] getBytes(int rowId, int count) {
return array;
}

//
// APIs dealing with Shorts
//

@Override
public short getShort(int rowId) {
ensureAccessible(rowId);
@@ -134,10 +114,6 @@ public short[] getShorts(int rowId, int count) {
return array;
}

//
// APIs dealing with Ints
//

@Override
public int getInt(int rowId) {
ensureAccessible(rowId);
@@ -154,10 +130,6 @@ public int[] getInts(int rowId, int count) {
return array;
}

//
// APIs dealing with Longs
//

@Override
public long getLong(int rowId) {
ensureAccessible(rowId);
@@ -174,10 +146,6 @@ public long[] getLongs(int rowId, int count) {
return array;
}

//
// APIs dealing with floats
//

@Override
public float getFloat(int rowId) {
ensureAccessible(rowId);
@@ -194,10 +162,6 @@ public float[] getFloats(int rowId, int count) {
return array;
}

//
// APIs dealing with doubles
//

@Override
public double getDouble(int rowId) {
ensureAccessible(rowId);
@@ -214,10 +178,6 @@ public double[] getDoubles(int rowId, int count) {
return array;
}

//
// APIs dealing with Arrays
//

@Override
public int getArrayLength(int rowId) {
ensureAccessible(rowId);
@@ -230,45 +190,27 @@ public int getArrayOffset(int rowId) {
return accessor.getArrayOffset(rowId);
}

//
// APIs dealing with Decimals
//

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
ensureAccessible(rowId);
return accessor.getDecimal(rowId, precision, scale);
}

//
// APIs dealing with UTF8Strings
//

@Override
public UTF8String getUTF8String(int rowId) {
ensureAccessible(rowId);
return accessor.getUTF8String(rowId);
}

//
// APIs dealing with Binaries
//

@Override
public byte[] getBinary(int rowId) {
ensureAccessible(rowId);
return accessor.getBinary(rowId);
}

/**
* Returns the data for the underlying array.
*/
@Override
public ArrowColumnVector arrayData() { return childColumns[0]; }

/**
* Returns the ordinal's child data column.
*/
@Override
public ArrowColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }

@@ -14,32 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

/**
* This class represents in-memory values of a column and provides the main APIs to access the data.
* It supports all the types and contains get APIs as well as their batched versions. The batched
* versions are considered to be faster and preferable whenever possible.
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
* to access the data, as well as their batched versions. The batched versions are considered to be
* faster and preferable whenever possible.
*
* To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
* columns have child columns. All of the data are stored in the child columns and the parent column
* only contains nullability. In the case of Arrays, the lengths and offsets are saved in the child
* column and are encoded identically to INTs.
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in this ColumnVector.
*
* Maps are just a special case of a two field struct.
* ColumnVector supports all the data types including nested types. To handle nested types,
* ColumnVector can have children and is a tree structure. For struct type, it stores the actual
* data of each field in the corresponding child ColumnVector, and only stores null information in
* the parent ColumnVector. For array type, it stores the actual array elements in the child
* ColumnVector, and stores null information, array offsets and lengths in the parent ColumnVector.
*
* Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in the current batch.
* ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
* memory again and again.
*
* ColumnVector is meant to maximize CPU efficiency but not to minimize storage footprint.
* Implementations should prefer computing efficiency over storage efficiency when designing the
* format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
* footprint is negligible.
*/
public abstract class ColumnVector implements AutoCloseable {

/**
* Returns the data type of this column.
* Returns the data type of this column vector.
*/
public final DataType dataType() { return type; }

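
The new ColumnVector javadoc above describes the read contract: batch-local 0-based rowId, null information stored in the parent vector, and array offsets/lengths in the parent with the elements in the child vector. A minimal sketch of a caller under that contract follows; the class, method, and variable names are hypothetical, and only accessors that appear elsewhere in this diff are used:

```java
import org.apache.spark.sql.vectorized.ColumnVector;

// Hypothetical helper: sum a nullable long column plus the elements of an array<long> column.
final class ColumnVectorReadSketch {
  static long sumColumns(ColumnVector longCol, ColumnVector arrayCol, int numRows) {
    long sum = 0;
    for (int rowId = 0; rowId < numRows; rowId++) {
      // Null information lives in the vector itself.
      if (!longCol.isNullAt(rowId)) {
        sum += longCol.getLong(rowId);
      }
      // For array type, offsets and lengths are stored in the parent vector;
      // the actual elements live in the child vector returned by arrayData().
      if (!arrayCol.isNullAt(rowId)) {
        int offset = arrayCol.getArrayOffset(rowId);
        int length = arrayCol.getArrayLength(rowId);
        ColumnVector elements = arrayCol.arrayData();
        for (int i = 0; i < length; i++) {
          sum += elements.getLong(offset + i);
        }
      }
    }
    return sum;
  }
}
```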
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
@@ -23,8 +23,7 @@
import org.apache.spark.unsafe.types.UTF8String;

/**
* Array abstraction in {@link ColumnVector}. The instance of this class is intended
* to be reused, callers should copy the data out if it needs to be stored.
* Array abstraction in {@link ColumnVector}.
*/
public final class ColumnarArray extends ArrayData {
// The data for this array. This array contains elements from
@@ -33,7 +32,7 @@ public final class ColumnarArray extends ArrayData {
private final int offset;
private final int length;

ColumnarArray(ColumnVector data, int offset, int length) {
public ColumnarArray(ColumnVector data, int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
@@ -14,26 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
package org.apache.spark.sql.vectorized;

import java.util.*;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
import org.apache.spark.sql.types.StructType;

/**
* This class is the in memory representation of rows as they are streamed through operators. It
* is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
* each operator allocates one of these objects, the storage footprint on the task is negligible.
*
* The layout is a columnar with values encoded in their native format. Each RowBatch contains
* a horizontal partitioning of the data, split into columns.
*
* The ColumnarBatch supports either on heap or offheap modes with (mostly) the identical API.
*
* TODO:
* - There are many TODOs for the existing APIs. They should throw a not implemented exception.
* - Compaction: The batch and columns should be able to compact based on a selection vector.
* This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
* batch so that Spark can access the data row by row. Instances of it are meant to be reused during
* the entire data loading process.
*/
public final class ColumnarBatch {
public static final int DEFAULT_BATCH_SIZE = 4 * 1024;
@@ -57,7 +49,7 @@ public void close() {
}

/**
* Returns an iterator over the rows in this batch. This skips rows that are filtered out.
* Returns an iterator over the rows in this batch.
*/
public Iterator<InternalRow> rowIterator() {
final int maxRows = numRows;
@@ -87,19 +79,7 @@ public void remove() {
}

/**
* Resets the batch for writing.
*/
public void reset() {
for (int i = 0; i < numCols(); ++i) {
if (columns[i] instanceof WritableColumnVector) {
((WritableColumnVector) columns[i]).reset();
}
}
this.numRows = 0;
}

/**
* Sets the number of rows that are valid.
* Sets the number of rows in this batch.
*/
public void setNumRows(int numRows) {
assert(numRows <= this.capacity);
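
For completeness, a sketch of how a ColumnarBatch built this way might be consumed: the producer fills the column vectors, declares the row count via setNumRows, and Spark then reads the batch row by row through rowIterator(). The class and method names below are illustrative only; rowIterator() and setNumRows(int) are the APIs shown in this diff.

```java
import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical consumer: read column 0 (assumed to be a long column) of every row in the batch.
final class ColumnarBatchReadSketch {
  static long sumFirstColumn(ColumnarBatch batch, int numValidRows) {
    batch.setNumRows(numValidRows);           // declare how many rows this batch holds
    long sum = 0;
    Iterator<InternalRow> it = batch.rowIterator();
    while (it.hasNext()) {
      InternalRow row = it.next();
      if (!row.isNullAt(0)) {
        sum += row.getLong(0);                // row view over the underlying column vectors
      }
    }
    return sum;
  }
}
```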