From 0a33ccdd29477c0fc4b904aa23b6e2c9a894a8ca Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Tue, 5 May 2015 23:25:28 -0700
Subject: [PATCH] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite

Fixed the following failure:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.3-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/452/testReport/junit/org.apache.spark.broadcast/BroadcastSuite/Unpersisting_HttpBroadcast_on_executors_and_driver_in_distributed_mode/

The tests should wait until all slaves are up. Otherwise, only some of the
`BlockManager`s may be registered when the assertions run, and the tests fail.

Author: zsxwing

Closes #5925 from zsxwing/SPARK-7384 and squashes the following commits:

783cb7b [zsxwing] Add comments for _jobProgressListener and remove postfixOps
1009ef1 [zsxwing] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
---
 .../main/scala/org/apache/spark/SparkContext.scala |  8 +++++---
 .../apache/spark/broadcast/BroadcastSuite.scala    | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 682dec44ac1a5..b5f040ceb15ca 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
+    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
+    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
+
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
     SparkEnv.set(_env)
 
     _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     _statusTracker = new SparkStatusTracker(this)
 
     _progressBar =
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c8fdfa693912e..06e5f1cf6b96f 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.broadcast
 
+import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
 import org.apache.spark.io.SnappyCompressionCodec
@@ -307,7 +309,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       removeFromDriver: Boolean) {
 
     sc = if (distributed) {
-      new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      val _sc =
+        new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      // Wait until all slaves are up
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
+        _sc.jobProgressListener.synchronized {
+          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
+          assert(numBlockManagers == numSlaves + 1,
+            s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
+        }
+      }
+      _sc
     } else {
       new SparkContext("local", "test", broadcastConf)
    }