Skip to content

Commit

Permalink
[SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in Bro…
Browse files Browse the repository at this point in the history
…adcastSuite

Fixed the following failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.3-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/452/testReport/junit/org.apache.spark.broadcast/BroadcastSuite/Unpersisting_HttpBroadcast_on_executors_and_driver_in_distributed_mode/

The tests should wait until all slaves are up. Otherwise, there may be only a part of `BlockManager`s registered, and fail the tests.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#5925 from zsxwing/SPARK-7384 and squashes the following commits:

783cb7b [zsxwing] Add comments for _jobProgressListener and remove postfixOps
1009ef1 [zsxwing] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
  • Loading branch information
zsxwing authored and jeanlyn committed May 28, 2015
1 parent b8e6ded commit 0a33ccd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)

_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)

_statusTracker = new SparkStatusTracker(this)

_progressBar =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.broadcast

import scala.concurrent.duration._
import scala.util.Random

import org.scalatest.{Assertions, FunSuite}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
import org.apache.spark.io.SnappyCompressionCodec
Expand Down Expand Up @@ -307,7 +309,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
removeFromDriver: Boolean) {

sc = if (distributed) {
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
// Wait until all salves are up
eventually(timeout(10.seconds), interval(10.milliseconds)) {
_sc.jobProgressListener.synchronized {
val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
assert(numBlockManagers == numSlaves + 1,
s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
}
}
_sc
} else {
new SparkContext("local", "test", broadcastConf)
}
Expand Down

0 comments on commit 0a33ccd

Please sign in to comment.