From 685780e4ed8d2bc4a734e002e03482bf52a6c517 Mon Sep 17 00:00:00 2001
From: Jacek Lewandowski
Date: Tue, 27 Jan 2015 12:36:52 +0100
Subject: [PATCH 1/3] SPARK-5425: Use synchronised methods in system properties
 to create SparkConf

---
 .../scala/org/apache/spark/SparkConf.scala         |  5 ++--
 .../org/apache/spark/SparkConfSuite.scala          | 25 +++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 605df0e929faa..32a2380cbe69c 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -49,8 +49,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+    val propNames = System.getProperties.stringPropertyNames().asScala
+    for (k <- propNames if k.startsWith("spark.")) {
+      settings(k) = System.getProperty(k)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index ae8a3c78c91ff..cc56d9c5a8eba 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.ResetSystemProperties
@@ -121,4 +125,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     assert(conf.get("spark.test.a.b") === "A.B")
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
+
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
 }
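The fix works because `java.util.Properties` extends `Hashtable`, whose iterators are
fail-fast: if another thread calls `setProperty` while `SparkConf` is walking the live
entry set, the iteration dies with a `ConcurrentModificationException`. The synchronised
`stringPropertyNames()` instead builds a snapshot while holding the `Properties` lock, so
iterating the returned set is safe. Below is a self-contained sketch of the race, outside
the patch itself; the object name and the `demo.` key prefix are invented for illustration:

    import java.util.concurrent.{Executors, TimeUnit}

    import scala.collection.JavaConversions._
    import scala.util.{Random, Try}

    object PropertiesRaceDemo {
      def main(args: Array[String]): Unit = {
        // Mutate system properties from a second thread, the way a
        // concurrently initializing driver or test suite might.
        val executor = Executors.newSingleThreadScheduledExecutor()
        executor.scheduleAtFixedRate(new Runnable {
          override def run(): Unit =
            System.setProperty("demo." + Random.nextInt(), Random.nextInt().toString)
        }, 0, 1, TimeUnit.MILLISECONDS)

        try {
          var liveFailures, snapshotFailures = 0
          val t0 = System.currentTimeMillis()
          while (System.currentTimeMillis() - t0 < 1000) {
            // Fail-fast iteration over the live Hashtable view.
            if (Try(System.getProperties.toMap).isFailure) liveFailures += 1
            // stringPropertyNames() snapshots under the Properties lock.
            if (Try(System.getProperties.stringPropertyNames().toSet).isFailure)
              snapshotFailures += 1
          }
          println(s"live view failures: $liveFailures, snapshot failures: $snapshotFailures")
        } finally {
          executor.shutdownNow()
        }
      }
    }

The snapshot count stays at zero, while the live-view count grows whenever a `setProperty`
lands between two steps of an iteration - the failure mode the regression test above
provokes for a full second.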
From 74b4489b52ea05813fb1990ab42dbc4ad19ba1cd Mon Sep 17 00:00:00 2001
From: Jacek Lewandowski
Date: Thu, 29 Jan 2015 10:21:08 +0100
Subject: [PATCH 2/3] SPARK-5425: Use SerializationUtils to save properties in
 ResetSystemProperties trait

---
 .../org/apache/spark/util/ResetSystemProperties.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f33dd9e6..bad1aa99952cf 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new Properties(System.getProperties)` because
+    // the latter does not copy the properties: it creates a new Properties object that merely
+    // uses the given properties as defaults, and default values are invisible to the standard
+    // Scala wrapper over Java Properties.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
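The subtlety the new comment describes is easy to miss: the `Properties(Properties defaults)`
constructor copies nothing. It creates an empty table that merely falls back to the argument
on `getProperty` lookups, so restoring `oldProperties` after a test would write back an empty
property set. A minimal sketch of the difference, assuming commons-lang3 is on the classpath;
the object and key names are invented for illustration:

    import java.util.Properties

    import org.apache.commons.lang3.SerializationUtils

    object PropertiesCopyDemo {
      def main(args: Array[String]): Unit = {
        val original = new Properties()
        original.setProperty("spark.app.name", "demo")

        // The constructor only installs `original` as the defaults table.
        val viaConstructor = new Properties(original)
        println(viaConstructor.getProperty("spark.app.name")) // demo (via fallback)
        println(viaConstructor.keySet())                      // [] - no real entries

        // A serialization round-trip produces a genuine deep copy with real
        // entries, which iteration (and hence a later restore) can see.
        val viaClone: Properties = SerializationUtils.clone(original)
        println(viaClone.keySet())                            // [spark.app.name]
      }
    }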
From 6c48a1f58922140985834f62ad43e1ee5595abd6 Mon Sep 17 00:00:00 2001
From: Jacek Lewandowski
Date: Mon, 2 Feb 2015 09:15:59 +0100
Subject: [PATCH 3/3] SPARK-5425: Modified Utils.getSystemProperties to return
 a map of all system properties - explicit + defaults

---
 core/src/main/scala/org/apache/spark/SparkConf.scala  |  8 ++++----
 core/src/main/scala/org/apache/spark/util/Utils.scala | 11 ++++++++---
 .../apache/spark/examples/DriverSubmissionTest.scala  |  4 +++-
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 32a2380cbe69c..068a03460e08d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
+
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
 *
@@ -49,9 +50,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    val propNames = System.getProperties.stringPropertyNames().asScala
-    for (k <- propNames if k.startsWith("spark.")) {
-      settings(k) = System.getProperty(k)
+    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+      settings(key) = value
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4259145ec93d1..90402ca905957 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1227,9 +1227,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator over. */
-  def getSystemProperties(): Map[String, String] = {
-    System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+  /** Returns the system properties map, which is thread-safe to iterate over. It gets the
+    * properties which have been set explicitly, as well as those for which only a default value
+    * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 65251e93190f0..e757283823fc3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
  * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
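The reason for building the map from `stringPropertyNames()` rather than keeping the old
`clone()`-then-`toMap`: iterating a cloned `Properties` visits only the explicit entry
table, whereas `stringPropertyNames()` also walks the defaults chain, so properties that
exist only as defaults now appear in the returned map, as the new scaladoc promises. A
short sketch of the difference, with invented key names (set ordering may vary):

    import java.util.Properties

    object DefaultsVisibilityDemo {
      def main(args: Array[String]): Unit = {
        val defaults = new Properties()
        defaults.setProperty("spark.master", "local[*]")

        val props = new Properties(defaults)
        props.setProperty("spark.app.name", "demo")

        // Entry-table iteration (what clone-then-toMap saw): explicit keys only.
        println(props.keySet())                    // [spark.app.name]

        // stringPropertyNames() resolves the defaults chain as well; this is
        // what the patched Utils.getSystemProperties builds its map from.
        println(props.stringPropertyNames())       // [spark.app.name, spark.master]
        println(props.getProperty("spark.master")) // local[*]
      }
    }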