From 39012452daa0746fe5d218493b85f9b5f96190c1 Mon Sep 17 00:00:00 2001 From: Yadong Qi Date: Thu, 28 Aug 2014 14:08:48 -0700 Subject: [PATCH] [SPARK-3285] [examples] Using values.sum is easier to understand than using values.foldLeft(0)(_ + _) def sum[B >: A](implicit num: Numeric[B]): B = foldLeft(num.zero)(num.plus) Using values.sum is easier to understand than using values.foldLeft(0)(_ + _), so we'd better use values.sum instead of values.foldLeft(0)(_ + _) Author: Yadong Qi Closes #2182 from watermen/bug-fix3 and squashes the following commits: 17be9fb [Yadong Qi] Update CheckpointSuite.scala 714bda5 [Yadong Qi] Update BasicOperationsSuite.scala 57e704c [Yadong Qi] Update StatefulNetworkWordCount.scala --- .../spark/examples/streaming/StatefulNetworkWordCount.scala | 2 +- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index daa1ced63c701..a4d159bf38377 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -44,7 +44,7 @@ object StatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels() val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) + val currentCount = values.sum val previousCount = state.getOrElse(0) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ff6d86c8f81ac..059ac6c2dbee2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -452,7 +452,7 @@ class BasicOperationsSuite extends TestSuiteBase { test("rdd cleanup - updateStateByKey") { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) + Some(values.sum + state.getOrElse(0)) } val stateStream = runCleanupTest( conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 10ad3c9e1adc9..8511390cb1ad5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase { val input = (1 to 10).map(_ => Seq("a")).toSeq val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) + Some((values.sum + state.getOrElse(0))) } st.map(x => (x, 1)) .updateStateByKey(updateFunc) @@ -214,7 +214,7 @@ class CheckpointSuite extends TestSuiteBase { val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) + Some((values.sum + state.getOrElse(0))) } st.map(x => (x, 1)) .updateStateByKey(updateFunc)