WIP: in-memory columnar compression support
liancheng committed Apr 1, 2014
1 parent 85cc59b commit 211331c
Showing 11 changed files with 487 additions and 25 deletions.
@@ -41,28 +41,29 @@ private[sql] trait ColumnAccessor {
}

private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
buffer: ByteBuffer, columnType: ColumnType[T, JvmType])
protected val buffer: ByteBuffer,
protected val columnType: ColumnType[T, JvmType])
extends ColumnAccessor {

protected def initialize() {}

def hasNext = buffer.hasRemaining

def extractTo(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, columnType.extract(buffer))
columnType.setField(row, ordinal, extractSingle(buffer))
}

def extractSingle(buffer: ByteBuffer) = columnType.extract(buffer)

protected def underlyingBuffer = buffer
}

private[sql] abstract class NativeColumnAccessor[T <: NativeType](
buffer: ByteBuffer,
val columnType: NativeColumnType[T])
extends BasicColumnAccessor[T, T#JvmType](buffer, columnType)
with NullableColumnAccessor {

type JvmType = T#JvmType
}
columnType: NativeColumnType[T])
extends BasicColumnAccessor(buffer, columnType)
with NullableColumnAccessor
with CompressedColumnAccessor[T]

private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN)
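Worth noting: `NativeColumnAccessor` now composes behaviour through Scala's stackable-trait pattern. `CompressedColumnAccessor` declares its members `abstract override` and calls `super`, so trait linearization stacks the decompression layer on top of the basic accessor at mix-in time. A minimal standalone sketch of the mechanism (toy names, not the Spark classes):

trait Source { def read(): Int }

// Base implementation, analogous to BasicColumnAccessor.
class CountingSource extends Source {
  private var n = 0
  def read(): Int = { n += 1; n }
}

// Stackable layer, analogous to CompressedColumnAccessor: `abstract override`
// lets it wrap whatever concrete implementation it is eventually mixed into.
trait Doubling extends Source {
  abstract override def read(): Int = super.read() * 2
}

object StackingDemo extends App {
  val s = new CountingSource with Doubling
  println(s.read()) // 2
  println(s.read()) // 4
}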
@@ -29,25 +29,36 @@ private[sql] trait ColumnBuilder {
*/
def initialize(initialSize: Int, columnName: String = "")

/**
* Gathers statistics information from `row(ordinal)`.
*/
def gatherStats(row: Row, ordinal: Int) {}

/**
* Appends `row(ordinal)` to the column builder.
*/
def appendFrom(row: Row, ordinal: Int)

/**
* Returns the final columnar byte buffer.
*/
def build(): ByteBuffer
}

private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
val columnType: ColumnType[T, JvmType])
extends ColumnBuilder {

private var columnName: String = _
protected var columnName: String = _

protected var buffer: ByteBuffer = _

override def initialize(initialSize: Int, columnName: String = "") = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)

// Reserves 4 bytes for column type ID
buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
}

@@ -66,8 +77,9 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
protected val columnStats: ColumnStats[T],
columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType](columnType)
with NullableColumnBuilder {
extends BasicColumnBuilder(columnType)
with NullableColumnBuilder
with CompressedColumnBuilder[T] {

override def gatherStats(row: Row, ordinal: Int) {
columnStats.gatherStats(row, ordinal)
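Taken together, the builder contract is: `initialize` once, then `gatherStats` and `appendFrom` per row, then `build()` for the final buffer. A hedged usage sketch — `IntColumnBuilder` and the per-row call order are assumptions based on the surrounding package, not shown in this diff:

import java.nio.ByteBuffer
import org.apache.spark.sql.Row

// Sketch: building one INT column from a sequence of rows, column ordinal 0.
def buildIntColumn(rows: Seq[Row]): ByteBuffer = {
  val builder = new IntColumnBuilder              // assumed concrete builder
  builder.initialize(rows.size, columnName = "id")
  rows.foreach { row =>
    builder.gatherStats(row, 0)                   // bounds + compressibility stats
    builder.appendFrom(row, 0)                    // raw value into the buffer
  }
  builder.build()                                 // compressed iff a scheme beats the threshold
}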
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._

private[sql] sealed abstract class ColumnStats[T <: NativeType] extends Serializable {
type JvmType = T#JvmType

protected var (_lower, _upper) = initialBounds

protected val ordering: Ordering[JvmType]

protected def columnType: NativeColumnType[T]

/**
* Closed lower bound of this column.
*/
def lowerBound = _lower

/**
* Closed upper bound of this column.
*/
def upperBound = _upper

/**
* Initial values for the closed lower/upper bounds, in the format of `(lower, upper)`.
*/
protected def initialBounds: (JvmType, JvmType)

/**
* Gathers statistics information from `row(ordinal)`.
*/
@inline def gatherStats(row: Row, ordinal: Int) {
val field = columnType.getField(row, ordinal)
if (ordering.gt(field, upperBound)) _upper = field
if (ordering.lt(field, lowerBound)) _lower = field
}

/**
* Returns `true` if `lower <= row(ordinal) <= upper`.
*/
@inline def contains(row: Row, ordinal: Int) = {
val field = columnType.getField(row, ordinal)
ordering.lteq(lowerBound, field) && ordering.lteq(field, upperBound)
}

/**
* Returns `true` if `row(ordinal) < upper` holds.
*/
@inline def isAbove(row: Row, ordinal: Int) = {
val field = columnType.getField(row, ordinal)
ordering.lt(field, upperBound)
}

/**
* Returns `true` if `lower < row(ordinal)` holds.
*/
@inline def isBelow(row: Row, ordinal: Int) = {
val field = columnType.getField(row, ordinal)
ordering.lt(lowerBound, field)
}

/**
* Returns `true` if `row(ordinal) <= upper` holds.
*/
@inline def isAtOrAbove(row: Row, ordinal: Int) = {
contains(row, ordinal) || isAbove(row, ordinal)
}

/**
* Returns `true` if `lower <= row(ordinal)` holds.
*/
@inline def isAtOrBelow(row: Row, ordinal: Int) = {
contains(row, ordinal) || isBelow(row, ordinal)
}
}

private[sql] abstract class BasicColumnStats[T <: NativeType](
protected val columnType: NativeColumnType[T])
extends ColumnStats[T]

private[sql] class BooleanColumnStats extends BasicColumnStats(BOOLEAN) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (true, false)
}

private[sql] class ByteColumnStats extends BasicColumnStats(BYTE) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Byte.MaxValue, Byte.MinValue)
}

private[sql] class ShortColumnStats extends BasicColumnStats(SHORT) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Short.MaxValue, Short.MinValue)
}

private[sql] class LongColumnStats extends BasicColumnStats(LONG) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Long.MaxValue, Long.MinValue)
}

private[sql] class DoubleColumnStats extends BasicColumnStats(DOUBLE) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Double.MaxValue, Double.MinValue)
}

private[sql] class FloatColumnStats extends BasicColumnStats(FLOAT) {
override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Float.MaxValue, Float.MinValue)
}

private[sql] class IntColumnStats extends BasicColumnStats(INT) {
private object OrderedState extends Enumeration {
val Uninitialized, Initialized, Ascending, Descending, Unordered = Value
}

import OrderedState._

private var orderedState = Uninitialized
private var lastValue: Int = _
private var _maxDelta: Int = _

def isAscending = orderedState != Descending && orderedState != Unordered
def isDescending = orderedState != Ascending && orderedState != Unordered
def isOrdered = isAscending || isDescending
def maxDelta = _maxDelta

override protected val ordering = implicitly[Ordering[JvmType]]
override protected def initialBounds = (Int.MaxValue, Int.MinValue)

override def gatherStats(row: Row, ordinal: Int) = {
val field = columnType.getField(row, ordinal)

if (field > upperBound) _upper = field
if (field < lowerBound) _lower = field

orderedState = orderedState match {
case Uninitialized =>
lastValue = field
Initialized

case Initialized =>
// If all the integers in the column are the same, ordered state is set to Ascending.
// TODO (lian) Confirm whether this is the standard behaviour.
val nextState = if (field >= lastValue) Ascending else Descending
_maxDelta = math.abs(field - lastValue)
lastValue = field
nextState

case Ascending if field < lastValue =>
Unordered

case Descending if field > lastValue =>
Unordered

case state @ (Ascending | Descending) =>
  // Use the absolute step so that descending runs also update the delta.
  _maxDelta = _maxDelta.max(math.abs(field - lastValue))
  lastValue = field
  state

// Without this case, the match throws a MatchError on the first value seen
// after the column becomes unordered.
case Unordered => Unordered
}
}
}

private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
  override protected val ordering = implicitly[Ordering[JvmType]]
  override protected def initialBounds = (null, null)

  // The inherited gatherStats compares against the current bounds, which start
  // as null here and would NPE inside Ordering[String], so guard explicitly.
  override def gatherStats(row: Row, ordinal: Int) {
    val field = columnType.getField(row, ordinal)
    if ((upperBound eq null) || ordering.gt(field, upperBound)) _upper = field
    if ((lowerBound eq null) || ordering.lt(field, lowerBound)) _lower = field
  }

  override def contains(row: Row, ordinal: Int) = {
    !(upperBound eq null) && super.contains(row, ordinal)
  }

  override def isAbove(row: Row, ordinal: Int) = {
    !(upperBound eq null) && super.isAbove(row, ordinal)
  }

  override def isBelow(row: Row, ordinal: Int) = {
    !(lowerBound eq null) && super.isBelow(row, ordinal)
  }
}
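The point of keeping closed bounds is batch skipping: a scan can discard a whole column batch when its bounds rule out a predicate. A hedged sketch (`GenericRow` from catalyst expressions is an assumption; it is not part of this diff):

import org.apache.spark.sql.catalyst.expressions.GenericRow

val stats = new IntColumnStats
Seq(12, 15, 42).foreach { v =>
  stats.gatherStats(new GenericRow(Array[Any](v)), 0)
}

// lowerBound is now 12, upperBound 42. For a predicate like `col <= 10`,
// the batch minimum already exceeds 10, so no row in the batch can match:
val probe = new GenericRow(Array[Any](10))
val canSkip = !stats.isAtOrBelow(probe, 0) // true -> skip this batch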
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar

import java.nio.ByteBuffer

import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.CompressionAlgorithm.NoopDecoder
import org.apache.spark.sql.columnar.CompressionType._

private[sql] trait CompressedColumnAccessor[T <: NativeType] extends ColumnAccessor {
this: BasicColumnAccessor[T, T#JvmType] =>

private var decoder: Iterator[T#JvmType] = _

abstract override protected def initialize() = {
super.initialize()

decoder = underlyingBuffer.getInt() match {
case id if id == Noop.id => new NoopDecoder[T](buffer, columnType)
case _ => throw new UnsupportedOperationException()
}
}

abstract override def extractSingle(buffer: ByteBuffer) = decoder.next()
}
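`NoopDecoder` itself is nested under `CompressionAlgorithm`, which is not shown in this excerpt. A plausible minimal shape, assuming the pass-through scheme stores the raw column bytes unchanged and this sits in the same org.apache.spark.sql.columnar package:

import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.types.NativeType

// Sketch only: the "compressed" payload of the pass-through scheme is just
// the raw column bytes, so decoding is plain extraction. Names follow the
// call site above, not a definition visible in this diff.
class NoopDecoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
  extends Iterator[T#JvmType] {

  override def hasNext = buffer.hasRemaining
  override def next() = columnType.extract(buffer)
}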
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar

import org.apache.spark.sql.{Logging, Row}
import org.apache.spark.sql.catalyst.types.NativeType

private[sql] trait CompressedColumnBuilder[T <: NativeType] extends ColumnBuilder with Logging {
this: BasicColumnBuilder[T, T#JvmType] =>

val compressionSchemes = Seq(new CompressionAlgorithm.Noop)
.filter(_.supports(columnType))

def isWorthCompressing(scheme: CompressionAlgorithm) = {
scheme.compressionRatio < 0.8
}

abstract override def gatherStats(row: Row, ordinal: Int) {
  val field = columnType.getField(row, ordinal)
  compressionSchemes.foreach(_.gatherCompressibilityStats(field, columnType))
  super.gatherStats(row, ordinal)
}

abstract override def build() = {
val rawBuffer = super.build()

if (compressionSchemes.isEmpty) {
logger.info(s"Compression scheme chosen for [$columnName] is ${CompressionType.Noop}")
new CompressionAlgorithm.Noop().compress(rawBuffer, columnType)
} else {
val candidateScheme = compressionSchemes.minBy(_.compressionRatio)

logger.info(
  s"Compression scheme chosen for [$columnName] is ${candidateScheme.compressionType}, " +
  s"ratio ${candidateScheme.compressionRatio}")

if (isWorthCompressing(candidateScheme)) {
candidateScheme.compress(rawBuffer, columnType)
} else {
new CompressionAlgorithm.Noop().compress(rawBuffer, columnType)
}
}
}
}
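For reference, the `CompressionAlgorithm` contract implied by the call sites above (`supports`, `gatherCompressibilityStats`, `compressionRatio`, `compressionType`, `compress`) would look roughly like this; the actual trait is in one of the 11 changed files not shown in this excerpt:

import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.types.NativeType

// Hedged reconstruction from the call sites; assumes it sits in the
// org.apache.spark.sql.columnar package and CompressionType is an Enumeration
// (consistent with the `Noop.id` check in CompressedColumnAccessor).
trait CompressionAlgorithm {
  def compressionType: CompressionType.Value

  // Whether this scheme can encode the given column type at all.
  def supports(columnType: ColumnType[_, _]): Boolean

  // Called once per appended value so the scheme can estimate compressed size.
  def gatherCompressibilityStats[T <: NativeType](
      value: T#JvmType,
      columnType: ColumnType[T, T#JvmType]): Unit

  // Estimated compressed size over raw size; lower is better (< 0.8 counts as
  // "worth compressing" per CompressedColumnBuilder above).
  def compressionRatio: Double

  // Encodes the raw column buffer into a new buffer, prefixed with the scheme
  // id that CompressedColumnAccessor reads back on the decoding side.
  def compress[T <: NativeType](
      from: ByteBuffer,
      columnType: ColumnType[T, T#JvmType]): ByteBuffer
}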
