Skip to content

Commit

Permalink
Fix the core test suite to compile
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent 4ed579b commit 66b583d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 18 deletions.
32 changes: 21 additions & 11 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public int compare(Integer a, Integer b) {
}
}

public int iteratorSize(Iterator<?> a) {
private int iteratorSize(Iterator<?> a) {
int size = 0;
while (a.hasNext()) {
size++;
Expand All @@ -87,7 +87,12 @@ public int iteratorSize(Iterator<?> a) {
return size;
}

public String iteratorStr(Iterator<?> a) {
private int iterableSize(Iterable<?> a) {
return iteratorSize(a.iterator());
}


private String iteratorStr(Iterator<?> a) {
StringBuilder str = new StringBuilder();
str.append("[");
while (a.hasNext()) {
Expand All @@ -100,6 +105,11 @@ public String iteratorStr(Iterator<?> a) {
return str.toString();
}

private String iterableStr(Iterable<?> a) {
return iteratorStr(a.iterator());
}


@SuppressWarnings("unchecked")
@Test
public void sparkContextUnion() {
Expand Down Expand Up @@ -220,7 +230,7 @@ public void lookup() {
new Tuple2<String, String>("Oranges", "Citrus")
));
Assert.assertEquals(2, categories.lookup("Oranges").size());
Assert.assertEquals(2, iteratorSize(categories.groupByKey().lookup("Oranges").get(0)));
Assert.assertEquals(2, iterableSize(categories.groupByKey().lookup("Oranges").get(0)));
}

@Test
Expand All @@ -232,15 +242,15 @@ public Boolean call(Integer x) {
return x % 2 == 0;
}
};
JavaPairRDD<Boolean, Iterator<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, iteratorSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iteratorSize(oddsAndEvens.lookup(false).get(0))); // Odds
Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds

oddsAndEvens = rdd.groupBy(isOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, iteratorSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iteratorSize(oddsAndEvens.lookup(false).get(0))); // Odds
Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds
}

@SuppressWarnings("unchecked")
Expand All @@ -255,9 +265,9 @@ public void cogroup() {
new Tuple2<String, Integer>("Oranges", 2),
new Tuple2<String, Integer>("Apples", 3)
));
JavaPairRDD<String, Tuple2<Iterator<String>, Iterator<Integer>>> cogrouped = categories.cogroup(prices);
Assert.assertEquals("[Fruit, Citrus]", iteratorStr(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", iteratorStr(cogrouped.lookup("Oranges").get(0)._2()));
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
Assert.assertEquals("[Fruit, Citrus]", iterableStr(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", iterableStr(cogrouped.lookup("Oranges").get(0)._2()));

cogrouped.collect();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/FailureSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
throw new Exception("Intentional task failure")
}
}
val vHead = v.next()
val vHead = v.iterator.next()
(k, vHead * vHead)
}.collect()
FailureSuiteState.synchronized {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
(f: String => Unit) => {
bl.value.map(f(_)); f("\u0001")
},
(i: Tuple2[String, Iterator[String]], f: String => Unit) => {
(i: Tuple2[String, Iterable[String]], f: String => Unit) => {
for (e <- i._2) {
f(e + "_")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.Test;
import java.io.*;
import java.util.*;
import java.lang.Iterable;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -52,6 +53,11 @@ public void equalIterator(Iterator<?> a, Iterator<?> b) {
Assert.assertEquals(a.hasNext(), b.hasNext());
}

public void equalIterable(Iterable<?> a, Iterable<?> b) {
equalIterator(a.iterator(), b.iterator());
}


@SuppressWarnings("unchecked")
@Test
public void testCount() {
Expand Down Expand Up @@ -1023,9 +1029,9 @@ public void testPairGroupByKey() {
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);

JavaPairDStream<String, Iterator<String>> grouped = pairStream.groupByKey();
JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, Iterator<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected.size(), result.size());
Iterator<List<Tuple2<String, Iterator<String>>>> resultItr = result.iterator();
Expand Down Expand Up @@ -1148,7 +1154,7 @@ public void testGroupByKeyAndWindow() {
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

JavaPairDStream<String, Iterator<Integer>> groupWindowed =
JavaPairDStream<String, Iterable<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Expand Down Expand Up @@ -1491,9 +1497,9 @@ public void testCoGroup() {
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);

JavaPairDStream<String, Tuple2<Iterator<String>, Iterator<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaPairDStream<String, Tuple2<Iterator<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<String, Tuple2<Iterator<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected.size(), result.size());
Iterator<List<Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>>>> resultItr = result.iterator();
Expand Down

0 comments on commit 66b583d

Please sign in to comment.