Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage #2327

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ final class MutableByte extends MutableValue {
}

final class MutableAny extends MutableValue {
var value: Any = 0
var value: Any = _
def boxed = if (isNull) null else value
def update(v: Any) = value = {
isNull = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
def hasNext = buffer.hasRemaining

def extractTo(row: MutableRow, ordinal: Int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This style is going to go away in 2.12 or 2.13 I think. Should be :Unit =

columnType.setField(row, ordinal, extractSingle(buffer))
extractSingle(row, ordinal)
}

def extractSingle(buffer: ByteBuffer): JvmType = columnType.extract(buffer)
def extractSingle(row: MutableRow, ordinal: Int) {
columnType.extract(buffer, row, ordinal)
}

protected def underlyingBuffer = buffer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
}

override def appendFrom(row: Row, ordinal: Int) {
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
columnType.append(field, buffer)
buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
columnType.append(row, ordinal, buffer)
}

override def build() = {
Expand Down
132 changes: 124 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package org.apache.spark.sql.columnar

import java.nio.ByteBuffer
import java.sql.Timestamp

import scala.reflect.runtime.universe.TypeTag

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.catalyst.types._
Expand All @@ -46,16 +45,33 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
*/
def extract(buffer: ByteBuffer): JvmType

/**
* Extracts a value out of the buffer at the buffer's current position and stores in
* `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
* possible.
*/
def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
setField(row, ordinal, extract(buffer))
}

/**
* Appends the given value v of type T into the given ByteBuffer.
*/
def append(v: JvmType, buffer: ByteBuffer)

/**
* Returns the size of the value. This is used to calculate the size of variable length types
* such as byte arrays and strings.
* Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this
* method to avoid boxing/unboxing costs whenever possible.
*/
def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
append(getField(row, ordinal), buffer)
}

/**
* Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable
* length types such as byte arrays and strings.
*/
def actualSize(v: JvmType): Int = defaultSize
def actualSize(row: Row, ordinal: Int): Int = defaultSize

/**
* Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs
Expand All @@ -69,6 +85,14 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
*/
def setField(row: MutableRow, ordinal: Int, value: JvmType)

/**
* Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
* boxing/unboxing costs whenever possible.
*/
def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to(toOrdinal) = from(fromOrdinal)
}

/**
* Creates a duplicated copy of the value.
*/
Expand All @@ -94,113 +118,199 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
buffer.putInt(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.putInt(row.getInt(ordinal))
}

def extract(buffer: ByteBuffer) = {
buffer.getInt()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setInt(ordinal, buffer.getInt())
}

override def setField(row: MutableRow, ordinal: Int, value: Int) {
row.setInt(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getInt(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setInt(toOrdinal, from.getInt(fromOrdinal))
}
}

private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
override def append(v: Long, buffer: ByteBuffer) {
buffer.putLong(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.putLong(row.getLong(ordinal))
}

override def extract(buffer: ByteBuffer) = {
buffer.getLong()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setLong(ordinal, buffer.getLong())
}

override def setField(row: MutableRow, ordinal: Int, value: Long) {
row.setLong(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getLong(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setLong(toOrdinal, from.getLong(fromOrdinal))
}
}

private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
override def append(v: Float, buffer: ByteBuffer) {
buffer.putFloat(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.putFloat(row.getFloat(ordinal))
}

override def extract(buffer: ByteBuffer) = {
buffer.getFloat()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setFloat(ordinal, buffer.getFloat())
}

override def setField(row: MutableRow, ordinal: Int, value: Float) {
row.setFloat(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getFloat(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
}
}

private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
override def append(v: Double, buffer: ByteBuffer) {
buffer.putDouble(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.putDouble(row.getDouble(ordinal))
}

override def extract(buffer: ByteBuffer) = {
buffer.getDouble()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setDouble(ordinal, buffer.getDouble())
}

override def setField(row: MutableRow, ordinal: Int, value: Double) {
row.setDouble(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getDouble(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
}
}

private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
override def append(v: Boolean, buffer: ByteBuffer) {
buffer.put(if (v) 1.toByte else 0.toByte)
buffer.put(if (v) 1: Byte else 0: Byte)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte)
}

override def extract(buffer: ByteBuffer) = buffer.get() == 1

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setBoolean(ordinal, buffer.get() == 1)
}

override def setField(row: MutableRow, ordinal: Int, value: Boolean) {
row.setBoolean(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getBoolean(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
}
}

private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
override def append(v: Byte, buffer: ByteBuffer) {
buffer.put(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.put(row.getByte(ordinal))
}

override def extract(buffer: ByteBuffer) = {
buffer.get()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setByte(ordinal, buffer.get())
}

override def setField(row: MutableRow, ordinal: Int, value: Byte) {
row.setByte(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getByte(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setByte(toOrdinal, from.getByte(fromOrdinal))
}
}

private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
override def append(v: Short, buffer: ByteBuffer) {
buffer.putShort(v)
}

override def append(row: Row, ordinal: Int, buffer: ByteBuffer) {
buffer.putShort(row.getShort(ordinal))
}

override def extract(buffer: ByteBuffer) = {
buffer.getShort()
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int) {
row.setShort(ordinal, buffer.getShort())
}

override def setField(row: MutableRow, ordinal: Int, value: Short) {
row.setShort(ordinal, value)
}

override def getField(row: Row, ordinal: Int) = row.getShort(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setShort(toOrdinal, from.getShort(fromOrdinal))
}
}

private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
override def actualSize(v: String): Int = v.getBytes("utf-8").length + 4
override def actualSize(row: Row, ordinal: Int): Int = {
row.getString(ordinal).getBytes("utf-8").length + 4
}

override def append(v: String, buffer: ByteBuffer) {
val stringBytes = v.getBytes("utf-8")
Expand All @@ -219,6 +329,10 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
}

override def getField(row: Row, ordinal: Int) = row.getString(ordinal)

override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setString(toOrdinal, from.getString(fromOrdinal))
}
}

private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 8, 12) {
Expand Down Expand Up @@ -246,7 +360,9 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
defaultSize: Int)
extends ColumnType[T, Array[Byte]](typeId, defaultSize) {

override def actualSize(v: Array[Byte]) = v.length + 4
override def actualSize(row: Row, ordinal: Int) = {
getField(row, ordinal).length + 4
}

override def append(v: Array[Byte], buffer: ByteBuffer) {
buffer.putInt(v.length).put(v, 0, v.length)
Expand Down
Loading