
Commit

[SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories

## What changes were proposed in this pull request?

This PR converts the hardcoded `spark.test` and `spark.testing` configs to use `ConfigEntry` and moves them into the config package.
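
For illustration only (not part of the patch), a minimal sketch of the pattern being adopted: a raw key/default pair read via `conf.getLong(...)` becomes a typed `ConfigEntry` that is declared once and read with `conf.get(...)`. The package and object names below are hypothetical, and the snippet assumes it is compiled inside the `org.apache.spark` namespace so the `private[spark]` config API is visible; the real entries added here live in `org.apache.spark.internal.config.Tests`.

```scala
package org.apache.spark.example // hypothetical package inside the spark namespace,
                                 // so the private[spark] config API is visible

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ConfigEntryExample {
  // Before: call sites repeated the raw key and its default, e.g.
  //   conf.getLong("spark.testing.dynamicAllocation.scheduleInterval", 100)

  // After: the key, type and default are declared once as a ConfigEntry
  // (the real entry lives in org.apache.spark.internal.config.Tests).
  val TEST_SCHEDULE_INTERVAL =
    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
      .longConf
      .createWithDefault(100)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Typed read; the declared default applies when the key is not set.
    val interval: Long = conf.get(TEST_SCHEDULE_INTERVAL)
    println(s"schedule interval = $interval ms")
  }
}
```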

## How was this patch tested?

Existing unit tests.

Closes #23413 from mgaido91/SPARK-26491.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
mgaido91 authored and Marcelo Vanzin committed Jan 7, 2019
1 parent 98be895 commit 1a64152
Showing 35 changed files with 165 additions and 83 deletions.
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(

// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
conf.get(TEST_SCHEDULE_INTERVAL)
} else {
100
}
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(

private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -470,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
if (!conf.contains(IS_TESTING)) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -103,7 +104,6 @@ private[deploy] class Worker(
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)

private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None

/**
@@ -127,7 +127,7 @@
private var connected = false
private val workerId = generateWorkerId()
private val sparkHome =
if (testing) {
if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
@@ -43,7 +43,7 @@ private[spark] case class ProcfsMetrics(
// project.
private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
private val procfsStatFile = "stat"
private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
private val testing = Utils.isTesting
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
}

// Only used for test
private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)


import InternalAccumulator._
56 changes: 56 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

private[spark] object Tests {

val TEST_USE_COMPRESSED_OOPS_KEY = "spark.test.useCompressedOops"

val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
.longConf
.createWithDefault(Runtime.getRuntime.maxMemory)

val TEST_SCHEDULE_INTERVAL =
ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
.longConf
.createWithDefault(100)

val IS_TESTING = ConfigBuilder("spark.testing")
.booleanConf
.createOptional

val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
.booleanConf
.createWithDefault(false)

val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
.longConf
.createOptional

val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
.intConf
.createWithDefault(5)

val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
.intConf
.createWithDefault(4)

val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
.intConf
.createWithDefault(2)
}
@@ -19,6 +19,7 @@ package org.apache.spark.memory

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.BlockId

/**
@@ -120,7 +121,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val systemMaxMemory = conf.get(TEST_MEMORY)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -130,7 +131,7 @@
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val systemMaxMemory = conf.get(TEST_MEMORY)

if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
@@ -19,6 +19,7 @@ package org.apache.spark.memory

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.BlockId

/**
@@ -210,9 +211,9 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val systemMemory = conf.get(TEST_MEMORY)
val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)

/**
* Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
if (sys.props.contains(IS_TESTING.key)) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
import org.apache.spark.util.collection.OpenHashSet

/**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
private def getIsCompressedOops: Boolean = {
// This is only used by tests to override the detection of compressed oops. The test
// actually uses a system property instead of a SparkConf, so we'll stick with that.
if (System.getProperty("spark.test.useCompressedOops") != null) {
return System.getProperty("spark.test.useCompressedOops").toBoolean
if (System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY) != null) {
return System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY).toBoolean
}

// java.vm.info provides compressed ref info for IBM JDKs
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
* Indicates whether Spark is currently running unit tests.
*/
def isTesting: Boolean = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
}

/**
@@ -2175,7 +2176,7 @@
*/
def portMaxRetries(conf: SparkConf): Int = {
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
if (conf.contains("spark.testing")) {
if (conf.contains(IS_TESTING)) {
// Set a higher number of retries for tests...
maxRetries.getOrElse(100)
} else {
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.{Millis, Span}

import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests._
import org.apache.spark.security.EncryptionFunSuite
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val size = 10000
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", (size / 2).toString)
.set(TEST_MEMORY, size.toLong / 2)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -233,7 +234,7 @@
val numPartitions = 20
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", size.toString)
.set(TEST_MEMORY, size.toLong)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.testing", "true")
// SPARK-22864: effectively disable the allocation schedule by setting the period to a
// really long value.
.set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
.set(TEST_SCHEDULE_INTERVAL, 10000L)
val sc = new SparkContext(conf)
contexts += sc
sc
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
import org.scalatest.Matchers

import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

// Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
// test that the shuffle works (rather than retrying until all blocks are local to one Executor).
conf.set("spark.test.noStageRetry", "true")
conf.set(TEST_NO_STAGE_RETRY, true)

test("groupByKey without compression") {
val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}

test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
val myConf = conf.clone().set("spark.test.noStageRetry", "false")
val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
sc = new SparkContext("local", "test", myConf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
rdd.count()
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}

/**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
protected val enableAutoThreadAudit = true

protected override def beforeAll(): Unit = {
System.setProperty("spark.testing", "true")
System.setProperty(IS_TESTING.key, "true")
if (enableAutoThreadAudit) {
doThreadPreAudit()
}