Skip to content

Commit

Permalink
If guava 14 had iterables
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent 2d06e10 commit c5075aa
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  // cogroup collects, per key, the iterable of values from each side; the
  // for-comprehension then yields the cross product of the two iterables.
  // NOTE(review): the diff had left both the old `wlist`-based lines and the
  // new direct iteration in the body; only the new form is kept here.
  this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
    for (v <- vs; w <- ws) yield (v, w)
  }
}

Expand All @@ -314,8 +313,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
val wlist = ws.toList
for (v <- vs; w <- wlist.iterator) yield (v, Some(w))
for (v <- vs; w <- ws) yield (v, Some(w))
}
}
}
Expand All @@ -332,8 +330,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (vs.isEmpty) {
ws.map(w => (None, w))
} else {
val wlist = ws.toList
for (v <- vs; w <- wlist) yield (Some(v), w)
for (v <- vs; w <- ws) yield (Some(v), w)
}
}
}
Expand Down
26 changes: 3 additions & 23 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import scala.Tuple2;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
Expand Down Expand Up @@ -78,35 +79,14 @@ public int compare(Integer a, Integer b) {
}
}

/** Counts the remaining elements of {@code a} by exhausting the iterator. */
private int iteratorSize(Iterator<?> a) {
  int count = 0;
  for (; a.hasNext(); a.next()) {
    count++;
  }
  return count;
}

/**
 * Returns the number of elements in {@code a}.
 *
 * Guava's {@code Iterables.size} takes an {@code Iterable}, not an
 * {@code Iterator}; passing {@code a.iterator()} does not compile
 * ({@code Iterators.size} is the iterator-flavored variant). The leftover
 * pre-diff return statement is also dropped.
 */
private int iterableSize(Iterable<?> a) {
  return Iterables.size(a);
}


/** Renders the remaining elements of {@code a} as {@code "[e1, e2, ...]"}. */
private String iteratorStr(Iterator<?> a) {
  StringBuilder sb = new StringBuilder("[");
  boolean first = true;
  while (a.hasNext()) {
    // Separator goes before every element except the first — same output as
    // appending it after each element that has a successor.
    if (!first) {
      sb.append(", ");
    }
    sb.append(a.next().toString());
    first = false;
  }
  return sb.append("]").toString();
}

/**
 * Renders {@code a} as {@code "[e1, e2, ...]"}.
 *
 * Guava's {@code Iterables.toString} takes an {@code Iterable}, not an
 * {@code Iterator}; passing {@code a.iterator()} does not compile
 * ({@code Iterators.toString} is the iterator-flavored variant). Its output
 * format matches the hand-rolled {@code iteratorStr} it replaces.
 */
private String iterableStr(Iterable<?> a) {
  return Iterables.toString(a);
}


Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/FailureSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
throw new Exception("Intentional task failure")
}
}
val vHead = v.iterator.next()
(k, vHead * vHead)
(k, v.head * v.head)
}.collect()
FailureSuiteState.synchronized {
assert(FailureSuiteState.tasksRun === 4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.spark.examples;


import scala.Tuple2;

import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -90,12 +93,7 @@ public Double call(Iterable<String> rs) {
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = 0;
Iterator<String> urls = s._1.iterator();
while (urls.hasNext()) {
urls.next();
urlCount++;
}
int urlCount = Iterables.size(s._1);
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1) {
results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import scala.Tuple2;

import com.google.common.collect.Iterables;
import com.google.common.base.Optional;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
Expand Down Expand Up @@ -60,16 +61,6 @@ public void tearDown() {
System.clearProperty("spark.driver.port");
}

/** Counts the elements of {@code a} by walking it once. */
private int iterableSize(Iterable<?> a) {
  int n = 0;
  // for-each desugars to the same iterator()/hasNext()/next() walk as the
  // original explicit loop.
  for (Object ignored : a) {
    n++;
  }
  return n;
}

@Test
public void foreachWithAnonymousClass() {
foreachCalls = 0;
Expand Down Expand Up @@ -97,13 +88,13 @@ public void groupBy() {
Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

oddsAndEvens = rdd.groupBy(isOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, iterableSize(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, iterableSize(oddsAndEvens.lookup(false).get(0))); // Odds
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}

@Test
Expand Down

0 comments on commit c5075aa

Please sign in to comment.