SPARK-5425: Use synchronised methods in system properties to create SparkConf #4220

Closed
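Background (editorial note, not part of the PR itself): `java.util.Properties` extends `Hashtable`, and while its individual methods are synchronized, iterating over the live table is not atomic — a background thread calling `System.setProperty` mid-iteration can invalidate the iterator, so `new SparkConf(loadDefaults = true)` could fail with a `ConcurrentModificationException`. A minimal standalone Scala sketch of that race (names are illustrative):

import java.util.ConcurrentModificationException

import scala.collection.JavaConverters._
import scala.util.Random

object PropertiesRaceSketch {
  def main(args: Array[String]): Unit = {
    // Background writer mutating system properties, like the regression test below does.
    val writer = new Thread {
      override def run(): Unit =
        while (!Thread.currentThread().isInterrupted)
          System.setProperty("race." + Random.nextInt(), "x")
    }
    writer.setDaemon(true)
    writer.start()

    try {
      // Unsafe pattern: the Scala wrapper iterates the live Hashtable, so a
      // concurrent setProperty can invalidate the iterator mid-traversal.
      for (_ <- 1 to 10000)
        System.getProperties.asScala.count { case (k, _) => k.startsWith("race.") }
      println("no race observed this run (it is timing-dependent)")
    } catch {
      case e: ConcurrentModificationException => println("reproduced: " + e)
    } finally writer.interrupt()
  }
}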
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
+
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
  *
@@ -49,8 +50,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+      settings(key) = value
     }
   }
 
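With this change, the loading loop in the SparkConf constructor iterates an immutable snapshot returned by Utils.getSystemProperties (shown next) instead of a live view over the underlying Hashtable, so concurrent calls to System.setProperty can no longer invalidate the iteration.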
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1227,9 +1227,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator over. */
-  def getSystemProperties(): Map[String, String] = {
-    System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+  /** Returns the system properties map that is thread-safe to iterate over. It gets the
+   * properties which have been set explicitly, as well as those for which only a default value
+   * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**
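Why this is safe (editorial note): stringPropertyNames() copies the key names into a fresh collection while holding the Properties lock, and each subsequent getProperty(key) call is itself a synchronized method, so no step ever iterates the live table — this is the "synchronised methods" approach of the PR title. A standalone sketch of the same idea (the object and method names are hypothetical, not Spark API):

import scala.collection.JavaConversions._

object SystemPropertiesSnapshot {
  // Hypothetical standalone equivalent of the new Utils.getSystemProperties.
  def snapshotSystemProperties: Map[String, String] = {
    // stringPropertyNames() returns a snapshot set built under the lock;
    // getProperty is synchronized, so each lookup is individually safe.
    // Caveat: a key removed between the two calls would map to null.
    val sysProps = for (key <- System.getProperties.stringPropertyNames())
      yield (key, System.getProperty(key))
    sysProps.toMap
  }

  def main(args: Array[String]): Unit = {
    // Usage mirroring the SparkConf constructor's filter on "spark.*" keys.
    val sparkProps = snapshotSystemProperties.filter { case (k, _) => k.startsWith("spark.") }
    sparkProps.foreach(println)
  }
}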
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.ResetSystemProperties

Review comment from a Contributor on the java.util.concurrent import line: "ultra nit: sort imports"

@@ -121,4 +125,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
     assert(conf.get("spark.test.a.b") === "A.B")
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
+
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
 }
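In short, the test provokes the race on purpose: a scheduled task adds a fresh spark.5425.* property every millisecond while the main thread constructs SparkConf(loadDefaults = true) instances in a tight loop for one second. Before the fix, the constructor's iteration over the live system properties failed intermittently with ConcurrentModificationException; wrapping it in Try surfaces that as a plain assertion failure.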
core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
 
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // the latter way of creating a copy does not copy the properties but initializes a new
+    // Properties object with the given properties as defaults. They are then not recognized
+    // at all by the standard Scala wrapper over Java Properties.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
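The pitfall that comment describes is easy to demonstrate. An editorial sketch (the object name is illustrative; it assumes commons-lang3 on the classpath, which Spark's test scope already provides):

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.commons.lang3.SerializationUtils

object PropertiesCopySketch {
  def main(args: Array[String]): Unit = {
    val orig = new Properties()
    orig.setProperty("spark.test", "x")

    val viaCtor = new Properties(orig)             // 'orig' becomes defaults, not entries
    println(viaCtor.getProperty("spark.test"))     // x  -- resolved through the defaults chain
    println(viaCtor.stringPropertyNames())         // [spark.test]  -- also walks defaults
    println(viaCtor.keySet())                      // []  -- no entries were actually copied
    println(viaCtor.asScala.size)                  // 0  -- the Scala wrapper sees no entries

    val viaClone = SerializationUtils.clone(orig)  // deep copy: real entries preserved
    println(viaClone.keySet())                     // [spark.test]
  }
}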
examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
   * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)