Skip to content

Commit

Permalink
SPARK-5425: Use synchronised methods in system properties to create S…
Browse files Browse the repository at this point in the history
…parkConf

SPARK-5425: Fixed usages of system properties

This patch fixes few problems caused by the fact that the Scala wrapper over system properties is not thread-safe and is basically invalid because it doesn't take into account the default values which could have been set in the properties object. The problem is fixed by modifying `Utils.getSystemProperties` method so that it uses `stringPropertyNames` method of the `Properties` class, which is thread-safe (internally it creates a defensive copy in a synchronized method) and returns keys of the properties which were set explicitly and which are defined as defaults.
The other related problem, which is fixed here. was in `ResetSystemProperties` mix-in. It created a copy of the system properties in the wrong way.

This patch also introduces a test case for thread-safeness of SparkConf creation.

Refer to the discussion in #4220 for more details.

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #4221 from jacek-lewandowski/SPARK-5425-1.2 and squashes the following commits:

87951a2 [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to return a map of all system properties - explicit + defaults
01dd5cb [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save properties in ResetSystemProperties trait
94aeacf [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system properties to create SparkConf
  • Loading branch information
jacek-lewandowski authored and JoshRosen committed Feb 8, 2015
1 parent d89964f commit 4bad854
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 7 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Expand Down Expand Up @@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

if (loadDefaults) {
// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
set(k, v)
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
}
}

Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1345,9 +1345,14 @@ private[spark] object Utils extends Logging {
hashAbs
}

/** Returns a copy of the system properties that is thread-safe to iterator over. */
def getSystemProperties(): Map[String, String] = {
System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
/** Returns the system properties map that is thread-safe to iterator over. It gets the
* properties which have been set explicitly, as well as those for which only a default value
* has been defined. */
def getSystemProperties: Map[String, String] = {
val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
(key, System.getProperty(key))

sysProps.toMap
}

/**
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark

import java.util.concurrent.{TimeUnit, Executors}

import scala.util.{Try, Random}

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
Expand Down Expand Up @@ -123,6 +127,28 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}

test("Thread safeness - SPARK-5425") {
import scala.collection.JavaConversions._

val executor = Executors.newSingleThreadScheduledExecutor()
val sf = executor.scheduleAtFixedRate(new Runnable {
override def run(): Unit =
System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
}, 0, 1, TimeUnit.MILLISECONDS)

try {
val t0 = System.currentTimeMillis()
while ((System.currentTimeMillis() - t0) < 1000) {
val conf = Try(new SparkConf(loadDefaults = true))
assert(conf.isSuccess === true)
}
} finally {
executor.shutdownNow()
for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
System.getProperties.remove(key)
}
}

test("register kryo classes through registerKryoClasses") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.util

import java.util.Properties

import org.apache.commons.lang3.SerializationUtils
import org.scalatest.{BeforeAndAfterEach, Suite}

/**
Expand All @@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
var oldProperties: Properties = null

override def beforeEach(): Unit = {
oldProperties = new Properties(System.getProperties)
// we need SerializationUtils.clone instead of `new Properties(System.getProperties()` because
// the later way of creating a copy does not copy the properties but it initializes a new
// Properties object with the given properties as defaults. They are not recognized at all
// by standard Scala wrapper over Java Properties then.
oldProperties = SerializationUtils.clone(System.getProperties)
super.beforeEach()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.examples

import scala.collection.JavaConversions._

import org.apache.spark.util.Utils

/** Prints out environmental information, sleeps, and then exits. Made to
* test driver submission in the standalone scheduler. */
object DriverSubmissionTest {
Expand All @@ -30,7 +32,7 @@ object DriverSubmissionTest {
val numSecondsToSleep = args(0).toInt

val env = System.getenv()
val properties = System.getProperties()
val properties = Utils.getSystemProperties

println("Environment variables containing SPARK_TEST:")
env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
Expand Down

0 comments on commit 4bad854

Please sign in to comment.