Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-1331] Added graceful shutdown to Spark Streaming #247

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,19 @@ class CheckpointWriter(
}
}

def stop() {
synchronized {
if (stopped) {
return
}
stopped = true
}
def stop(): Unit = synchronized {
if (stopped) return

executor.shutdown()
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
if (!terminated) {
executor.shutdownNow()
}
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated +
", waited for " + (endTime - startTime) + " ms.")
stopped = true
}

private def fs = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object ContextState extends Enumeration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: but how would you feel about calling this StreamingContextState? I found it a bit confusing when looking at it directly. Then you could probably even remove the doc because it would be totally clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed. But keeping the doc in. Extra doc do no harm. ;)

type CheckpointState = Value
val Initialized, Started, Stopped = Value
}

import ContextState._
private[streaming] var state = Initialized

/**
* Return the associated Spark context
*/
Expand Down Expand Up @@ -405,9 +414,18 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*/
def start() = synchronized {
def start(): Unit = synchronized {
// Throw exception if the context has already been started once
// or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
validate()
scheduler.start()
state = Started
}

/**
Expand All @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean = true) = synchronized {
scheduler.stop()
def stop(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a compile API change right? I.e. if someone compiled against the old stop() it will fail... we could fix this by implementing another function. Or maybe we just punt on it for 1.0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did implement another function. Lets not break stuff unless we absolutely have to.

stopSparkContext: Boolean = true,
stopGracefully: Boolean = false
): Unit = synchronized {
// Silently warn if context is stopped twice, or context is stopped before starting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "silently" mean here? Doesn't it print a warning? Or did you mean "warn but don't fail".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed silently. But on that note, do you think whether the semantics make sense?

  1. Stopping twice --> only warn
  2. Stop before start --> only warn
  3. Starting twice --> fail
  4. Starting after stopping --> fail

?

if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
return
}
if (state == Stopped) {
logWarning("StreamingContext has already been stopped")
return
} // no need to throw an exception as its okay to stop twice
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
if (stopSparkContext) sc.stop()
state = Stopped
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)

/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}
}

/**
Expand Down
Loading