Skip to content

Commit

Permalink
Test suites refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Apr 1, 2014
1 parent 2780d6a commit c298b76
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,32 @@

package org.apache.spark.sql.columnar

import scala.reflect.ClassTag
import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types._

class ColumnStatsSuite extends FunSuite {
testColumnStats[BooleanType.type, BooleanColumnStats] {
Random.nextBoolean()
}

testColumnStats[IntegerType.type, IntColumnStats] {
Random.nextInt()
}

testColumnStats[LongType.type, LongColumnStats] {
Random.nextLong()
}

testColumnStats[ShortType.type, ShortColumnStats] {
(Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
}

testColumnStats[ByteType.type, ByteColumnStats] {
(Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
}

testColumnStats[DoubleType.type, DoubleColumnStats] {
Random.nextDouble()
}

testColumnStats[FloatType.type, FloatColumnStats] {
Random.nextFloat()
}

testColumnStats[StringType.type, StringColumnStats] {
Random.nextString(Random.nextInt(32))
}
testColumnStats(classOf[IntColumnStats], INT)

def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]: ClassTag](
mkRandomValue: => U#JvmType) {
def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
columnStatsClass: Class[U],
columnType: NativeColumnType[T]) {

val columnStatsClass = implicitly[ClassTag[U]].runtimeClass
val columnStatsName = columnStatsClass.getSimpleName

test(s"$columnStatsName: empty") {
val columnStats = columnStatsClass.newInstance().asInstanceOf[U]
val columnStats = columnStatsClass.newInstance()
assert((columnStats.lowerBound, columnStats.upperBound) === columnStats.initialBounds)
}

test(s"$columnStatsName: non-empty") {
val columnStats = columnStatsClass.newInstance().asInstanceOf[U]
val values = Seq.fill[U#JvmType](10)(mkRandomValue)
val row = new GenericMutableRow(1)
import ColumnarTestData._

values.foreach { value =>
row(0) = value
columnStats.gatherStats(row, 0)
}
val columnStats = columnStatsClass.newInstance()
val rows = Seq.fill(10)(makeRandomRow(columnType))
rows.foreach(columnStats.gatherStats(_, 0))

val values = rows.map(_.head.asInstanceOf[T#JvmType])
assert(columnStats.lowerBound === values.min(columnStats.ordering))
assert(columnStats.upperBound === values.max(columnStats.ordering))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package org.apache.spark.sql.columnar

import java.nio.ByteBuffer

import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.ColumnarTestData._
import org.apache.spark.sql.execution.SparkSqlSerializer

class ColumnTypeSuite extends FunSuite {
val DEFAULT_BUFFER_SIZE = 512

val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)

test("defaultSize") {
Expand Down Expand Up @@ -55,116 +56,69 @@ class ColumnTypeSuite extends FunSuite {
}
}

testNumericColumnType[BooleanType.type, Boolean](
testNativeColumnStats[BooleanType.type](
BOOLEAN,
Array.fill(4)(Random.nextBoolean()),
ByteBuffer.allocate(32),
(buffer: ByteBuffer, v: Boolean) => {
buffer.put((if (v) 1 else 0).toByte)
},
(buffer: ByteBuffer) => {
buffer.get() == 1
})

testNumericColumnType[IntegerType.type, Int](
testNativeColumnStats[IntegerType.type](
INT,
Array.fill(4)(Random.nextInt()),
ByteBuffer.allocate(32),
(_: ByteBuffer).putInt(_),
(_: ByteBuffer).getInt)

testNumericColumnType[ShortType.type, Short](
testNativeColumnStats[ShortType.type](
SHORT,
Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]),
ByteBuffer.allocate(32),
(_: ByteBuffer).putShort(_),
(_: ByteBuffer).getShort)

testNumericColumnType[LongType.type, Long](
testNativeColumnStats[LongType.type](
LONG,
Array.fill(4)(Random.nextLong()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putLong(_),
(_: ByteBuffer).getLong)

testNumericColumnType[ByteType.type, Byte](
testNativeColumnStats[ByteType.type](
BYTE,
Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]),
ByteBuffer.allocate(64),
(_: ByteBuffer).put(_),
(_: ByteBuffer).get)

testNumericColumnType[DoubleType.type, Double](
testNativeColumnStats[DoubleType.type](
DOUBLE,
Array.fill(4)(Random.nextDouble()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putDouble(_),
(_: ByteBuffer).getDouble)

testNumericColumnType[FloatType.type, Float](
testNativeColumnStats[FloatType.type](
FLOAT,
Array.fill(4)(Random.nextFloat()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putFloat(_),
(_: ByteBuffer).getFloat)

test("STRING") {
val buffer = ByteBuffer.allocate(128)
val seq = Array("hello", "world", "spark", "sql")

seq.map(_.getBytes).foreach { bytes: Array[Byte] =>
buffer.putInt(bytes.length).put(bytes)
}

buffer.rewind()
seq.foreach { s =>
assert(s === STRING.extract(buffer))
}

buffer.rewind()
seq.foreach(STRING.append(_, buffer))

buffer.rewind()
seq.foreach { s =>
val length = buffer.getInt
assert(length === s.getBytes.length)

testNativeColumnStats[StringType.type](
STRING,
(buffer: ByteBuffer, string: String) => {
val bytes = string.getBytes()
buffer.putInt(bytes.length).put(string.getBytes)
},
(buffer: ByteBuffer) => {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(s === new String(bytes))
}
}

test("BINARY") {
val buffer = ByteBuffer.allocate(128)
val seq = Array.fill(4) {
val bytes = new Array[Byte](4)
Random.nextBytes(bytes)
bytes
}
new String(bytes)
})

seq.foreach { bytes =>
testColumnStats[BinaryType.type, Array[Byte]](
BINARY,
(buffer: ByteBuffer, bytes: Array[Byte]) => {
buffer.putInt(bytes.length).put(bytes)
}

buffer.rewind()
seq.foreach { b =>
assert(b === BINARY.extract(buffer))
}

buffer.rewind()
seq.foreach(BINARY.append(_, buffer))

buffer.rewind()
seq.foreach { b =>
val length = buffer.getInt
assert(length === b.length)

},
(buffer: ByteBuffer) => {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(b === bytes)
}
}
bytes
})

test("GENERIC") {
val buffer = ByteBuffer.allocate(512)
Expand All @@ -188,14 +142,22 @@ class ColumnTypeSuite extends FunSuite {
assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
}

def testNumericColumnType[T <: DataType, JvmType](
/**
 * Registers the column type tests for a native column type.
 *
 * This is a convenience wrapper that forwards to `testColumnStats`, fixing the JVM type
 * parameter to `T#JvmType` so that callers only have to spell out the native data type.
 *
 * @param columnType the native column type under test
 * @param putter writes a single value of `T#JvmType` into the buffer
 * @param getter reads a single value of `T#JvmType` back from the buffer
 */
def testNativeColumnStats[T <: NativeType](
    columnType: NativeColumnType[T],
    putter: (ByteBuffer, T#JvmType) => Unit,
    getter: (ByteBuffer) => T#JvmType): Unit = {
  testColumnStats[T, T#JvmType](columnType, putter, getter)
}

def testColumnStats[T <: DataType, JvmType](
columnType: ColumnType[T, JvmType],
seq: Seq[JvmType],
buffer: ByteBuffer,
putter: (ByteBuffer, JvmType) => Unit,
getter: (ByteBuffer) => JvmType) {

val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
val seq = (0 until 4).map(_ => makeRandomValue(columnType))

test(s"$columnTypeName.extract") {
buffer.rewind()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,57 @@ package org.apache.spark.sql.columnar

import scala.util.Random

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.DataType

// TODO Enrich test data
object ColumnarTestData {
object GenericMutableRow {
def apply(values: Any*) = {
val row = new GenericMutableRow(values.length)
row.indices.foreach { i =>
row(i) = values(i)
}
row
/**
 * Builds a mutable row of the given length and calls `setNullAt` on every ordinal.
 *
 * @param length number of columns in the row
 * @return the newly created row
 */
def makeNullRow(length: Int) = {
  val nullRow = new GenericMutableRow(length)
  for (ordinal <- 0 until length) {
    nullRow.setNullAt(ordinal)
  }
  nullRow
}

/**
 * Generates a random value for the given column type.
 *
 * The value is picked by matching on the column type object and is then cast to `JvmType`;
 * the cast is unchecked, so it relies on each branch producing the JVM type that belongs to
 * the matched column type.
 *
 *  - `BYTE` / `SHORT`: uniformly drawn from the whole signed range, both bounds included
 *  - `INT` / `LONG` / `FLOAT` / `DOUBLE` / `BOOLEAN`: the corresponding `Random.nextXxx()`
 *  - `STRING`: a random string of 0 to 31 characters
 *  - `BINARY`: a random byte array of 0 to 31 bytes
 *  - anything else: a one-element `Map[Int, String]`
 *
 * @param columnType the column type to generate a value for
 * @return a random value, cast to `JvmType`
 */
def makeRandomValue[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]): JvmType = {
  // Allocates a byte array of the requested length and fills it with random bytes.
  def randomBytes(length: Int) = {
    val bytes = new Array[Byte](length)
    Random.nextBytes(bytes)
    bytes
  }

  (columnType match {
    // `Random.nextInt(n)` returns a value in [0, n), so `n` must be the number of distinct
    // values of the target type for MinValue and MaxValue to both be reachable.
    case BYTE => (Random.nextInt(Byte.MaxValue - Byte.MinValue + 1) + Byte.MinValue).toByte
    case SHORT => (Random.nextInt(Short.MaxValue - Short.MinValue + 1) + Short.MinValue).toShort
    case INT => Random.nextInt()
    case LONG => Random.nextLong()
    case FLOAT => Random.nextFloat()
    case DOUBLE => Random.nextDouble()
    case STRING => Random.nextString(Random.nextInt(32))
    case BOOLEAN => Random.nextBoolean()
    case BINARY => randomBytes(Random.nextInt(32))
    case _ =>
      // Using a random one-element map instead of an arbitrary object
      Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
  }).asInstanceOf[JvmType]
}

def randomBytes(length: Int) = {
val bytes = new Array[Byte](length)
Random.nextBytes(bytes)
bytes
/**
 * Varargs convenience overload: generates one random value per given column type, in order.
 *
 * Requiring an explicit `head` guarantees that at least one column type is passed.
 *
 * @param head the first column type
 * @param tail the remaining column types, if any
 * @return the generated values, one per column type
 */
def makeRandomValues(
    head: ColumnType[_ <: DataType, _],
    tail: ColumnType[_ <: DataType, _]*): Seq[Any] = {
  val columnTypes: Seq[ColumnType[_ <: DataType, _]] = head +: tail
  makeRandomValues(columnTypes)
}

/**
 * Generates one random value per column type by calling `makeRandomValue` on each element.
 *
 * @param columnTypes the column types to generate values for
 * @return the generated values, in the same order as `columnTypes`
 */
def makeRandomValues(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Seq[Any] = {
  for (columnType <- columnTypes) yield makeRandomValue(columnType)
}

val nonNullRandomRow = GenericMutableRow(
Random.nextInt(),
Random.nextLong(),
Random.nextFloat(),
Random.nextDouble(),
Random.nextBoolean(),
Random.nextInt(Byte.MaxValue).asInstanceOf[Byte],
Random.nextInt(Short.MaxValue).asInstanceOf[Short],
Random.nextString(Random.nextInt(64)),
randomBytes(Random.nextInt(64)),
Map(Random.nextInt() -> Random.nextString(4)))

val nullRow = GenericMutableRow(Seq.fill(10)(null): _*)
/**
 * Varargs convenience overload: builds a row holding one random value per given column type.
 *
 * Requiring an explicit `head` guarantees that the row has at least one column.
 *
 * @param head the column type of the first column
 * @param tail the column types of the remaining columns, if any
 * @return a row with one random value per column type
 */
def makeRandomRow(
    head: ColumnType[_ <: DataType, _],
    tail: ColumnType[_ <: DataType, _]*): Row = {
  val columnTypes: Seq[ColumnType[_ <: DataType, _]] = head +: tail
  makeRandomRow(columnTypes)
}

/**
 * Builds a mutable row with one column per entry of `columnTypes`, where column `i` holds a
 * random value generated for `columnTypes(i)`.
 *
 * @param columnTypes the column types, in column order
 * @return the populated row
 */
def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Row = {
  val values = makeRandomValues(columnTypes)
  val randomRow = new GenericMutableRow(columnTypes.length)
  for ((value, ordinal) <- values.zipWithIndex) {
    randomRow(ordinal) = value
  }
  randomRow
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,29 @@ class NullableColumnAccessorSuite extends FunSuite {

def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val nullRow = makeNullRow(1)

test(s"$typeName accessor: empty column") {
test(s"Nullable $typeName accessor: empty column") {
val builder = TestNullableColumnBuilder(columnType)
val accessor = TestNullableColumnAccessor(builder.build(), columnType)
assert(!accessor.hasNext)
}

test(s"$typeName accessor: access null values") {
test(s"Nullable $typeName accessor: access null values") {
val builder = TestNullableColumnBuilder(columnType)
val randomRow = makeRandomRow(columnType)

(0 until 4).foreach { _ =>
builder.appendFrom(nonNullRandomRow, columnType.typeId)
builder.appendFrom(nullRow, columnType.typeId)
builder.appendFrom(randomRow, 0)
builder.appendFrom(nullRow, 0)
}

val accessor = TestNullableColumnAccessor(builder.build(), columnType)
val row = new GenericMutableRow(1)

(0 until 4).foreach { _ =>
accessor.extractTo(row, 0)
assert(row(0) === nonNullRandomRow(columnType.typeId))
assert(row(0) === randomRow(0))

accessor.extractTo(row, 0)
assert(row(0) === null)
Expand Down
Loading

0 comments on commit c298b76

Please sign in to comment.