Skip to content

Commit

Permalink
[SPARK-18975][CORE] Add an API to remove SparkListener
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In current Spark we could add customized SparkListener through `SparkContext#addListener` API, but there's no equivalent API to remove the registered one. In our scenario SparkListener will be added repeatedly accordingly to the changed environment. If lacks the ability to remove listeners, there might be many registered listeners finally, this is unnecessary and potentially affects the performance. So here propose to add an API to remove registered listener.

## How was this patch tested?

Add an unit test to verify it.

Author: jerryshao <sshao@hortonworks.com>

Closes #16382 from jerryshao/SPARK-18975.
  • Loading branch information
jerryshao authored and rxin committed Dec 22, 2016
1 parent 2615100 commit 31da755
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,15 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.addListener(listener)
}

/**
* :: DeveloperApi ::
* Deregister the listener from Spark's listener bus.
*/
@DeveloperApi
def removeSparkListener(listener: SparkListenerInterface): Unit = {
listenerBus.removeListener(listener)
}

private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.scalatest.Matchers._

import org.apache.spark.scheduler.SparkListener
import org.apache.spark.util.Utils

class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
Expand Down Expand Up @@ -451,4 +452,19 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}

test("register and deregister Spark listener from SparkContext") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
try {
val sparkListener1 = new SparkListener { }
val sparkListener2 = new SparkListener { }
sc.addSparkListener(sparkListener1)
sc.addSparkListener(sparkListener2)
assert(sc.listenerBus.listeners.contains(sparkListener1))
assert(sc.listenerBus.listeners.contains(sparkListener2))
sc.removeSparkListener(sparkListener1)
assert(!sc.listenerBus.listeners.contains(sparkListener1))
assert(sc.listenerBus.listeners.contains(sparkListener2))
}
}
}

0 comments on commit 31da755

Please sign in to comment.