Skip to content

Commit

Permalink
SPARK-2113: awaitTermination() after stop() will hang in Spark Stremaing
Browse files Browse the repository at this point in the history
Author: Lars Albertsson <lalle@spotify.com>

Closes #1001 from lallea/contextwaiter_stopped and squashes the following commits:

93cd314 [Lars Albertsson] Mend StreamingContext stop() followed by awaitTermination().
  • Loading branch information
Lars Albertsson authored and pwendell committed Jun 11, 2014
1 parent e508f59 commit 4d5c12a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[streaming] class ContextWaiter {
}

def notifyStop() = synchronized {
stopped = true
notifyAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}

test("awaitTermination after stop") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register()

failAfter(10000 millis) {
ssc.start()
ssc.stop()
ssc.awaitTermination()
}
}

test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
Expand Down

0 comments on commit 4d5c12a

Please sign in to comment.