Skip to content

Commit

Permalink
Made compression decoder row-based
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Sep 10, 2014
1 parent edac3cd commit 456c366
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
abstract override def hasNext = super.hasNext || decoder.hasNext

override def extractSingle(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, decoder.next())
decoder.next(row, ordinal)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.columnar.compression
import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}

Expand All @@ -37,7 +38,11 @@ private[sql] trait Encoder[T <: NativeType] {
def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer
}

private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
private[sql] trait Decoder[T <: NativeType] {
def next(row: MutableRow, ordinal: Int): Unit

def hasNext: Boolean
}

private[sql] trait CompressionScheme {
def typeId: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.runtimeMirror

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -57,7 +57,9 @@ private[sql] case object PassThrough extends CompressionScheme {
class Decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T])
extends compression.Decoder[T] {

override def next() = columnType.extract(buffer)
override def next(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, columnType.extract(buffer))
}

override def hasNext = buffer.hasRemaining
}
Expand Down Expand Up @@ -151,7 +153,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
private var valueCount = 0
private var currentValue: T#JvmType = _

override def next() = {
override def next(row: MutableRow, ordinal: Int) {
if (valueCount == run) {
currentValue = columnType.extract(buffer)
run = buffer.getInt()
Expand All @@ -160,7 +162,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
valueCount += 1
}

currentValue
columnType.setField(row, ordinal, currentValue)
}

override def hasNext = valueCount < run || buffer.hasRemaining
Expand Down Expand Up @@ -274,7 +276,9 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
}
}

override def next() = dictionary(buffer.getShort())
override def next(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, dictionary(buffer.getShort()))
}

override def hasNext = buffer.hasRemaining
}
Expand Down Expand Up @@ -354,15 +358,15 @@ private[sql] case object BooleanBitSet extends CompressionScheme {

private var visited: Int = 0

override def next(): Boolean = {
override def next(row: MutableRow, ordinal: Int) {
val bit = visited % BITS_PER_LONG

visited += 1
if (bit == 0) {
currentWord = buffer.getLong()
}

((currentWord >> bit) & 1) != 0
row.setBoolean(ordinal, ((currentWord >> bit) & 1) != 0)
}

override def hasNext: Boolean = visited < count
Expand Down Expand Up @@ -440,10 +444,10 @@ private[sql] case object IntDelta extends CompressionScheme {

override def hasNext: Boolean = buffer.hasRemaining

override def next() = {
override def next(row: MutableRow, ordinal: Int) {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else buffer.getInt()
prev
row.setInt(ordinal, prev)
}
}
}
Expand Down Expand Up @@ -519,10 +523,10 @@ private[sql] case object LongDelta extends CompressionScheme {

override def hasNext: Boolean = buffer.hasRemaining

override def next() = {
override def next(row: MutableRow, ordinal: Int) {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else buffer.getLong()
prev
row.setLong(ordinal, prev)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.columnar.compression
import org.scalatest.FunSuite

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar.{NoopColumnStats, BOOLEAN}
import org.apache.spark.sql.columnar.ColumnarTestUtils._

Expand Down Expand Up @@ -72,10 +73,14 @@ class BooleanBitSetSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
val mutableRow = new GenericMutableRow(1)
if (values.nonEmpty) {
values.foreach {
assert(decoder.hasNext)
assertResult(_, "Wrong decoded value")(decoder.next())
assertResult(_, "Wrong decoded value") {
decoder.next(mutableRow, 0)
mutableRow.getBoolean(0)
}
}
}
assert(!decoder.hasNext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.ByteBuffer

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._
Expand Down Expand Up @@ -97,11 +98,15 @@ class DictionaryEncodingSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = DictionaryEncoding.decoder(buffer, columnType)
val mutableRow = new GenericMutableRow(1)

if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
assert(decoder.hasNext)
assertResult(values(i), "Wrong decoded value")(decoder.next())
assertResult(values(i), "Wrong decoded value") {
decoder.next(mutableRow, 0)
columnType.getField(mutableRow, 0)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ class IntegralDeltaSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = scheme.decoder(buffer, columnType)
val mutableRow = new GenericMutableRow(1)

if (input.nonEmpty) {
input.foreach{
assert(decoder.hasNext)
assertResult(_, "Wrong decoded value")(decoder.next())
assertResult(_, "Wrong decoded value") {
decoder.next(mutableRow, 0)
columnType.getField(mutableRow, 0)
}
}
}
assert(!decoder.hasNext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.columnar.compression

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._
Expand Down Expand Up @@ -80,11 +81,15 @@ class RunLengthEncodingSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = RunLengthEncoding.decoder(buffer, columnType)
val mutableRow = new GenericMutableRow(1)

if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
assert(decoder.hasNext)
assertResult(values(i), "Wrong decoded value")(decoder.next())
assertResult(values(i), "Wrong decoded value") {
decoder.next(mutableRow, 0)
columnType.getField(mutableRow, 0)
}
}
}

Expand Down

0 comments on commit 456c366

Please sign in to comment.