Skip to content

Commit

Permalink
Fix cogroup test in JavaAPISuite for streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent a5ee714 commit 88a5cef
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1493,9 +1493,23 @@ public void testCoGroup() {

JavaPairDStream<String, Tuple2<Iterator<String>, Iterator<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected, result);
Assert.assertEquals(expected.size(), result.size());
Iterator<List<Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>>>> resultItr = result.iterator();
Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator();
while (resultItr.hasNext() && expectedItr.hasNext()) {
Iterator<Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>>> resultElements = resultItr.next().iterator();
Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator();
while (resultElements.hasNext() && expectedElements.hasNext()) {
Tuple2<String, Tuple2<Iterator<String>, Iterator<String>>> resultElement = resultElements.next();
Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next();
Assert.assertEquals(expectedElement._1(), resultElement._1());
equalIterator(expectedElement._2()._1().iterator(), resultElement._2()._1());
equalIterator(expectedElement._2()._2().iterator(), resultElement._2()._2());
}
Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
}
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 88a5cef

Please sign in to comment.