Skip to content

Commit

Permalink
Added row-based ColumnType.append/extract
Browse files (browse the repository at this point in the history)
  • Loading branch information
liancheng committed Sep 10, 2014
1 parent 456c366 commit 9cf30b0
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
}

/**
 * Extracts the next value from `buffer` into `row(ordinal)` by way of `columnType`.
 */
// NOTE(review): this listing shows the diff without +/- markers, so the two statements below
// appear to be the removed and the added version of the same line — confirm which one is live.
def extractSingle(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, columnType.extract(buffer))
columnType.extract(buffer, row, ordinal)
}

/** Returns `buffer`; being `protected`, it is only reachable from this class and subclasses. */
protected def underlyingBuffer = buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](

/**
 * Appends `row(ordinal)` to `buffer`, first reassigning `buffer` to the result of
 * `ensureFreeSpace` sized by `columnType.actualSize(row, ordinal)`.
 */
// NOTE(review): the two `columnType.append` calls below appear to be the removed and the added
// side of a diff hunk rendered without +/- markers — confirm only one is meant to remain.
override def appendFrom(row: Row, ordinal: Int) {
buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
columnType.append(columnType.getField(row, ordinal), buffer)
columnType.append(row, ordinal, buffer)
}

override def build() = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.sql.Timestamp
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{MutableAny, MutableRow, MutableValue}
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkSqlSerializer

Expand All @@ -45,11 +45,28 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
*/
def extract(buffer: ByteBuffer): JvmType

/**
 * Reads one value from `buffer`, starting at the buffer's current position, and writes it into
 * `row(ordinal)`. This generic version delegates to `extract(buffer)` and `setField`; subclasses
 * should override it whenever that avoids boxing/unboxing costs.
 */
def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val extracted = extract(buffer)
  setField(row, ordinal, extracted)
}

/**
 * Writes the value `v` (of type T) into `buffer`.
 */
def append(v: JvmType, buffer: ByteBuffer): Unit

/**
 * Writes `row(ordinal)` (of type T) into `buffer`. This generic version delegates to `getField`
 * and `append(v, buffer)`; subclasses should override it whenever that avoids boxing/unboxing
 * costs.
 */
def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = getField(row, ordinal)
  append(fieldValue, buffer)
}

/**
* Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable
* length types such as byte arrays and strings.
Expand Down Expand Up @@ -101,10 +118,18 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
buffer.putInt(v)
}

/** Reads `row(ordinal)` with `getInt` and writes it to `buffer` with `putInt`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getInt(ordinal)
  buffer.putInt(fieldValue)
}

/** Reads the next `Int` from `buffer` at its current position. */
def extract(buffer: ByteBuffer): Int = buffer.getInt()

/** Reads the next `Int` from `buffer` and stores it in `row(ordinal)` with `setInt`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.getInt()
  row.setInt(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setInt`. */
override def setField(row: MutableRow, ordinal: Int, value: Int): Unit =
  row.setInt(ordinal, value)
Expand All @@ -121,10 +146,18 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
buffer.putLong(v)
}

/** Reads `row(ordinal)` with `getLong` and writes it to `buffer` with `putLong`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getLong(ordinal)
  buffer.putLong(fieldValue)
}

/** Reads the next `Long` from `buffer` at its current position. */
override def extract(buffer: ByteBuffer): Long = buffer.getLong()

/** Reads the next `Long` from `buffer` and stores it in `row(ordinal)` with `setLong`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.getLong()
  row.setLong(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setLong`. */
override def setField(row: MutableRow, ordinal: Int, value: Long): Unit =
  row.setLong(ordinal, value)
Expand All @@ -141,10 +174,18 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
buffer.putFloat(v)
}

/** Reads `row(ordinal)` with `getFloat` and writes it to `buffer` with `putFloat`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getFloat(ordinal)
  buffer.putFloat(fieldValue)
}

/** Reads the next `Float` from `buffer` at its current position. */
override def extract(buffer: ByteBuffer): Float = buffer.getFloat()

/** Reads the next `Float` from `buffer` and stores it in `row(ordinal)` with `setFloat`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.getFloat()
  row.setFloat(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setFloat`. */
override def setField(row: MutableRow, ordinal: Int, value: Float): Unit =
  row.setFloat(ordinal, value)
Expand All @@ -161,10 +202,18 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
buffer.putDouble(v)
}

/** Reads `row(ordinal)` with `getDouble` and writes it to `buffer` with `putDouble`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getDouble(ordinal)
  buffer.putDouble(fieldValue)
}

/** Reads the next `Double` from `buffer` at its current position. */
override def extract(buffer: ByteBuffer): Double = buffer.getDouble()

/** Reads the next `Double` from `buffer` and stores it in `row(ordinal)` with `setDouble`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.getDouble()
  row.setDouble(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setDouble`. */
override def setField(row: MutableRow, ordinal: Int, value: Double): Unit =
  row.setDouble(ordinal, value)
Expand All @@ -178,11 +227,19 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {

private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
/** Writes `v` to `buffer` as a single byte: 1 for `true`, 0 for `false`. */
// NOTE(review): the two `buffer.put` statements below appear to be the removed and the added
// side of a diff hunk rendered without +/- markers — confirm only one is meant to remain.
override def append(v: Boolean, buffer: ByteBuffer) {
buffer.put(if (v) 1.toByte else 0.toByte)
buffer.put(if (v) 1: Byte else 0: Byte)
}

/** Writes the Boolean at `row(ordinal)` to `buffer` as one byte: 1 for `true`, 0 for `false`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val encoded: Byte = if (row.getBoolean(ordinal)) 1 else 0
  buffer.put(encoded)
}

/** Reads the next byte from `buffer`; the result is `true` only when that byte equals 1. */
override def extract(buffer: ByteBuffer): Boolean = {
  val encoded = buffer.get()
  encoded == 1
}

/**
 * Reads the next byte from `buffer` and stores in `row(ordinal)` whether that byte equals 1.
 */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.get() == 1
  row.setBoolean(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setBoolean`. */
override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit =
  row.setBoolean(ordinal, value)
Expand All @@ -199,10 +256,18 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
buffer.put(v)
}

/** Reads `row(ordinal)` with `getByte` and writes it to `buffer` with `put`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getByte(ordinal)
  buffer.put(fieldValue)
}

/** Reads the next `Byte` from `buffer` at its current position. */
override def extract(buffer: ByteBuffer): Byte = buffer.get()

/** Reads the next `Byte` from `buffer` and stores it in `row(ordinal)` with `setByte`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.get()
  row.setByte(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setByte`. */
override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit =
  row.setByte(ordinal, value)
Expand All @@ -219,10 +284,18 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
buffer.putShort(v)
}

/** Reads `row(ordinal)` with `getShort` and writes it to `buffer` with `putShort`. */
override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
  val fieldValue = row.getShort(ordinal)
  buffer.putShort(fieldValue)
}

/** Reads the next `Short` from `buffer` at its current position. */
override def extract(buffer: ByteBuffer): Short = buffer.getShort()

/** Reads the next `Short` from `buffer` and stores it in `row(ordinal)` with `setShort`. */
override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
  val decoded = buffer.getShort()
  row.setShort(ordinal, decoded)
}

/** Stores `value` in `row(ordinal)` with `setShort`. */
override def setField(row: MutableRow, ordinal: Int, value: Short): Unit =
  row.setShort(ordinal, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private[sql] case object PassThrough extends CompressionScheme {
extends compression.Decoder[T] {

/** Decodes the next value from `buffer` into `row(ordinal)` by way of `columnType`. */
// NOTE(review): the two statements below appear to be the removed and the added side of a diff
// hunk rendered without +/- markers — confirm only one is meant to remain.
override def next(row: MutableRow, ordinal: Int) {
columnType.setField(row, ordinal, columnType.extract(buffer))
columnType.extract(buffer, row, ordinal)
}

/** Reports whether `buffer` still has bytes remaining to decode. */
override def hasNext: Boolean = buffer.hasRemaining
Expand Down Expand Up @@ -117,27 +117,30 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
to.putInt(RunLengthEncoding.typeId)

if (from.hasRemaining) {
var currentValue = columnType.extract(from)
val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
var currentRun = 1
val value = new SpecificMutableRow(Seq(columnType.dataType))

columnType.extract(from, currentValue, 0)

while (from.hasRemaining) {
val value = columnType.extract(from)
columnType.extract(from, value, 0)

if (value == currentValue) {
if (value.head == currentValue.head) {
currentRun += 1
} else {
// Writes current run
columnType.append(currentValue, to)
columnType.append(currentValue, 0, to)
to.putInt(currentRun)

// Resets current run
currentValue = value
columnType.copyField(value, 0, currentValue, 0)
currentRun = 1
}
}

// Writes the last run
columnType.append(currentValue, to)
columnType.append(currentValue, 0, to)
to.putInt(currentRun)
}

Expand Down

0 comments on commit 9cf30b0

Please sign in to comment.