Fix style issues
holdenk committed Apr 8, 2014
1 parent 71e8b9f commit b4e0b1d
Showing 3 changed files with 9 additions and 4 deletions.
JavaPairRDD.scala
@@ -707,7 +707,8 @@ object JavaPairRDD {
 
   private[spark]
   def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
-      rdd: RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))]): RDD[(K, (JIterator[V], JIterator[W1], JIterator[W2]))] = {
+      rdd: RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))])
+    : RDD[(K, (JIterator[V], JIterator[W1], JIterator[W2]))] = {
     rddToPairRDDFunctions(rdd)
       .mapValues(x => (asJavaIterator(x._1), asJavaIterator(x._2), asJavaIterator(x._3)))
   }
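For context (not part of the diff): the helper above wraps each Scala Iterator in the cogrouped value triple as a java.util.Iterator for the Java API. A minimal sketch of that conversion pattern, assuming the 2014-era scala.collection.JavaConversions helpers; the sample values are illustrative.

// Illustrative sketch only, not part of this commit: the same
// Scala-to-Java iterator conversion that cogroupResult2ToJava
// applies to each element of the value tuple.
import scala.collection.JavaConversions.asJavaIterator

val scalaIt: Iterator[Int] = Iterator(1, 2, 3)
val javaIt: java.util.Iterator[Int] = asJavaIterator(scalaIt)
while (javaIt.hasNext) println(javaIt.next())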
PairRDDFunctions.scala
@@ -456,7 +456,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterator[V], Iterator[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Iterator[V], Iterator[W]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -477,7 +478,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]].iterator, w1s.asInstanceOf[Seq[W1]].iterator, w2s.asInstanceOf[Seq[W2]].iterator)
+      (vs.asInstanceOf[Seq[V]].iterator,
+        w1s.asInstanceOf[Seq[W1]].iterator,
+        w2s.asInstanceOf[Seq[W2]].iterator)
     }
   }
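For context (not part of the diff): a minimal, hypothetical usage sketch of cogroup with an explicit partitioner under the Iterator-based signature shown above. The SparkContext setup and sample data are illustrative, not from the commit.

// Illustrative only: a local SparkContext and toy data.
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (pre-1.0 style)

val sc = new SparkContext("local", "cogroup-example")
val left  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))

// Every key from either RDD appears exactly once; with this change each
// side's values for a key arrive as an Iterator rather than a Seq.
left.cogroup(right, new HashPartitioner(2)).collect().foreach {
  case (k, (vs, ws)) => println(s"$k -> ${vs.toList} / ${ws.toList}")
}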
PairDStreamFunctions.scala
@@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterator[V])] =
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+    : DStream[(K, Iterator[V])] =
   {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
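For context (not part of the diff): a minimal, hypothetical sketch of the two-argument groupByKeyAndWindow overload above, which delegates to the partitioner-taking variant via defaultPartitioner(). The socket source and the chosen durations are illustrative.

// Illustrative only: a local StreamingContext with a 1s batch interval.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits

val ssc = new StreamingContext("local[2]", "window-example", Seconds(1))
val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Group each key's values over a 30s window that slides every 10s;
// both durations must be multiples of the 1s batch interval.
val grouped = pairs.groupByKeyAndWindow(Seconds(30), Seconds(10))
grouped.print()

ssc.start()
ssc.awaitTermination()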
