Make parquet tests less order dependent
marmbrus committed Sep 13, 2014
1 parent b42eeab commit 0bdbf21
Showing 1 changed file with 31 additions and 63 deletions.
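
The gist of the change: the old tests wrote an RDD to Parquet, called collect(), and asserted on result(i) by positional index, which silently assumes rows come back in insertion order, something neither Spark nor Parquet guarantees. The rewritten tests round-trip the data and compare with checkAnswer, which checks the result set without regard to row order. A minimal sketch of that kind of order-insensitive comparison (a simplified stand-in for illustration, not Spark's actual QueryTest.checkAnswer implementation):

    // Canonicalize row order on both sides before comparing, so the
    // assertion no longer depends on how rows come back from Parquet.
    def assertSameRows(actual: Seq[Seq[Any]], expected: Seq[Seq[Any]]): Unit = {
      def canonical(rows: Seq[Seq[Any]]): Seq[String] =
        rows.map(_.mkString("|")).sorted
      assert(canonical(actual) == canonical(expected),
        "result sets differ (ignoring row order)")
    }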
@@ -58,8 +58,7 @@ case class AllDataTypes(
     doubleField: Double,
     shortField: Short,
     byteField: Byte,
-    booleanField: Boolean,
-    binaryField: Array[Byte])
+    booleanField: Boolean)

 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -70,13 +69,14 @@ case class AllDataTypesWithNonPrimitiveType(
     shortField: Short,
     byteField: Byte,
     booleanField: Boolean,
-    binaryField: Array[Byte],
     array: Seq[Int],
     arrayContainsNull: Seq[Option[Int]],
     map: Map[Int, Long],
     mapValueContainsNull: Map[Int, Option[Long]],
     data: Data)

+case class BinaryData(binaryData: Array[Byte])
+
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.

@@ -108,23 +108,23 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   test("Read/Write All Types") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
-    TestSQLContext.sparkContext.parallelize(range)
-      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        (0 to x).map(_.toByte).toArray))
-      .saveAsParquetFile(tempDir)
-    val result = parquetFile(tempDir).collect()
-    range.foreach {
-      i =>
-        assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
-        assert(result(i).getInt(1) === i)
-        assert(result(i).getLong(2) === i.toLong)
-        assert(result(i).getFloat(3) === i.toFloat)
-        assert(result(i).getDouble(4) === i.toDouble)
-        assert(result(i).getShort(5) === i.toShort)
-        assert(result(i).getByte(6) === i.toByte)
-        assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
-    }
+    val data = sparkContext.parallelize(range)
+      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+
+    data.saveAsParquetFile(tempDir)
+
+    checkAnswer(
+      parquetFile(tempDir),
+      data.toSchemaRDD.collect().toSeq)
   }

+  test("read/write binary data") {
+    // Since equality for Array[Byte] is broken we test this separately.
+    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
+    sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
+    parquetFile(tempDir)
+      .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
+      .collect().toSeq == Seq("test")
+  }
+
   test("Treat binary as string") {
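
Why binary data gets its own test: on the JVM, == on arrays is reference equality, so two Array[Byte] values with identical contents compare unequal. That is what the "equality for Array[Byte] is broken" comment refers to, and it is also why binaryField was dropped from the shared case classes above. The new test decodes the bytes back to a String before comparing. (Note that as committed, the final .collect().toSeq == Seq("test") produces a Boolean that is never asserted.) A self-contained illustration of the equality pitfall:

    // Reference vs. content equality for byte arrays on the JVM.
    object ByteArrayEquality {
      def main(args: Array[String]): Unit = {
        val a = "test".getBytes("utf8")
        val b = "test".getBytes("utf8")
        println(a == b)            // false: Array == compares references
        println(a.sameElements(b)) // true: compares contents element-wise
        println(new String(a, "utf8") == new String(b, "utf8")) // true
      }
    }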
@@ -275,34 +275,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
-    TestSQLContext.sparkContext.parallelize(range)
+    val data = sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        (0 to x).map(_.toByte).toArray,
         (0 until x),
         (0 until x).map(Option(_).filter(_ % 3 == 0)),
         (0 until x).map(i => i -> i.toLong).toMap,
         (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
         Data((0 until x), Nested(x, s"$x"))))
-      .saveAsParquetFile(tempDir)
-    val result = parquetFile(tempDir).collect()
-    range.foreach {
-      i =>
-        assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
-        assert(result(i).getInt(1) === i)
-        assert(result(i).getLong(2) === i.toLong)
-        assert(result(i).getFloat(3) === i.toFloat)
-        assert(result(i).getDouble(4) === i.toDouble)
-        assert(result(i).getShort(5) === i.toShort)
-        assert(result(i).getByte(6) === i.toByte)
-        assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
-        assert(result(i)(9) === (0 until i))
-        assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
-        assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
-        assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
-        assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
-    }
+    data.saveAsParquetFile(tempDir)
+
+    checkAnswer(
+      parquetFile(tempDir),
+      data.toSchemaRDD.collect().toSeq)
   }

   test("self-join parquet files") {
@@ -399,23 +384,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     }
   }

-  test("Saving case class RDD table to file and reading it back in") {
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    rdd.saveAsParquetFile(path)
-    val readFile = parquetFile(path)
-    readFile.registerTempTable("tmpx")
-    val rdd_copy = sql("SELECT * FROM tmpx").collect()
-    val rdd_orig = rdd.collect()
-    for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
-    }
-    Utils.deleteRecursively(file)
-  }
-
   test("Read a parquet file instead of a directory") {
     val file = getTempFilePath("parquet")
     val path = file.toString
@@ -453,14 +421,14 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
     // executed twice otherwise?!
     sql("INSERT INTO dest SELECT * FROM source")
-    val rdd_copy2 = sql("SELECT * FROM dest").collect()
+    val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
     assert(rdd_copy2.size === 200)
     assert(rdd_copy2(0).apply(0) === 1)
     assert(rdd_copy2(0).apply(1) === "val_1")
-    assert(rdd_copy2(99).apply(0) === 100)
-    assert(rdd_copy2(99).apply(1) === "val_100")
-    assert(rdd_copy2(100).apply(0) === 1)
-    assert(rdd_copy2(100).apply(1) === "val_1")
+    assert(rdd_copy2(99).apply(0) === 50)
+    assert(rdd_copy2(99).apply(1) === "val_50")
+    assert(rdd_copy2(199).apply(0) === 100)
+    assert(rdd_copy2(199).apply(1) === "val_100")
     Utils.deleteRecursively(dirname)
   }

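The added sortBy(_.getInt(0)) is the order-independence fix for this test: the table ends up holding 200 rows, two copies of keys 1 through 100 (hence assert(rdd_copy2.size === 200)), and collect() returns them in whatever order the partitions deliver them. Sorted ascending by key, the sequence is 1, 1, 2, 2, ..., 100, 100, so rdd_copy2(99) is now expected to be 50 and rdd_copy2(199) to be 100. A quick check of that arithmetic:

    // Two copies of 1..100, sorted ascending: pairs 1,1 through 50,50 fill
    // indices 0..99, so element 99 is 50; the last element (index 199) is 100.
    val sorted = ((1 to 100) ++ (1 to 100)).sorted
    assert(sorted(99) == 50)
    assert(sorted(199) == 100)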