Skip to content

Commit

Permalink
SPARK-1407 drain event queue before stopping event logger
Browse files Browse the repository at this point in the history
Author: Kan Zhang <kzhang@apache.org>

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger
  • Loading branch information
kanzhang authored and pwendell committed Apr 9, 2014
1 parent bde9cc1 commit eb5f2b6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 16 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
Expand All @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
/**
 * Daemon thread that delivers queued events to the attached listeners.
 *
 * It repeatedly takes the next event from `eventQueue` and hands it to `postToAll`.
 * Taking `SparkListenerShutdown` ends the loop (and therefore the thread) without
 * forwarding that sentinel to the listeners.
 */
private val listenerThread = new Thread("SparkListenerBus") {
  setDaemon(true)

  override def run() {
    // Blocks until an event is available.
    var next = eventQueue.take
    while (next != SparkListenerShutdown) {
      postToAll(next)
      next = eventQueue.take
    }
    // Shutdown sentinel received: fall off the end so the daemon thread terminates.
  }
}

// Set to true as the first action of stop(), before the started-check or the shutdown post.
// Exposed (private[spark]) for testing, so a test can poll it to learn that stop() has been entered.
@volatile private[spark] var stopCalled = false

/**
* Start sending events to attached listeners.
Expand All @@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
if (started) {
throw new IllegalStateException("Listener bus already started!")
}
listenerThread.start()
started = true
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
postToAll(event)
}
}
}.start()
}

def post(event: SparkListenerEvent) {
Expand Down Expand Up @@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}

/**
 * Stop the listener bus and wait for the listener thread to finish.
 *
 * Records that stop was requested, posts `SparkListenerShutdown`, and then joins
 * `listenerThread`, so this call does not return until that thread has exited.
 *
 * @throws IllegalStateException if the bus has not been started
 */
def stop() {
  // Flag first so observers can tell stop() was entered even if it throws or blocks below.
  stopCalled = true
  if (started) {
    post(SparkListenerShutdown)
    listenerThread.join()
  } else {
    throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.concurrent.Semaphore

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
Expand Down Expand Up @@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
}

test("bus.stop() waits for the event queue to completely drain") {
// Set by the listener once it has been released and has handled its event
@volatile var drained = false

// Tells the listener to stop blocking. Starts with no permits so that the
// listener's acquire() really blocks until the test calls release().
val listenerWait = new Semaphore(0)

// Signals that stop() has returned. Starts with no permits so that the test's
// acquire() really waits for the stopper thread's release().
val stopReturned = new Semaphore(0)

/**
 * Listener whose job-end callback first acquires a permit from `listenerWait`
 * and only then records, via `drained`, that the event was handled.
 */
class BlockingListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // Wait for the test to hand out a permit before reporting the event as processed.
    listenerWait.acquire()
    drained = true
  }
}

val bus = new LiveListenerBus
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))

// the queue should not drain immediately
assert(!drained)

// Call bus.stop() from a separate thread so the test thread stays free to unblock the listener.
new Thread("ListenerBusStopper") {
override def run() {
// stop() joins the listener thread, so it should not return until the
// listener has been unblocked by listenerWait.release() below
bus.stop()
// Tell the test thread that stop() has returned
stopReturned.release(1)
}
}.start()

while (!bus.stopCalled) {
Thread.sleep(10)
}

listenerWait.release()

This comment has been minimized.

Copy link
@kanzhang

kanzhang Apr 10, 2014

Author Contributor

Shouldn't we assert(!drained) after stop() is called but before listenerWait.release(), to make sure listenerThread is still blocked? I was thinking along the same lines, but wasn't sure a single assert statement is enough to tell that listenerThread is blocked (maybe it didn't get a chance to run).

This comment has been minimized.

Copy link
@concretevitamin

concretevitamin Apr 10, 2014

Contributor

@kanzhang I am not sure if people get notifications for comments here... perhaps ping them in the pull request?

stopReturned.acquire()
assert(drained)
}

test("basic creation of StageInfo") {
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ object SparkHdfsLR {
}

println("Final w: " + w)
System.exit(0)
sc.stop()
}
}

0 comments on commit eb5f2b6

Please sign in to comment.