From ec8cc3ee3b728d35de0a73ffa5c839e880136ef1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Mar 2014 19:33:18 -0700 Subject: [PATCH] Fix test issues\! --- .../spark/streaming/BasicOperationsSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 270ceb081480f..8aec27e39478a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -117,8 +117,8 @@ class BasicOperationsSuite extends TestSuiteBase { test("groupByKey") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), - Seq( Seq(("a", Seq(1, 1).toIterator), ("b", Seq(1).toIterator)), Seq(("", Seq(1, 1).toIterator)), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).groupByKey().mapValues(_.toSeq), + Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), true ) } @@ -245,13 +245,13 @@ class BasicOperationsSuite extends TestSuiteBase { val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) val outputData = Seq( - Seq( ("a", (Seq(1, 1).toIterator, Seq("x", "x").toIterator)), ("b", (Seq(1).toIterator, Seq("x").toIterator)) ), - Seq( ("a", (Seq(1).toIterator, Seq().toIterator)), ("b", (Seq().toIterator, Seq("x").toIterator)), ("", (Seq(1).toIterator, Seq("x").toIterator)) ), - Seq( ("", (Seq(1).toIterator, Seq().toIterator)) ), + Seq( ("a", (Seq(1, 1), Seq("x", "x"))), ("b", (Seq(1), Seq("x"))) ), + Seq( ("a", (Seq(1), Seq())), ("b", (Seq(), Seq("x"))), ("", (Seq(1), Seq("x"))) ), + Seq( ("", (Seq(1), Seq())) ), Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) + s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq)) } testOperation(inputData1, inputData2, operation, outputData, true) }