Skip to content

Commit

Permalink
[SPARK-1331] Added graceful shutdown to Spark Streaming
Browse files Browse the repository at this point in the history
Current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when the StreamingContext.stop() is called, the following sequence of steps will happen.
1. The driver will send a stop signal to all the active receivers.
2. Each receiver, when it gets a stop signal from the driver, first stop receiving more data, then waits for the thread that persists data blocks to BlockManager to finish persisting all receive data, and finally quits.
3. After all the receivers have stopped, the driver will wait for the Job Generator and Job Scheduler to finish processing all the received data.

It also fixes the semantics of StreamingContext.start and stop. It will throw appropriate errors and warnings if stop() is called before start(), stop() is called twice, etc.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #247 from tdas/graceful-shutdown and squashes the following commits:

61c0016 [Tathagata Das] Updated MIMA binary check excludes.
ae1d39b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into graceful-shutdown
6b59cfc [Tathagata Das] Minor changes based on Andrew's comment on PR.
d0b8d65 [Tathagata Das] Reduced time taken by graceful shutdown unit test.
f55bc67 [Tathagata Das] Fix scalastyle
c69b3a7 [Tathagata Das] Updates based on Patrick's comments.
c43b8ae [Tathagata Das] Added graceful shutdown to Spark Streaming.
  • Loading branch information
tdas authored and pwendell committed Apr 8, 2014
1 parent 11eabbe commit 83ac9a4
Show file tree
Hide file tree
Showing 15 changed files with 552 additions and 215 deletions.
24 changes: 13 additions & 11 deletions project/MimaBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,19 @@ object MimaBuild {
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.0") =>
Seq(
excludePackage("org.apache.spark.api.java"),
excludePackage("org.apache.spark.streaming.api.java"),
excludePackage("org.apache.spark.mllib")
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
excludeSparkClass("mllib.regression.LassoWithSGD") ++
excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
excludePackage("org.apache.spark.api.java"),
excludePackage("org.apache.spark.streaming.api.java"),
excludePackage("org.apache.spark.mllib")
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
excludeSparkClass("mllib.regression.LassoWithSGD") ++
excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
excludeSparkClass("streaming.dstream.NetworkReceiver") ++
excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor")
case _ => Seq()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,19 @@ class CheckpointWriter(
}
}

def stop() {
synchronized {
if (stopped) {
return
}
stopped = true
}
def stop(): Unit = synchronized {
if (stopped) return

executor.shutdown()
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
if (!terminated) {
executor.shutdownNow()
}
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated +
", waited for " + (endTime - startTime) + " ms.")
stopped = true
}

private def fs = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
}

import StreamingContextState._
private[streaming] var state = Initialized

/**
* Return the associated Spark context
*/
Expand Down Expand Up @@ -405,9 +414,18 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*/
def start() = synchronized {
def start(): Unit = synchronized {
// Throw exception if the context has already been started once
// or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
validate()
scheduler.start()
state = Started
}

/**
Expand All @@ -428,14 +446,38 @@ class StreamingContext private[streaming] (
}

/**
* Stop the execution of the streams.
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
* @param stopSparkContext Stop the associated SparkContext or not
*
*/
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
scheduler.stop()
stop(stopSparkContext, false)
}

/**
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
// Warn (but not fail) if context is stopped twice,
// or context is stopped before starting
if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
return
}
if (state == Stopped) {
logWarning("StreamingContext has already been stopped")
return
} // no need to throw an exception as its okay to stop twice
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
if (stopSparkContext) sc.stop()
state = Stopped
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean): Unit = {
ssc.stop(stopSparkContext)
def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)

/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}
}

Expand Down
Loading

0 comments on commit 83ac9a4

Please sign in to comment.