Skip to content

Commit

Permalink
Updates based on Patrick's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 4, 2014
1 parent c43b8ae commit c69b3a7
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object ContextState extends Enumeration {
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
}

import ContextState._
import StreamingContextState._
private[streaming] var state = Initialized

/**
Expand Down Expand Up @@ -446,16 +446,25 @@ class StreamingContext private[streaming] (
}

/**
* Stop the execution of the streams.
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
* @param stopSparkContext Stop the associated SparkContext or not
*
*/
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
  // Immediate stop: delegate to the two-argument overload with graceful shutdown disabled,
  // i.e. without waiting for all received data to be processed.
  stop(stopSparkContext, stopGracefully = false)
}

/**
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(
stopSparkContext: Boolean = true,
stopGracefully: Boolean = false
): Unit = synchronized {
// Silently warn if context is stopped twice, or context is stopped before starting
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
// Warn (but not fail) if context is stopped twice,
// or context is stopped before starting
if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
receivingThread

// Call user-defined onStart()
logInfo("Calling onStart")
logInfo("Starting receiver")
onStart()

// Wait until interrupt is called on this thread
Expand All @@ -157,7 +157,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}

// Call user-defined onStop()
logInfo("Calling onStop")
logInfo("Stopping receiver")
try {
onStop()
} catch {
Expand Down Expand Up @@ -187,8 +187,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging

/**
* Stop the receiver. First it interrupts the main receiving thread,
* that is, the thread that called receiver.start(). Then it calls the user-defined
* onStop() method to stop other threads and/or do cleanup.
* that is, the thread that called receiver.start().
*/
def stop() {
// Stop receiving by interrupting the receiving thread
Expand All @@ -211,10 +210,9 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
* Push a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
logInfo("Block " + blockId + " has last element as " + arrayBuffer.last)
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
logInfo("Pushed block " + blockId)
logDebug("Pushed block " + blockId)
}

/**
Expand Down Expand Up @@ -275,7 +273,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}

def stop() {
blockIntervalTimer.stop(stopAfterNextCallback = true)
blockIntervalTimer.stop(false)
stopped = true
blockPushingThread.join()
logInfo("Stopped BlockGenerator")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,4 @@ private[streaming] class ActorReceiver[T: ClassTag](
/**
 * Stop hook for this receiver: sends a `PoisonPill` message to `supervisor`.
 * NOTE(review): presumably this shuts down the supervisor actor and anything it
 * manages -- confirm against the supervisor's definition, which is not visible here.
 */
protected def onStop() = {
supervisor ! PoisonPill
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private var eventActor: ActorRef = null

// Last batch whose completion, checkpointing and metadata cleanup have all been completed
private var lastBatchFullyProcessed: Time = null
private var lastProcessedBatch: Time = null

/** Start generation of jobs */
def start(): Unit = synchronized {
Expand All @@ -83,14 +83,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}

/**
* Stop generation of jobs. processAllReceivedData = true makes this wait until jobs
* Stop generation of jobs. processReceivedData = true makes this wait until jobs
* of the current ongoing time interval have been generated and processed, and the corresponding
* checkpoints have been written.
*/
def stop(processAllReceivedData: Boolean): Unit = synchronized {
def stop(processReceivedData: Boolean): Unit = synchronized {
if (eventActor == null) return // generator has already been stopped

if (processAllReceivedData) {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
Expand All @@ -112,23 +112,23 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Waited for all received blocsk to be consumed for job generation")

// Stop generating jobs
val stopTime = timer.stop(stopAfterNextCallback = true)
val stopTime = timer.stop(false)
graph.stop()
logInfo("Stopped generation timer")

// Wait for the jobs to complete and checkpoints to be written
def hasAllBatchesBeenFullyProcessed = {
lastBatchFullyProcessed != null && lastBatchFullyProcessed.milliseconds == stopTime
def haveAllBatchesBeenProcessed = {
lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
}
logInfo("Waiting for jobs to be processed and checkpoints to be written")
while (!hasTimedOut && !hasAllBatchesBeenFullyProcessed) {
while (!hasTimedOut && !haveAllBatchesBeenProcessed) {
Thread.sleep(pollTime)
}
logInfo("Waited for jobs to be processed and checkpoints to be written")
} else {
logInfo("Stopping JobGenerator immediately")
// Stop timer and graph immediately, ignore unprocessed data and pending jobs
timer.stop()
timer.stop(true)
graph.stop()
}

Expand Down Expand Up @@ -250,6 +250,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}

private def markBatchFullyProcessed(time: Time) {
lastBatchFullyProcessed = time
lastProcessedBatch = time
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,24 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
override def run() { loop }
}

private var prevTime = -1L
private var nextTime = -1L
private var stopped = false
@volatile private var prevTime = -1L
@volatile private var nextTime = -1L
@volatile private var stopped = false

/**
* Get the earliest time when this timer can be started. The time must be a
* multiple of this timer's period and more than current time.
* Get the time when this timer will fire if it is started right now.
* The time will be a multiple of this timer's period and later than
* the current system time.
*/
def getStartTime(): Long = {
  // Number of whole periods that fit into the current clock time (kept as a Double, as floor returns it).
  val elapsedPeriods = math.floor(clock.currentTime.toDouble / period)
  // Advance to the next period boundary, which is strictly after the current time.
  (elapsedPeriods + 1).toLong * period
}

/**
* Get the earliest time when this timer can be restarted, based on the earlier start time.
* The time must be a multiple of this timer's period and more than current time.
* Get the time when the timer will fire if it is restarted right now.
* This time depends on when the timer was first started, before it was stopped
* for whatever reason. The time must be a multiple of this timer's period and
* later than the current time.
*/
def getRestartTime(originalStartTime: Long): Long = {
val gap = clock.currentTime - originalStartTime
Expand All @@ -67,16 +70,17 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
}

/**
* Stop the timer. stopAfterNextCallback = true make it wait for next callback to be completed.
* Returns the last time when it had called back.
* Stop the timer, and return the last time the callback was made.
* interruptTimer = true will interrupt the callback
* if it is in progress (not guaranteed to give correct time in this case).
*/
def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
stopped = true
if (!stopAfterNextCallback) thread.interrupt()
if (interruptTimer) thread.interrupt()
thread.join()
logInfo("Stopped timer for " + name + " after time " + prevTime)
}
logInfo("Stopped timer for " + name + " after time " + prevTime)
prevTime
}

Expand Down Expand Up @@ -113,6 +117,6 @@ object RecurringTimer {
val timer = new RecurringTimer(new SystemClock(), period, onRecur, "Test")
timer.start()
Thread.sleep(30 * 1000)
timer.stop()
timer.stop(true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register

assert(ssc.state === ssc.ContextState.Initialized)
assert(ssc.state === ssc.StreamingContextState.Initialized)
ssc.start()
assert(ssc.state === ssc.ContextState.Started)
assert(ssc.state === ssc.StreamingContextState.Started)
ssc.stop()
assert(ssc.state === ssc.ContextState.Stopped)
assert(ssc.state === ssc.StreamingContextState.Stopped)
}

test("start multiple times") {
Expand Down

0 comments on commit c69b3a7

Please sign in to comment.