[SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories #23413

Closed
wants to merge 7 commits
Changes from 4 commits
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(

// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
conf.get(TEST_SCHEDULE_INTERVAL)
} else {
100
}
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(

private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -471,7 +472,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
if (!conf.contains(IS_TESTING)) {
Contributor:

Can probably be Utils.isTesting.

Contributor Author:

I don't think so; here we are checking the SparkConf, while in Utils.isTesting we check the system properties. So I don't think it is doable.

Contributor:

Well, that's kinda the point. spark.testing is set as a system property by the build scripts. SparkConf just inherits system properties, which is why you can also check it there.

Contributor:

(I saw your comment about the SHS tests removing the testing conf, let me take a look...)
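
A minimal sketch of the two checks being compared in this thread (simplified, not the exact Spark sources; assumes SparkConf's default behavior of copying spark.* system properties at construction):

```scala
import org.apache.spark.SparkConf

object TestingCheckSketch {
  // Utils.isTesting consults only the environment and JVM system properties.
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  // FsHistoryProvider asks the SparkConf instead. A SparkConf built with
  // loadDefaults = true copies spark.* system properties, so a value set by the
  // build scripts is normally visible through both paths; only a test that
  // removes the key from its SparkConf (as the SHS tests do) diverges here.
  def disabledViaConf(conf: SparkConf): Boolean =
    conf.contains("spark.testing")
}
```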

// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -103,7 +104,6 @@ private[deploy] class Worker(
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)

private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None

/**
@@ -127,7 +127,7 @@
private var connected = false
private val workerId = generateWorkerId()
private val sparkHome =
if (testing) {
if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
@@ -43,7 +43,7 @@ private[spark] case class ProcfsMetrics(
// project.
private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
private val procfsStatFile = "stat"
private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
private val testing = Utils.isTesting
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
}

// Only used for test
private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
Contributor:

Same.

Contributor Author:

I agree it is doable, but the code would become an if, which seems to me less clean than the current code.
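
For illustration, a sketch of the two shapes weighed in this exchange, assuming code inside the org.apache.spark package (Utils and the Tests config object are private[spark]); the first form is the hypothetical rewrite, the second is what this patch keeps:

```scala
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{LongAccumulator, Utils}

// Hypothetical alternative raised above: Utils.isTesting turns the expression into an if.
val viaIsTesting: Option[LongAccumulator] =
  if (Utils.isTesting) Some(new LongAccumulator) else None

// Form kept in this patch: defined only when the spark.testing system property is present.
val viaSystemProperty: Option[LongAccumulator] =
  sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
```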



import InternalAccumulator._
57 changes: 57 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

private[spark] object Tests {
val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
.longConf
.createWithDefault(Runtime.getRuntime.maxMemory)

val TEST_SCHEDULE_INTERVAL =
ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
.longConf
.createWithDefault(100)

val IS_TESTING = ConfigBuilder("spark.testing")
Contributor:

Seems to me that if you use Utils.isTesting everywhere, you don't need this. I took a look at the places where IS_TESTING is set explicitly, and they should work with that code removed. Unless there's a test that relies on isTesting being false, that should be doable.

spark.testing is set by the build scripts, so tests shouldn't need to set it to true.

Contributor Author:

There are cases where it is removed (see the HistoryServer one) and where it is set to false (in the Kubernetes one). So I don't think it is doable.

Contributor:

I actually tried out the k8s tests without that line and they work fine, which tells me it's not needed. Same for a lot of other explicit checks and setting of that property. I think eventually we should fix all this code to not mess with that property, especially since it's so inconsistent (SparkConf vs. system properties vs. env variable).

The SHS test does need to remove the conf, though; it could be implemented differently, but at that point it's probably better to do a separate change and leave this one as a simple "make everybody use constants for this" change.

Contributor Author:

+1 for this. I think we should check case by case whether some of them are not needed and can be removed, but I agree we'd best do that in dedicated PRs for each of these cases.

.booleanConf
.createOptional

val TEST_USE_COMPRESSED_OOPS = ConfigBuilder("spark.test.useCompressedOops")
.booleanConf
.createOptional

val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
.booleanConf
.createWithDefault(false)

val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
.longConf
.createOptional

val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
.intConf
.createWithDefault(5)

val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
.intConf
.createWithDefault(4)

val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
.intConf
.createWithDefault(2)
}
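
As a usage note (not part of this diff; these entries are private[spark], so the calls below only compile inside Spark itself), a ConfigEntry declared this way replaces string keys and hand-rolled parsing at the call sites, roughly as follows:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests._

val conf = new SparkConf()
  .set(TEST_MEMORY, 512L * 1024 * 1024) // typed setter, no .toString needed
  .set(TEST_NO_STAGE_RETRY, true)

val mem: Long = conf.get(TEST_MEMORY)               // falls back to the declared default
val noRetry: Boolean = conf.get(TEST_NO_STAGE_RETRY)
val testing: Option[Boolean] = conf.get(IS_TESTING) // createOptional yields an Option

// Optional entries can also be cleared outright, which is what the history
// server tests rely on (and one reason IS_TESTING is declared createOptional).
conf.remove(IS_TESTING.key)
```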
@@ -18,6 +18,7 @@
package org.apache.spark.memory

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.BlockId

/**
@@ -112,7 +113,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val systemMaxMemory = conf.get(TEST_MEMORY)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -122,7 +123,7 @@
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val systemMaxMemory = conf.get(TEST_MEMORY)

if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
@@ -18,6 +18,7 @@
package org.apache.spark.memory

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.BlockId

/**
@@ -209,9 +210,9 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val systemMemory = conf.get(TEST_MEMORY)
val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)

/**
* Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
if (sys.props.contains(IS_TESTING.key)) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS
import org.apache.spark.util.collection.OpenHashSet

/**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
private def getIsCompressedOops: Boolean = {
// This is only used by tests to override the detection of compressed oops. The test
Contributor:

Given this comment I wonder if changing this one config is needed. (I also don't see any SparkConf available in this context, so looks that even that route is not possible.)

Contributor Author:

There are cases where this config is set in the tests. It seems it is indeed useless, since we read it only here as a system property. I think we can consider removing the useless set as part of another PR related to that.

Otherwise we can try to replace the config with a string TEST_USE_COMPRESSED_OOPS_KEY, which we use when setting/reading the system property. WDYT?

Contributor:

I didn't mean it's useless (it might be, didn't look), just that it could be kept as a system property instead of a config constant (since it's never set in or read from SparkConf). Using a string constant sounds fine, but given what I think is the goal of these changes (see SPARK-26060), that wouldn't really change much.

Contributor Author:

The point is: it is set in SparkConf in some tests. But this is probably useless because we read it only here as a system property.

Contributor:

I see; then it can probably be just removed from those tests.

// actually uses a system property instead of a SparkConf, so we'll stick with that.
if (System.getProperty("spark.test.useCompressedOops") != null) {
return System.getProperty("spark.test.useCompressedOops").toBoolean
if (System.getProperty(TEST_USE_COMPRESSED_OOPS.key) != null) {
return System.getProperty(TEST_USE_COMPRESSED_OOPS.key).toBoolean
}

// java.vm.info provides compressed ref info for IBM JDKs
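
A short sketch of the asymmetry discussed in this thread (an assumed simplification, from inside Spark's own code since the Tests entries are private[spark]): SizeEstimator only reads the flag back as a JVM system property, so setting the same key on a SparkConf does not influence it.

```scala
import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS
import org.apache.spark.util.SizeEstimator

// Effective: getIsCompressedOops checks this system property before probing the JVM.
System.setProperty(TEST_USE_COMPRESSED_OOPS.key, "true")
val estimate = SizeEstimator.estimate(new Array[Long](16))

// Not effective for SizeEstimator: it never consults a SparkConf for this key,
// which is why the thread suggests dropping such conf settings from the tests.
// new SparkConf().set(TEST_USE_COMPRESSED_OOPS, true)
```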
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
* Indicates whether Spark is currently running unit tests.
*/
def isTesting: Boolean = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
}

/**
@@ -2175,7 +2176,7 @@
*/
def portMaxRetries(conf: SparkConf): Int = {
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
if (conf.contains("spark.testing")) {
if (conf.contains(IS_TESTING)) {
// Set a higher number of retries for tests...
maxRetries.getOrElse(100)
} else {
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.{Millis, Span}

import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests._
import org.apache.spark.security.EncryptionFunSuite
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val size = 10000
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", (size / 2).toString)
.set(TEST_MEMORY, size.toLong / 2)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -233,7 +234,7 @@
val numPartitions = 20
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", size.toString)
.set(TEST_MEMORY, size.toLong)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.testing", "true")
// SPARK-22864: effectively disable the allocation schedule by setting the period to a
// really long value.
.set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
.set(TEST_SCHEDULE_INTERVAL, 10000L)
val sc = new SparkContext(conf)
contexts += sc
sc
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
import org.scalatest.Matchers

import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

// Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
// test that the shuffle works (rather than retrying until all blocks are local to one Executor).
conf.set("spark.test.noStageRetry", "true")
conf.set(TEST_NO_STAGE_RETRY, true)

test("groupByKey without compression") {
val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}

test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
val myConf = conf.clone().set("spark.test.noStageRetry", "false")
val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
sc = new SparkContext("local", "test", myConf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
rdd.count()
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}

/**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
protected val enableAutoThreadAudit = true

protected override def beforeAll(): Unit = {
System.setProperty("spark.testing", "true")
System.setProperty(IS_TESTING.key, "true")
if (enableAutoThreadAudit) {
doThreadPreAudit()
}