diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index daa1ced63c701..a4d159bf38377 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -44,7 +44,7 @@ object StatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels() val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) + val currentCount = values.sum val previousCount = state.getOrElse(0) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ff6d86c8f81ac..059ac6c2dbee2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -452,7 +452,7 @@ class BasicOperationsSuite extends TestSuiteBase { test("rdd cleanup - updateStateByKey") { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) + Some(values.sum + state.getOrElse(0)) } val stateStream = runCleanupTest( conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 10ad3c9e1adc9..8511390cb1ad5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase { val input = (1 to 10).map(_ => Seq("a")).toSeq val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) + Some((values.sum + state.getOrElse(0))) } st.map(x => (x, 1)) .updateStateByKey(updateFunc) @@ -214,7 +214,7 @@ class CheckpointSuite extends TestSuiteBase { val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) + Some((values.sum + state.getOrElse(0))) } st.map(x => (x, 1)) .updateStateByKey(updateFunc)