From 66b583d7b6c8119389c44d86474d98b324b12b62 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 7 Apr 2014 14:27:13 -0700 Subject: [PATCH] Fix the core test suite to compile --- .../java/org/apache/spark/JavaAPISuite.java | 32 ++++++++++++------- .../scala/org/apache/spark/FailureSuite.scala | 2 +- .../org/apache/spark/PipedRDDSuite.scala | 2 +- .../apache/spark/streaming/JavaAPISuite.java | 16 +++++++--- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 980a6236d3cf3..81c4a0223a465 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -78,7 +78,7 @@ public int compare(Integer a, Integer b) { } } - public int iteratorSize(Iterator a) { + private int iteratorSize(Iterator a) { int size = 0; while (a.hasNext()) { size++; @@ -87,7 +87,12 @@ public int iteratorSize(Iterator a) { return size; } - public String iteratorStr(Iterator a) { + private int iterableSize(Iterable a) { + return iteratorSize(a.iterator()); + } + + + private String iteratorStr(Iterator a) { StringBuilder str = new StringBuilder(); str.append("["); while (a.hasNext()) { @@ -100,6 +105,11 @@ public String iteratorStr(Iterator a) { return str.toString(); } + private String iterableStr(Iterable a) { + return iteratorStr(a.iterator()); + } + + @SuppressWarnings("unchecked") @Test public void sparkContextUnion() { @@ -220,7 +230,7 @@ public void lookup() { new Tuple2("Oranges", "Citrus") )); Assert.assertEquals(2, categories.lookup("Oranges").size()); - Assert.assertEquals(2, iteratorSize(categories.groupByKey().lookup("Oranges").get(0))); + Assert.assertEquals(2, iterableSize(categories.groupByKey().lookup("Oranges").get(0))); } @Test @@ -232,15 +242,15 @@ public Boolean call(Integer x) { return x % 2 == 0; } }; - JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); + JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, iteratorSize(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, iteratorSize(oddsAndEvens.lookup(false).get(0))); // Odds + Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds oddsAndEvens = rdd.groupBy(isOdd, 1); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, iteratorSize(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, iteratorSize(oddsAndEvens.lookup(false).get(0))); // Odds + Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds } @SuppressWarnings("unchecked") @@ -255,9 +265,9 @@ public void cogroup() { new Tuple2("Oranges", 2), new Tuple2("Apples", 3) )); - JavaPairRDD, Iterator>> cogrouped = categories.cogroup(prices); - Assert.assertEquals("[Fruit, Citrus]", iteratorStr(cogrouped.lookup("Oranges").get(0)._1())); - Assert.assertEquals("[2]", iteratorStr(cogrouped.lookup("Oranges").get(0)._2())); + JavaPairRDD, Iterable>> cogrouped = categories.cogroup(prices); + Assert.assertEquals("[Fruit, Citrus]", iterableStr(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[2]", iterableStr(cogrouped.lookup("Oranges").get(0)._2())); cogrouped.collect(); } diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index fa052a10fd326..d6ee130f9fd13 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { throw new Exception("Intentional task failure") } } - val vHead = v.next() + val vHead = v.iterator.next() (k, vHead * vHead) }.collect() FailureSuiteState.synchronized { diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index d57b1d9c9dd81..867b28cc0d971 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { (f: String => Unit) => { bl.value.map(f(_)); f("\u0001") }, - (i: Tuple2[String, Iterator[String]], f: String => Unit) => { + (i: Tuple2[String, Iterable[String]], f: String => Unit) => { for (e <- i._2) { f(e + "_") } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 0644202de3434..97bcf41e2f645 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import org.junit.Test; import java.io.*; import java.util.*; +import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -52,6 +53,11 @@ public void equalIterator(Iterator a, Iterator b) { Assert.assertEquals(a.hasNext(), b.hasNext()); } + public void equalIterable(Iterable a, Iterable b) { + equalIterator(a.iterator(), b.iterator()); + } + + @SuppressWarnings("unchecked") @Test public void testCount() { @@ -1023,9 +1029,9 @@ public void testPairGroupByKey() { JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream> grouped = pairStream.groupByKey(); + JavaPairDStream> grouped = pairStream.groupByKey(); JavaTestUtils.attachTestOutputStream(grouped); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected.size(), result.size()); Iterator>>> resultItr = result.iterator(); @@ -1148,7 +1154,7 @@ public void testGroupByKeyAndWindow() { JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream> groupWindowed = + JavaPairDStream> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1491,9 +1497,9 @@ public void testCoGroup() { ssc, stringStringKVStream2, 1); JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream, Iterator>> grouped = pairStream1.cogroup(pairStream2); + JavaPairDStream, Iterable>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List, Iterator>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List, Iterable>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected.size(), result.size()); Iterator, Iterator>>>> resultItr = result.iterator();