
Added Mridul's test changes for ExternalAppendOnlyMap
mateiz committed Aug 1, 2014
1 parent 9a78e4b commit 0d6dad7
Showing 1 changed file with 22 additions and 11 deletions.
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2

+private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+val conf = new SparkConf(loadDefaults)
+// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "0")
+conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+// Ensure that we actually have multiple batches per spill file
+conf.set("spark.shuffle.spill.batchSize", "10")
+conf
+}
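
Note: the following is a standalone sketch, not part of this commit. It illustrates what the TC_RESET comment above refers to: with spark.serializer.objectStreamReset set to "0", Spark's JavaSerializer calls ObjectOutputStream.reset() after every object, which writes a TC_RESET marker byte (0x79) into the stream, and the deserializer must consume that marker cleanly at batch boundaries; bytes left past the last object in a batch were the bug tracked as SPARK-2792. The object name TcResetDemo is invented for illustration.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TcResetDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(Integer.valueOf(1))
    out.reset()                           // writes ObjectStreamConstants.TC_RESET (0x79) into the stream
    out.writeObject(Integer.valueOf(2))
    out.flush()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    assert(in.readObject() == Integer.valueOf(1))
    assert(in.readObject() == Integer.valueOf(2)) // the reader consumes TC_RESET transparently
  }
}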

test("simple insert") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)

val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("insert with collision") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)

val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("ordering") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)

val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("null keys and values") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)

val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("simple aggregator") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)

// reduceByKey
@@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("simple cogroup") {
-val conf = new SparkConf(false)
+val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("spilling") {
-val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
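
Note: below is a condensed, hypothetical sketch, not this commit's actual test body, of the pattern the spilling tests follow. With spark.shuffle.memoryFraction at 0.001 the map has almost no in-memory budget, so insertions overflow to disk repeatedly; with createSparkConf each spill file is then written in 10-object JavaSerializer batches, exercising the batch boundaries where SPARK-2792 surfaced. The data values and assertions are invented for illustration.

val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
  createCombiner, mergeValue, mergeCombiners)
(1 to 10000).foreach(i => map.insert(i % 100, i)) // overflows the tiny memory budget, spilling to disk
val grouped = map.iterator.toMap                  // merges in-memory data with all spill files
assert(grouped.size === 100)                      // 100 distinct keys
assert(grouped(0).size === 100)                   // each key collected 100 values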

@@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("spilling with hash collisions") {
-val conf = new SparkConf(true)
+val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("spilling with many hash collisions") {
-val conf = new SparkConf(true)
+val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("spilling with hash collisions using the Int.MaxValue key") {
-val conf = new SparkConf(true)
+val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}

test("spilling with null keys and values") {
-val conf = new SparkConf(true)
+val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

