From 9e6a88deef969718d507006cdcd0b0561fc2b490 Mon Sep 17 00:00:00 2001 From: Adam Poswolsky Date: Mon, 15 Sep 2014 17:13:14 -0400 Subject: [PATCH] Fix serialization issue in TypedPipeWithOnCompleteTest --- .../com/twitter/scalding/TypedPipeTest.scala | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 4280ce98b7..747eebdcf3 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -262,26 +262,24 @@ class TypedPipeTypedTest extends Specification { } } -class TypedWithOnCompleteJob(args: Args, cleanupFn: () => Unit) extends Job(args) { - val p = TypedTsv[String]("input").withOnComplete(cleanupFn): TypedPipe[String] +class TypedWithOnCompleteJob(args: Args) extends Job(args) { + val onCompleteCalledStat = Stat("onCompleteCalled") + def onComplete() = onCompleteCalledStat.inc + val p = TypedTsv[String]("input").withOnComplete(onComplete): TypedPipe[String] p.write(TypedTsv[String]("output")) } class TypedPipeWithOnCompleteTest extends Specification { import Dsl._ - var cleanupCalled: Boolean = false - def cleanupFn(): Unit = { cleanupCalled = true } noDetailedDiffs() "A TypedWithOnCompleteJob" should { - JobTest(new TypedWithOnCompleteJob(_, cleanupFn)) - .source(TypedTsv[String]("input"), (0 to 100).map { i => Tuple1(i.toString) }) - .sink[String](TypedTsv[String]("output")) { outBuf => - "have its onComplete function called" in { - cleanupCalled must be_==(true) - } - } - .runHadoop - .finish + "have the right counter values" in { + JobTest(new TypedWithOnCompleteJob(_)) + .source(TypedTsv[String]("input"), (0 to 100).map { i => Tuple1(i.toString) }) + .counter("onCompleteCalled") { _ must_== 1 } + .runHadoop + .finish + } } }