Skip to content

Commit

Permalink
Ensure SynchronizedBuffer is used in every TestSuiteBase
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed May 7, 2015
1 parent 9f1f9b1 commit 6aea07a
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
val output: SynchronizedBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
Expand All @@ -95,8 +95,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
class TestOutputStreamWithPartitions[T: ClassTag](
parent: DStream[T],
val output: SynchronizedBuffer[Seq[Seq[T]]] =
new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
Expand All @@ -108,10 +110,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
ois.defaultReadObject()
output.clear()
}

def toTestOutputStream: TestOutputStream[T] = {
new TestOutputStream[T](this.parent, this.output.map(_.flatten))
}
}

/**
Expand Down

0 comments on commit 6aea07a

Please sign in to comment.