Skip to content

Commit

Permalink
Mend StreamingContext stop() followed by awaitTermination().
Browse files Browse the repository at this point in the history
  • Loading branch information
Lars Albertsson committed Jun 6, 2014
1 parent 9535f40 commit 93cd314
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[streaming] class ContextWaiter {
}

def notifyStop() = synchronized {
stopped = true
notifyAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}

test("awaitTermination after stop") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register()

failAfter(10000 millis) {
ssc.start()
ssc.stop()
ssc.awaitTermination()
}
}

test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
Expand Down

0 comments on commit 93cd314

Please sign in to comment.