diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3f0b71bbe17f1..b60a6331d58bf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
 
   // Polling loop interval (ms)
   private val intervalMillis: Long = if (Utils.isTesting) {
-      conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+      conf.get(TEST_SCHEDULE_INTERVAL)
     } else {
       100
     }
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
-  val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 09cc346db0ed2..1a945a66a9201 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -471,7 +472,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Convert java options to env vars as a work around
     // since we can't set env vars directly in sbt.
-    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+    for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
       executorEnvs(envKey) = value
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 709a380dfb636..3c5648434fa66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     // Disable the background thread during tests.
-    if (!conf.contains("spark.testing")) {
+    if (!conf.contains(IS_TESTING)) {
       // A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds") pool.scheduleWithFixedDelay( diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index d5ea2523c628b..e949b1d6d29e0 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} @@ -103,7 +104,7 @@ private[deploy] class Worker( private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true) - private val testing: Boolean = sys.props.contains("spark.testing") + private val testing: Boolean = sys.props.contains(IS_TESTING.key) private var master: Option[RpcEndpointRef] = None /** diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala index af67f41e94af1..ebba1d6fd403a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala +++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala @@ -28,6 +28,7 @@ import scala.util.Try import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.util.Utils @@ -43,7 +44,7 @@ private[spark] case class ProcfsMetrics( // project. 
 private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
   private val procfsStatFile = "stat"
-  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 85b2745a2aec4..ea79c7310349d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   // Only used for test
-  private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+  private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
 
   import InternalAccumulator._
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
new file mode 100644
index 0000000000000..62f2b3bcf989d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Tests {
+  val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
+    .longConf
+    .createWithDefault(Runtime.getRuntime.maxMemory)
+
+  val TEST_SCHEDULE_INTERVAL =
+    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+      .longConf
+      .createWithDefault(100)
+
+  val IS_TESTING = ConfigBuilder("spark.testing")
+    .booleanConf
+    .createOptional
+
+  val TEST_USE_COMPRESSED_OOPS = ConfigBuilder("spark.test.useCompressedOops")
+    .booleanConf
+    .createOptional
+
+  val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
+    .booleanConf
+    .createWithDefault(false)
+
+  val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
+    .longConf
+    .createOptional
+
+  val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
+    .intConf
+    .createWithDefault(5)
+
+  val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
+    .intConf
+    .createWithDefault(4)
+
+  val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
+    .intConf
+    .createWithDefault(2)
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index a6f7db0600e60..3d1b711c587e9 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.storage.BlockId
 
 /**
@@ -112,7 +113,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the storage region, in bytes.
    */
   private def getMaxStorageMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
     (systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -122,7 +123,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the execution region, in bytes.
    */
   private def getMaxExecutionMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
 
     if (systemMaxMemory < MIN_MEMORY_BYTES) {
       throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 78edd2c4d7faa..13f1d9d9ce2d2 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.storage.BlockId
 
 /**
@@ -209,9 +210,9 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
-    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
-      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val systemMemory = conf.get(TEST_MEMORY)
+    val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
+      if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
     val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6f4c326442e1e..f6ade180ee25f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
 
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
-  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+  private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
 
   /**
    * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f73a58ff5d48c..3e508314100ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
     // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
     // when the assembly is built with the "*-provided" profiles enabled.
     val testingClassPath =
-      if (sys.props.contains("spark.testing")) {
+      if (sys.props.contains(IS_TESTING.key)) {
         sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
       } else {
         Nil
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3bfdf95db84c6..0a43f342567ec 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
   private def getIsCompressedOops: Boolean = {
     // This is only used by tests to override the detection of compressed oops. The test
     // actually uses a system property instead of a SparkConf, so we'll stick with that.
-    if (System.getProperty("spark.test.useCompressedOops") != null) {
-      return System.getProperty("spark.test.useCompressedOops").toBoolean
+    if (System.getProperty(TEST_USE_COMPRESSED_OOPS.key) != null) {
+      return System.getProperty(TEST_USE_COMPRESSED_OOPS.key).toBoolean
     }
 
     // java.vm.info provides compressed ref info for IBM JDKs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f322e92c6c8cb..7cc0d998d45fe 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
    * Indicates whether Spark is currently running unit tests.
    */
   def isTesting: Boolean = {
-    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+    sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
   }
 
   /**
@@ -2175,7 +2176,7 @@ private[spark] object Utils extends Logging {
    */
   def portMaxRetries(conf: SparkConf): Int = {
     val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
-    if (conf.contains("spark.testing")) {
+    if (conf.contains(IS_TESTING)) {
       // Set a higher number of retries for tests...
       maxRetries.getOrElse(100)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4083b20c23594..21050e44414f5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.security.EncryptionFunSuite
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val size = 10000
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
-      .set("spark.testing.memory", (size / 2).toString)
+      .set(TEST_MEMORY, size.toLong / 2)
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
@@ -233,7 +234,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val numPartitions = 20
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
-      .set("spark.testing.memory", size.toString)
+      .set(TEST_MEMORY, size.toLong)
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5c718cb654ce8..3229e300a58fe 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
       .set("spark.dynamicAllocation.testing", "true")
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
-      .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
+      .set(TEST_SCHEDULE_INTERVAL, 10000L)
     val sc = new SparkContext(conf)
     contexts += sc
     sc
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 35f728cd57fe2..ffa70425ea367 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
 import org.scalatest.Matchers
 
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 
   // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
   // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
-  conf.set("spark.test.noStageRetry", "true")
+  conf.set(TEST_NO_STAGE_RETRY, true)
 
   test("groupByKey without compression") {
     val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
-    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
     sc = new SparkContext("local", "test", myConf)
     val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
     rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index dad24d7c01b8b..7d114b1b0c144 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.{AccumulatorContext, Utils}
 
 /**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
   protected val enableAutoThreadAudit = true
 
   protected override def beforeAll(): Unit = {
-    System.setProperty("spark.testing", "true")
+    System.setProperty(IS_TESTING.key, "true")
     if (enableAutoThreadAudit) {
       doThreadPreAudit()
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 6b479873f69f2..5903ae71ec66e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 
 import org.apache.spark._
 import org.apache.spark.internal.config.History._
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.config.Tests._
 
 class HistoryServerArgumentsSuite extends SparkFunSuite {
 
@@ -31,14 +31,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
   private val conf = new SparkConf()
     .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
     .set(UPDATE_INTERVAL_S, 1L)
- .set("spark.testing", "true") + .set(IS_TESTING, true) test("No Arguments Parsing") { val argStrings = Array.empty[String] val hsa = new HistoryServerArguments(conf, argStrings) assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath) assert(conf.get(UPDATE_INTERVAL_S) === 1L) - assert(conf.get("spark.testing") === "true") + assert(conf.get(IS_TESTING).getOrElse(false)) } test("Properties File Arguments Parsing --properties-file") { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index a9dee67ae9383..54236224bfb48 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -46,6 +46,7 @@ import org.scalatest.selenium.WebBrowser import org.apache.spark._ import org.apache.spark.internal.config.History._ +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.status.api.v1.ApplicationInfo import org.apache.spark.status.api.v1.JobData import org.apache.spark.ui.SparkUI @@ -80,7 +81,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val conf = new SparkConf() .set(HISTORY_LOG_DIR, logDir) .set(UPDATE_INTERVAL_S.key, "0") - .set("spark.testing", "true") + .set(IS_TESTING, true) .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .set("spark.eventLog.logStageExecutorMetrics.enabled", "true") .set("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", "true") @@ -399,7 +400,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers */ test("security manager starts with spark.authenticate set") { val conf = new SparkConf() - .set("spark.testing", "true") + .set(IS_TESTING, true) .set(SecurityManager.SPARK_AUTH_CONF, "true") HistoryServer.createSecurityManager(conf) } @@ -421,7 +422,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set(UPDATE_INTERVAL_S.key, "1s") .set("spark.eventLog.enabled", "true") .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) - .remove("spark.testing") + .remove(IS_TESTING) val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 0f32fe4059fbb..c3275add50f48 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -21,6 +21,7 @@ import org.mockito.Mockito.when import org.apache.spark.SparkConf import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE +import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.storage.TestBlockId import org.apache.spark.storage.memory.MemoryStore @@ -48,8 +49,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { new StaticMemoryManager( conf.clone .set("spark.memory.fraction", "1") - .set("spark.testing.memory", maxOnHeapExecutionMemory.toString) - .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString), + .set(TEST_MEMORY, maxOnHeapExecutionMemory) + .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory), maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, maxOnHeapStorageMemory = 0, numCores = 1) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index d56cfc183d921..6b94b0f66023a 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.SparkConf import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests._ import org.apache.spark.storage.TestBlockId import org.apache.spark.storage.memory.MemoryStore @@ -42,9 +43,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes maxOnHeapExecutionMemory: Long, maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = { val conf = new SparkConf() - .set("spark.memory.fraction", "1") - .set("spark.testing.memory", maxOnHeapExecutionMemory.toString) - .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString) + .set(TEST_MEMORY, maxOnHeapExecutionMemory) + .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory) .set("spark.memory.storageFraction", storageFraction.toString) UnifiedMemoryManager(conf, numCores = 1) } @@ -218,19 +218,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes } test("small heap") { - val systemMemory = 1024 * 1024 - val reservedMemory = 300 * 1024 + val systemMemory = 1024L * 1024 + val reservedMemory = 300L * 1024 val memoryFraction = 0.8 val conf = new SparkConf() .set("spark.memory.fraction", memoryFraction.toString) - .set("spark.testing.memory", systemMemory.toString) - .set("spark.testing.reservedMemory", reservedMemory.toString) + .set(TEST_MEMORY, systemMemory) + .set(TEST_RESERVED_MEMORY, reservedMemory) val mm = UnifiedMemoryManager(conf, numCores = 1) val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong assert(mm.maxHeapMemory === expectedMaxMemory) // Try using a system memory that's too small - val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString) + val conf2 = conf.clone().set(TEST_MEMORY, reservedMemory / 2) val exception = intercept[IllegalArgumentException] { UnifiedMemoryManager(conf2, numCores = 1) } @@ -238,13 +238,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes } test("insufficient executor memory") { - val systemMemory = 1024 * 1024 - val reservedMemory = 300 * 1024 + val systemMemory = 1024L * 1024 + val reservedMemory = 300L * 1024 val memoryFraction = 0.8 val conf = new SparkConf() .set("spark.memory.fraction", memoryFraction.toString) - .set("spark.testing.memory", systemMemory.toString) - .set("spark.testing.reservedMemory", reservedMemory.toString) + .set(TEST_MEMORY, systemMemory) + .set(TEST_RESERVED_MEMORY, reservedMemory) val mm = UnifiedMemoryManager(conf, numCores = 1) // Try using an executor memory that's too small @@ -259,7 +259,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val conf = new SparkConf() .set("spark.memory.fraction", "1") .set("spark.memory.storageFraction", "0") - .set("spark.testing.memory", "1000") + .set(TEST_MEMORY, 1000L) val mm = UnifiedMemoryManager(conf, numCores = 2) val ms = makeMemoryStore(mm) val memoryMode = MemoryMode.ON_HEAP @@ -285,7 +285,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val conf = new SparkConf() .set("spark.memory.fraction", "1") .set("spark.memory.storageFraction", "0") - .set("spark.testing.memory", "1000") + .set(TEST_MEMORY, 1000L) val mm = 
UnifiedMemoryManager(conf, numCores = 2) makeBadMemoryStore(mm) val memoryMode = MemoryMode.ON_HEAP @@ -306,9 +306,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes test("not enough free memory in the storage pool --OFF_HEAP") { val conf = new SparkConf() - .set(MEMORY_OFFHEAP_SIZE.key, "1000") - .set("spark.testing.memory", "1000") - .set(MEMORY_OFFHEAP_ENABLED.key, "true") + .set(MEMORY_OFFHEAP_SIZE, 1000L) + .set(TEST_MEMORY, 1000L) + .set(MEMORY_OFFHEAP_ENABLED, true) val taskAttemptId = 0L val mm = UnifiedMemoryManager(conf, numCores = 1) val ms = makeMemoryStore(mm) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index 36dd620a56853..112fd31a060e6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import scala.util.Random import org.apache.spark._ +import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { @@ -76,7 +77,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { test("throw exception on barrier() call timeout") { val conf = new SparkConf() .set("spark.barrier.sync.timeout", "1") - .set("spark.test.noStageRetry", "true") + .set(TEST_NO_STAGE_RETRY, true) .setMaster("local-cluster[4, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) @@ -101,7 +102,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { test("throw exception if barrier() call doesn't happen on every task") { val conf = new SparkConf() .set("spark.barrier.sync.timeout", "1") - .set("spark.test.noStageRetry", "true") + .set(TEST_NO_STAGE_RETRY, true) .setMaster("local-cluster[4, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) @@ -124,7 +125,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { test("throw exception if the number of barrier() calls are not the same on every task") { val conf = new SparkConf() .set("spark.barrier.sync.timeout", "1") - .set("spark.test.noStageRetry", "true") + .set(TEST_NO_STAGE_RETRY, true) .setMaster("local-cluster[4, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 29bb8232f44f5..2215f7f366213 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration._ import org.apache.spark._ import org.apache.spark.internal.config +import org.apache.spark.internal.config.Tests._ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{ @@ -58,9 +59,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM extraConfs = Seq( config.BLACKLIST_ENABLED.key -> "true", config.MAX_TASK_FAILURES.key -> "4", - "spark.testing.nHosts" -> "2", - "spark.testing.nExecutorsPerHost" -> "5", - "spark.testing.nCoresPerExecutor" -> "10" + TEST_N_HOSTS.key -> "2", + TEST_N_EXECUTORS_HOST.key -> "5", + TEST_N_CORES_EXECUTOR.key -> "10" ) ) { // To reliably reproduce 
the failure that would occur without blacklisting, we have to use 1 @@ -102,9 +103,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM "SPARK-15865 Progress with fewer executors than maxTaskFailures", extraConfs = Seq( config.BLACKLIST_ENABLED.key -> "true", - "spark.testing.nHosts" -> "2", - "spark.testing.nExecutorsPerHost" -> "1", - "spark.testing.nCoresPerExecutor" -> "1", + TEST_N_HOSTS.key -> "2", + TEST_N_EXECUTORS_HOST.key -> "1", + TEST_N_CORES_EXECUTOR.key -> "1", "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s" ) ) { @@ -129,9 +130,9 @@ class MultiExecutorMockBackend( conf: SparkConf, taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) { - val nHosts = conf.getInt("spark.testing.nHosts", 5) - val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4) - val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2) + val nHosts = conf.get(TEST_N_HOSTS) + val nExecutorsPerHost = conf.get(TEST_N_EXECUTORS_HOST) + val nCoresPerExecutor = conf.get(TEST_N_CORES_EXECUTOR) override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = { (0 until nHosts).flatMap { hostIdx => diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala index b9f0e873375b0..43621cb85762c 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.internal.config.Tests._ import org.apache.spark.memory._ import org.apache.spark.unsafe.Platform @@ -33,8 +34,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi val conf = new SparkConf() .setMaster("local[1]") .setAppName("ShuffleExternalSorterSuite") - .set("spark.testing", "true") - .set("spark.testing.memory", "1600") + .set(IS_TESTING, true) + .set(TEST_MEMORY, 1600L) .set("spark.memory.fraction", "1") sc = new SparkContext(conf) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 3962bdc27d22c..357981171028d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE +import org.apache.spark.internal.config.Tests._ import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService @@ -69,8 +70,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite protected def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { - conf.set("spark.testing.memory", maxMem.toString) - conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString) + conf.set(TEST_MEMORY, maxMem) + conf.set(MEMORY_OFFHEAP_SIZE, maxMem) val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) val memManager = 
UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(serializer, conf) @@ -87,7 +88,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite conf.set("spark.authenticate", "false") conf.set("spark.driver.port", rpcEnv.address.port.toString) - conf.set("spark.testing", "true") + conf.set(IS_TESTING, true) conf.set("spark.memory.fraction", "1") conf.set("spark.memory.storageFraction", "1") conf.set("spark.storage.unrollFraction", "0.4") @@ -233,7 +234,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) - conf.set("spark.testing.memory", "10000") + conf.set(TEST_MEMORY, 10000L) val memManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(serializer, conf) val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index cf00c1c3aad39..32dc4586cf8b8 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests._ import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -89,8 +90,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE transferService: Option[BlockTransferService] = Option.empty, testConf: Option[SparkConf] = None): BlockManager = { val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf) - bmConf.set("spark.testing.memory", maxMem.toString) - bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString) + bmConf.set(TEST_MEMORY, maxMem) + bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem) val serializer = new KryoSerializer(bmConf) val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) { Some(CryptoStreamUtils.createKey(bmConf)) @@ -115,11 +116,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE System.setProperty("os.arch", "amd64") conf = new SparkConf(false) .set("spark.app.id", "test") - .set("spark.testing", "true") + .set(IS_TESTING, true) .set("spark.memory.fraction", "1") .set("spark.memory.storageFraction", "1") .set("spark.kryoserializer.buffer", "1m") - .set("spark.test.useCompressedOops", "true") + .set(TEST_USE_COMPRESSED_OOPS, true) .set("spark.storage.unrollFraction", "0.4") .set("spark.storage.unrollMemoryThreshold", "512") @@ -901,7 +902,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("block store put failure") { // Use Java serializer so we can create an unserializable error. 
- conf.set("spark.testing.memory", "1200") + conf.set(TEST_MEMORY, 1200L) val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index 7274072e5049a..2d3a8c4791848 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -26,6 +26,7 @@ import scala.reflect.ClassTag import org.scalatest._ import org.apache.spark._ +import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} @@ -39,7 +40,7 @@ class MemoryStoreSuite with ResetSystemProperties { var conf: SparkConf = new SparkConf(false) - .set("spark.test.useCompressedOops", "true") + .set(TEST_USE_COMPRESSED_OOPS, true) .set("spark.storage.unrollFraction", "0.4") .set("spark.storage.unrollMemoryThreshold", "512") diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 63f9f82adf3e0..c149b3ab094d8 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS class DummyClass1 {} @@ -76,7 +77,7 @@ class SizeEstimatorSuite // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case super.beforeEach() System.setProperty("os.arch", "amd64") - System.setProperty("spark.test.useCompressedOops", "true") + System.setProperty(TEST_USE_COMPRESSED_OOPS.key, "true") } override def afterEach(): Unit = { @@ -192,7 +193,7 @@ class SizeEstimatorSuite // (Sun vs IBM). Use a DummyString class to make tests deterministic. 
test("64-bit arch with no compressed oops") { System.setProperty("os.arch", "amd64") - System.setProperty("spark.test.useCompressedOops", "false") + System.setProperty(TEST_USE_COMPRESSED_OOPS.key, "false") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 35fba1a3b73c6..6211399005e1a 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.util.CompletionIterator @@ -552,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val conf = createSparkConf(loadDefaults = false) .set("spark.shuffle.memoryFraction", "0.01") .set("spark.memory.useLegacyMode", "true") - .set("spark.testing.memory", "100000000") + .set(TEST_MEMORY, 100000000L) .set("spark.shuffle.sort.bypassMergeThreshold", "0") sc = new SparkContext("local", "test", conf) val N = 2e5.toInt diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 47173b89e91e2..aa400dd74e9ca 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark._ +import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.unsafe.array.LongArray @@ -639,7 +640,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = false, kryo = false) .set("spark.shuffle.memoryFraction", "0.01") .set("spark.memory.useLegacyMode", "true") - .set("spark.testing.memory", "100000000") + .set(TEST_MEMORY, 100000000L) .set("spark.shuffle.sort.bypassMergeThreshold", "0") sc = new SparkContext("local", "test", conf) val N = 2e5.toInt diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index c0b435efb8c9c..cc89683949010 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Tests.IS_TESTING private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { @@ -67,7 +68,7 @@ 
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl .set("spark.executors.instances", "1") .set("spark.app.name", "spark-test-app") .set("spark.ui.enabled", "true") - .set("spark.testing", "false") + .set(IS_TESTING, false) .set("spark.kubernetes.submission.waitAppCompletion", "false") .set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index f5866651dc90b..5d8165cec8dfb 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -34,6 +34,7 @@ import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -300,7 +301,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } protected def driverURL: String = { - if (conf.contains("spark.testing")) { + if (conf.contains(IS_TESTING)) { "driverURL" } else { RpcEndpointAddress( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index dda7cb55f5395..5b38fe5c46bbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong -import org.apache.spark.SparkContext +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} @@ -38,7 +38,7 @@ object SQLExecution { executionIdToQueryExecution.get(executionId) } - private val testing = sys.props.contains("spark.testing") + private val testing = sys.props.contains(IS_TESTING.key) private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = { val sc = sparkSession.sparkContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala index d95794d624033..c37d663941d8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} @@ -29,7 +30,7 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext 
with B // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting // the max iteration of analyzer/optimizer batches. - assert(Utils.isTesting, "spark.testing is not set to true") + assert(Utils.isTesting, s"${IS_TESTING.key} is not set to true") /** * Drop all the tables diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index ca8692290edb2..963e42517b441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} import java.util.Properties import org.apache.spark._ +import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.RDD import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession} @@ -99,7 +100,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { val conf = new SparkConf() .set("spark.shuffle.spill.initialMemoryThreshold", "1") .set("spark.shuffle.sort.bypassMergeThreshold", "0") - .set("spark.testing.memory", "80000") + .set(TEST_MEMORY, 80000L) spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate() val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "") outputFile.deleteOnExit()
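
The patch above replaces raw "spark.testing.*" string keys with typed ConfigEntry values defined in the new Tests object, so callers read and write these settings through conf.get/conf.set with the entry (or entry.key where a plain system property is still consulted) instead of getLong/getBoolean with inline defaults. Below is a minimal usage sketch, not part of the patch: the package and object names are hypothetical, and it assumes the code compiles inside an org.apache.spark subpackage (the Tests object and the ConfigEntry overloads of SparkConf.set/get are private[spark]) against a Spark build that includes this change.

package org.apache.spark.example  // hypothetical package; must live under org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests._

object TestsConfigUsage {  // hypothetical illustration object
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set(IS_TESTING, true)           // typed Boolean; replaces .set("spark.testing", "true")
      .set(TEST_MEMORY, 1024L * 1024)  // typed Long; replaces .set("spark.testing.memory", "1048576")

    // Typed reads: the defaults declared once in Tests.scala apply automatically.
    val systemMemory: Long = conf.get(TEST_MEMORY)
    // IS_TESTING is declared with createOptional, so reading it yields an Option[Boolean].
    val testing: Boolean = conf.get(IS_TESTING).getOrElse(false)
    // The raw key string is still available where system properties are consulted.
    val viaSysProps: Boolean = sys.props.contains(IS_TESTING.key)

    println(s"systemMemory=$systemMemory testing=$testing viaSysProps=$viaSysProps")
  }
}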