diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index c5d9b059c1410..fa269da89b905 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -72,7 +72,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Iterator[V])]] + .asInstanceOf[DStream[(K, Seq[V])]].mapValues(_.toIterator) } /** @@ -188,7 +188,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) self.groupByKey(partitioner) .window(windowDuration, slideDuration) .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Iterator[V])]] + .asInstanceOf[DStream[(K, Seq[V])]].mapValues(_.toIterator) } /**