[SPARK-1331] Added graceful shutdown to Spark Streaming #247

Status: Closed. Wants to merge 7 commits.
24 changes: 13 additions & 11 deletions project/MimaBuild.scala
@@ -58,17 +58,19 @@ object MimaBuild {
     SparkBuild.SPARK_VERSION match {
       case v if v.startsWith("1.0") =>
         Seq(
-          excludePackage("org.apache.spark.api.java"),
-          excludePackage("org.apache.spark.streaming.api.java"),
-          excludePackage("org.apache.spark.mllib")
-        ) ++
-        excludeSparkClass("rdd.ClassTags") ++
-        excludeSparkClass("util.XORShiftRandom") ++
-        excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
-        excludeSparkClass("mllib.optimization.SquaredGradient") ++
-        excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
-        excludeSparkClass("mllib.regression.LassoWithSGD") ++
-        excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
+          excludePackage("org.apache.spark.api.java"),
+          excludePackage("org.apache.spark.streaming.api.java"),
+          excludePackage("org.apache.spark.mllib")
+        ) ++
+        excludeSparkClass("rdd.ClassTags") ++
+        excludeSparkClass("util.XORShiftRandom") ++
+        excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
+        excludeSparkClass("mllib.optimization.SquaredGradient") ++
+        excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
+        excludeSparkClass("mllib.regression.LassoWithSGD") ++
+        excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
+        excludeSparkClass("streaming.dstream.NetworkReceiver") ++
+        excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor")
       case _ => Seq()
     }
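The two new entries at the bottom exclude the NetworkReceiver classes this patch modifies from MiMa's binary-compatibility check. For readers unfamiliar with the helper, the sketch below shows roughly how an excludeSparkClass call could expand into MiMa's ProblemFilters API; the body is an assumption for illustration, not the actual definition in MimaBuild.scala.

import com.typesafe.tools.mima.core._

// Hypothetical sketch: each excluded class name is turned into a set of
// MiMa problem filters so the binary-compatibility check ignores changes
// to that class (the real helper may exclude more problem kinds).
def excludeSparkClass(className: String): Seq[ProblemFilter] = {
  val fullName = "org.apache.spark." + className
  Seq(
    ProblemFilters.exclude[MissingClassProblem](fullName),
    ProblemFilters.exclude[MissingTypesProblem](fullName)
  )
}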
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

@@ -194,19 +194,19 @@ class CheckpointWriter(
     }
   }
 
-  def stop() {
-    synchronized {
-      if (stopped) {
-        return
-      }
-      stopped = true
-    }
+  def stop(): Unit = synchronized {
+    if (stopped) return
+
     executor.shutdown()
     val startTime = System.currentTimeMillis()
     val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
+    if (!terminated) {
+      executor.shutdownNow()
+    }
     val endTime = System.currentTimeMillis()
     logInfo("CheckpointWriter executor terminated ? " + terminated +
       ", waited for " + (endTime - startTime) + " ms.")
+    stopped = true
   }
 
   private def fs = synchronized {
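The new stop() follows the standard two-phase ExecutorService shutdown idiom: request an orderly shutdown, wait a bounded time for queued checkpoint writes to finish, then force termination of anything still running. A minimal self-contained sketch of that idiom (names and timings here are illustrative, not from the patch):

import java.util.concurrent.{Executors, TimeUnit}

object TwoPhaseShutdownSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newFixedThreadPool(1)
    // Simulate an in-flight checkpoint write that takes two seconds.
    executor.submit(new Runnable {
      def run(): Unit = Thread.sleep(2000)
    })
    executor.shutdown() // stop accepting new tasks, let queued ones finish
    val terminated = executor.awaitTermination(10, TimeUnit.SECONDS)
    if (!terminated) {
      executor.shutdownNow() // interrupt anything still running
    }
    println(s"executor terminated cleanly: $terminated")
  }
}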
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

@@ -158,6 +158,15 @@ class StreamingContext private[streaming] (
 
   private[streaming] val waiter = new ContextWaiter
 
+  /** Enumeration to identify the current state of the StreamingContext */
+  private[streaming] object StreamingContextState extends Enumeration {
+    type StreamingContextState = Value
+    val Initialized, Started, Stopped = Value
+  }
+
+  import StreamingContextState._
+  private[streaming] var state = Initialized
+
   /**
    * Return the associated Spark context
    */
@@ -405,9 +414,18 @@ class StreamingContext private[streaming] (
   /**
    * Start the execution of the streams.
    */
-  def start() = synchronized {
+  def start(): Unit = synchronized {
+    // Throw an exception if the context has already been started once
+    // or if a stopped context is being started again
+    if (state == Started) {
+      throw new SparkException("StreamingContext has already been started")
+    }
+    if (state == Stopped) {
+      throw new SparkException("StreamingContext has already been stopped")
+    }
     validate()
     scheduler.start()
+    state = Started
   }
 
   /**
@@ -428,14 +446,38 @@
   }
 
   /**
-   * Stop the execution of the streams.
+   * Stop the execution of the streams immediately (does not wait for all received data
+   * to be processed).
    * @param stopSparkContext Stop the associated SparkContext or not
+   *
    */
   def stop(stopSparkContext: Boolean = true): Unit = synchronized {
-    scheduler.stop()
+    stop(stopSparkContext, false)
+  }
+
+  /**
+   * Stop the execution of the streams, with the option of ensuring all received data
+   * has been processed.
+   * @param stopSparkContext Stop the associated SparkContext or not
+   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *                       received data to be completed
+   */
+  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
+    // Warn (but do not fail) if the context is stopped twice,
+    // or stopped before starting
+    if (state == Initialized) {
+      logWarning("StreamingContext has not been started yet")
+      return
+    }
+    if (state == Stopped) {
+      logWarning("StreamingContext has already been stopped")
+      return
+    } // no need to throw an exception as it's okay to stop twice
+    scheduler.stop(stopGracefully)
     logInfo("StreamingContext stopped successfully")
     waiter.notifyStop()
     if (stopSparkContext) sc.stop()
+    state = Stopped
   }
 }
 
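Taken together, the new state field turns StreamingContext into a small three-state machine (Initialized -> Started -> Stopped): start() is only legal from Initialized, while stop() warns and returns early unless the context is Started. A hypothetical driver program exercising the new graceful overload might look like this (host, port, and timings are made up for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GracefulStop")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A simple receiver-based stream; localhost:9999 is a placeholder source.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()        // Initialized -> Started; a second call now throws
    Thread.sleep(5000) // let a few batches run

    // New overload from this patch: wait for all received data to be
    // processed before shutting down, then stop the SparkContext too.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}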
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

@@ -509,8 +509,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * Stop the execution of the streams.
    * @param stopSparkContext Stop the associated SparkContext or not
    */
-  def stop(stopSparkContext: Boolean): Unit = {
-    ssc.stop(stopSparkContext)
+  def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+
+  /**
+   * Stop the execution of the streams.
+   * @param stopSparkContext Stop the associated SparkContext or not
+   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *                       received data to be completed
+   */
+  def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
+    ssc.stop(stopSparkContext, stopGracefully)
   }
 }
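Because the wrapper forwards both booleans verbatim to the underlying StreamingContext, Java API users get the same graceful-shutdown semantics with no extra plumbing. A minimal sketch of calling the new overload through the wrapper (the object name and settings are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.api.java.JavaStreamingContext

object JavaApiStopSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("JavaApiStop")
    // The wrapper is constructed around a plain StreamingContext
    // (see the class header in the diff above).
    val jssc = new JavaStreamingContext(new StreamingContext(conf, Seconds(1)))
    // Both flags are forwarded verbatim to ssc.stop; stopping a context
    // that was never started only logs a warning under the new semantics.
    jssc.stop(true, true) // stopSparkContext = true, stopGracefully = true
  }
}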