Skip to content

Commit

Permalink
Fix serialization issue in TypedPipeWithOnCompleteTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Poswolsky committed Sep 15, 2014
1 parent 1e6ea09 commit 9e6a88d
Showing 1 changed file with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,26 +262,24 @@ class TypedPipeTypedTest extends Specification {
}
}

class TypedWithOnCompleteJob(args: Args, cleanupFn: () => Unit) extends Job(args) {
val p = TypedTsv[String]("input").withOnComplete(cleanupFn): TypedPipe[String]
class TypedWithOnCompleteJob(args: Args) extends Job(args) {
val onCompleteCalledStat = Stat("onCompleteCalled")
def onComplete() = onCompleteCalledStat.inc
val p = TypedTsv[String]("input").withOnComplete(onComplete): TypedPipe[String]
p.write(TypedTsv[String]("output"))
}

class TypedPipeWithOnCompleteTest extends Specification {
import Dsl._
var cleanupCalled: Boolean = false
def cleanupFn(): Unit = { cleanupCalled = true }
noDetailedDiffs()
"A TypedWithOnCompleteJob" should {
JobTest(new TypedWithOnCompleteJob(_, cleanupFn))
.source(TypedTsv[String]("input"), (0 to 100).map { i => Tuple1(i.toString) })
.sink[String](TypedTsv[String]("output")) { outBuf =>
"have its onComplete function called" in {
cleanupCalled must be_==(true)
}
}
.runHadoop
.finish
"have the right counter values" in {
JobTest(new TypedWithOnCompleteJob(_))
.source(TypedTsv[String]("input"), (0 to 100).map { i => Tuple1(i.toString) })
.counter("onCompleteCalled") { _ must_== 1 }
.runHadoop
.finish
}
}
}

Expand Down

0 comments on commit 9e6a88d

Please sign in to comment.