Skip to content

Commit

Permalink
[SPARK-1371][WIP] Compression support for Spark SQL in-memory columna…
Browse files Browse the repository at this point in the history
…r storage

JIRA issue: [SPARK-1373](https://issues.apache.org/jira/browse/SPARK-1373)

(Although tagged as WIP, this PR is structurally complete. The only things left unimplemented are 3 more compression algorithms: `BooleanBitSet`, `IntDelta` and `LongDelta`, which are trivial to add later in this or another separate PR.)

This PR contains compression support for Spark SQL in-memory columnar storage. Main interfaces include:

*   `CompressionScheme`

    Each `CompressionScheme` represents a concrete compression algorithm, which basically consists of an `Encoder` for compression and a `Decoder` for decompression. Algorithms implemented include:

    * `RunLengthEncoding`
    * `DictionaryEncoding`

    Algorithms to be implemented include:

    * `BooleanBitSet`
    * `IntDelta`
    * `LongDelta`

*   `CompressibleColumnBuilder`

    A stackable `ColumnBuilder` trait used to build byte buffers for compressible columns.  A best `CompressionScheme` that exhibits lowest compression ratio is chosen for each column according to statistical information gathered while elements are appended into the `ColumnBuilder`. However, if no `CompressionScheme` can achieve a compression ratio better than 80%, no compression will be done for this column to save CPU time.

    Memory layout of the final byte buffer is showed below:

    ```
     .--------------------------- Column type ID (4 bytes)
     |   .----------------------- Null count N (4 bytes)
     |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
     |   |   |     .------------- Compression scheme ID (4 bytes)
     |   |   |     |   .--------- Compressed non-null elements
     V   V   V     V   V
    +---+---+-----+---+---------+
    |   |   | ... |   | ... ... |
    +---+---+-----+---+---------+
     \-----------/ \-----------/
        header         body
    ```

*   `CompressibleColumnAccessor`

    A stackable `ColumnAccessor` trait used to iterate (possibly) compressed data column.

*   `ColumnStats`

    Used to collect statistical information while loading data into in-memory columnar table. Optimizations like partition pruning rely on this information.

    Strictly speaking, `ColumnStats` related code is not part of the compression support. It's contained in this PR to ensure and validate the row-based API design (which is used to avoid boxing/unboxing cost whenever possible).

A major refactoring change since PR #205 is:

* Refactored all getter/setter methods for primitive types in various places into `ColumnType` classes to remove duplicated code.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #285 from liancheng/memColumnarCompression and squashes the following commits:

ed71bbd [Cheng Lian] Addressed all PR comments by @marmbrus
d3a4fa9 [Cheng Lian] Removed Ordering[T] in ColumnStats for better performance
5034453 [Cheng Lian] Bug fix, more tests, and more refactoring
c298b76 [Cheng Lian] Test suites refactored
2780d6a [Cheng Lian] [WIP] in-memory columnar compression support
211331c [Cheng Lian] WIP: in-memory columnar compression support
85cc59b [Cheng Lian] Refactored ColumnAccessors & ColumnBuilders to remove duplicate code
  • Loading branch information
liancheng authored and pwendell committed Apr 2, 2014
1 parent 7823633 commit 1faa579
Show file tree
Hide file tree
Showing 21 changed files with 1,644 additions and 408 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor

/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
Expand All @@ -41,121 +41,66 @@ private[sql] trait ColumnAccessor {
protected def underlyingBuffer: ByteBuffer
}

private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
protected val buffer: ByteBuffer,
protected val columnType: ColumnType[T, JvmType])
extends ColumnAccessor {

protected def initialize() {}

def columnType: ColumnType[T, JvmType]

def hasNext = buffer.hasRemaining

def extractTo(row: MutableRow, ordinal: Int) {
doExtractTo(row, ordinal)
columnType.setField(row, ordinal, extractSingle(buffer))
}

protected def doExtractTo(row: MutableRow, ordinal: Int)
def extractSingle(buffer: ByteBuffer): JvmType = columnType.extract(buffer)

protected def underlyingBuffer = buffer
}

private[sql] abstract class NativeColumnAccessor[T <: NativeType](
buffer: ByteBuffer,
val columnType: NativeColumnType[T])
extends BasicColumnAccessor[T, T#JvmType](buffer)
override protected val buffer: ByteBuffer,
override protected val columnType: NativeColumnType[T])
extends BasicColumnAccessor(buffer, columnType)
with NullableColumnAccessor
with CompressibleColumnAccessor[T]

private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setBoolean(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, BOOLEAN)

private[sql] class IntColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, INT) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setInt(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, INT)

private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, SHORT) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setShort(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, SHORT)

private[sql] class LongColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, LONG) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setLong(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, LONG)

private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BYTE) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setByte(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, BYTE)

private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, DOUBLE) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setDouble(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, DOUBLE)

private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, FLOAT) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setFloat(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, FLOAT)

private[sql] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setString(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, STRING)

private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
with NullableColumnAccessor {

def columnType = BINARY

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row(ordinal) = columnType.extract(buffer)
}
}
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
with NullableColumnAccessor

private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
with NullableColumnAccessor {

def columnType = GENERIC

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
val serialized = columnType.extract(buffer)
row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
}
}
extends BasicColumnAccessor[DataType, Array[Byte]](buffer, GENERIC)
with NullableColumnAccessor

private[sql] object ColumnAccessor {
def apply(b: ByteBuffer): ColumnAccessor = {
// The first 4 bytes in the buffer indicates the column type.
val buffer = b.duplicate().order(ByteOrder.nativeOrder())
def apply(buffer: ByteBuffer): ColumnAccessor = {
// The first 4 bytes in the buffer indicate the column type.
val columnTypeId = buffer.getInt()

columnTypeId match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,52 @@ import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.ColumnBuilder._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}

private[sql] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
def initialize(initialSize: Int, columnName: String = "")

/**
* Appends `row(ordinal)` to the column builder.
*/
def appendFrom(row: Row, ordinal: Int)

/**
* Column statistics information
*/
def columnStats: ColumnStats[_, _]

/**
* Returns the final columnar byte buffer.
*/
def build(): ByteBuffer
}

private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
val columnStats: ColumnStats[T, JvmType],
val columnType: ColumnType[T, JvmType])
extends ColumnBuilder {

private var columnName: String = _
protected var buffer: ByteBuffer = _
protected var columnName: String = _

def columnType: ColumnType[T, JvmType]
protected var buffer: ByteBuffer = _

override def initialize(initialSize: Int, columnName: String = "") = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)

// Reserves 4 bytes for column type ID
buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
}

// Have to give a concrete implementation to make mixin possible
override def appendFrom(row: Row, ordinal: Int) {
doAppendFrom(row, ordinal)
}

// Concrete `ColumnBuilder`s can override this method to append values
protected def doAppendFrom(row: Row, ordinal: Int)

// Helper method to append primitive values (to avoid boxing cost)
protected def appendValue(v: JvmType) {
buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
columnType.append(v, buffer)
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
columnType.append(field, buffer)
}

override def build() = {
Expand All @@ -69,83 +76,39 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
}
}

private[sql] abstract class NativeColumnBuilder[T <: NativeType](
val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType]
private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
columnType: ColumnType[T, JvmType])
extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
with NullableColumnBuilder

private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getBoolean(ordinal))
}
}

private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getInt(ordinal))
}
}
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
override val columnStats: NativeColumnStats[T],
override val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
with NullableColumnBuilder
with AllCompressionSchemes
with CompressibleColumnBuilder[T]

private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getShort(ordinal))
}
}
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getLong(ordinal))
}
}
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getByte(ordinal))
}
}
private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)

private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getDouble(ordinal))
}
}
private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)

private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getFloat(ordinal))
}
}
private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)

private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getString(ordinal))
}
}
private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)

private[sql] class BinaryColumnBuilder
extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
with NullableColumnBuilder {
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)

def columnType = BINARY
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)

override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row(ordinal).asInstanceOf[Array[Byte]])
}
}
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)

// TODO (lian) Add support for array, struct and map
private[sql] class GenericColumnBuilder
extends BasicColumnBuilder[DataType, Array[Byte]]
with NullableColumnBuilder {

def columnType = GENERIC

override def doAppendFrom(row: Row, ordinal: Int) {
val serialized = SparkSqlSerializer.serialize(row(ordinal))
buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
columnType.append(serialized, buffer)
}
}
private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)

private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
Expand Down
Loading

0 comments on commit 1faa579

Please sign in to comment.