Skip to content

Commit

Permalink
[SPARK-4459] Change groupBy type parameter from K to U
Browse files Browse the repository at this point in the history
Please see https://issues.apache.org/jira/browse/SPARK-4459

Author: Saldanha <saldaal1@phusca-l24858.wlan.na.novartis.net>

Closes apache#3327 from alokito/master and squashes the following commits:

54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U
d5f73c3 [Saldanha] [SPARK-4459] added keyBy test
316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U.
62ddd4b [Saldanha] SPARK-4459 added failing unit test
  • Loading branch information
Saldanha authored and JoshRosen committed Dec 4, 2014
1 parent 794f3ae commit 743a889
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
Expand All @@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
}

/**
Expand Down Expand Up @@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
implicit val ctag: ClassTag[K] = fakeClassTag
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}

Expand Down
41 changes: 41 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,47 @@ public Boolean call(Integer x) {
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}

@Test
public void groupByOnPairRDD() {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Tuple2<Integer, Integer>, Boolean> areOdd =
new Function<Tuple2<Integer, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<Integer, Integer> x) {
return (x._1() % 2 == 0) && (x._2() % 2 == 0);
}
};
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

oddsAndEvens = pairRDD.groupBy(areOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}

@SuppressWarnings("unchecked")
@Test
public void keyByOnPairRDD() {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Tuple2<Integer, Integer>, String> sumToString =
new Function<Tuple2<Integer, Integer>, String>() {
@Override
public String call(Tuple2<Integer, Integer> x) {
return String.valueOf(x._1() + x._2());
}
};
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
Assert.assertEquals(7, keyed.count());
Assert.assertEquals(1, (long) keyed.lookup("2").get(0)._1());
}

@SuppressWarnings("unchecked")
@Test
public void cogroup() {
Expand Down

0 comments on commit 743a889

Please sign in to comment.