diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0e125e3af88b..0644202de3434 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1493,9 +1493,23 @@ public void testCoGroup() { JavaPairDStream, Iterator>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List, List>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List, Iterator>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); + Assert.assertEquals(expected.size(), result.size()); + Iterator, Iterator>>>> resultItr = result.iterator(); + Iterator, List>>>> expectedItr = expected.iterator(); + while (resultItr.hasNext() && expectedItr.hasNext()) { + Iterator, Iterator>>> resultElements = resultItr.next().iterator(); + Iterator, List>>> expectedElements = expectedItr.next().iterator(); + while (resultElements.hasNext() && expectedElements.hasNext()) { + Tuple2, Iterator>> resultElement = resultElements.next(); + Tuple2, List>> expectedElement = expectedElements.next(); + Assert.assertEquals(expectedElement._1(), resultElement._1()); + equalIterator(expectedElement._2()._1().iterator(), resultElement._2()._1()); + equalIterator(expectedElement._2()._2().iterator(), resultElement._2()._2()); + } + Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); + } } @SuppressWarnings("unchecked")