Skip to content

Commit

Permalink
Update CheckpointSuite.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
watermen committed Aug 28, 2014
1 parent 714bda5 commit 17be9fb
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
Expand Down Expand Up @@ -214,7 +214,7 @@ class CheckpointSuite extends TestSuiteBase {
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
Expand Down

0 comments on commit 17be9fb

Please sign in to comment.