Skip to content

Commit

Permalink
Merge pull request alteryx#117 from stephenh/avoid_concurrent_modific…
Browse files Browse the repository at this point in the history
…ation_exception

Handle ConcurrentModificationExceptions in SparkContext init.

System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during its initialization.

Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hashtable.entrySet)
have undefined behavior under concurrent modification.
  • Loading branch information
mateiz committed Oct 31, 2013
2 parents dc9ce16 + 09f3b67 commit 8f1098a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

Expand Down Expand Up @@ -255,8 +255,10 @@ class SparkContext(
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
Utils.getSystemProperties.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
conf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
import org.apache.spark.{SparkEnv, SparkException, Logging}
import java.util.ConcurrentModificationException


/**
Expand Down Expand Up @@ -818,4 +819,10 @@ private[spark] object Utils extends Logging {
// Nothing else to guard against ?
hashAbs
}

/**
 * Returns a copy of the system properties that is safe to iterate over while other
 * threads keep calling System.setProperty.
 *
 * The live Properties object is cloned first and only that private copy is traversed,
 * so later writes to the live table are not visible to (and cannot disturb) the
 * iteration that builds the result.
 *
 * @return a String-to-String map of the entries held by the cloned Properties object
 */
def getSystemProperties(): Map[String, String] = {
  // clone() is declared to return Object, hence the cast back to Properties.
  val snapshot = System.getProperties().clone().asInstanceOf[java.util.Properties]
  // Convert explicitly instead of relying on an implicit JavaConversions view being in
  // scope; the fully-qualified call keeps this method independent of the file's imports.
  scala.collection.JavaConverters.propertiesAsScalaMapConverter(snapshot).asScala
    .toMap[String, String]
}
}

0 comments on commit 8f1098a

Please sign in to comment.