[SPARK-17240][CORE] Make SparkConf serializable again.
Make the config reader transient, and initialize it lazily so that
serialization works with both Java and Kryo (and hopefully any other
custom serializer).

Added a unit test to make sure SparkConf remains serializable and that the
reader works with both built-in serializers.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14813 from vanzin/SPARK-17240.
Marcelo Vanzin committed Aug 25, 2016
1 parent 3e4c7db commit 9b5a1d1
Showing 2 changed files with 28 additions and 5 deletions.
core/src/main/scala/org/apache/spark/SparkConf.scala (11 changes: 7 additions & 4 deletions)

@@ -56,10 +56,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
 
   private val settings = new ConcurrentHashMap[String, String]()
 
-  private val reader = new ConfigReader(new SparkConfigProvider(settings))
-  reader.bindEnv(new ConfigProvider {
-    override def get(key: String): Option[String] = Option(getenv(key))
-  })
+  @transient private lazy val reader: ConfigReader = {
+    val _reader = new ConfigReader(new SparkConfigProvider(settings))
+    _reader.bindEnv(new ConfigProvider {
+      override def get(key: String): Option[String] = Option(getenv(key))
+    })
+    _reader
+  }
 
   if (loadDefaults) {
     loadFromSystemProperties(false)
core/src/test/scala/org/apache/spark/SparkConfSuite.scala (22 changes: 21 additions & 1 deletion)

@@ -26,8 +26,9 @@ import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
 
+import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
@@ -283,6 +284,25 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
     assert(conf.contains("spark.io.compression.lz4.blockSize"))
     assert(conf.contains("spark.io.unknown") === false)
   }
+
+  val serializers = Map(
+    "java" -> new JavaSerializer(new SparkConf()),
+    "kryo" -> new KryoSerializer(new SparkConf()))
+
+  serializers.foreach { case (name, ser) =>
+    test(s"SPARK-17240: SparkConf should be serializable ($name)") {
+      val conf = new SparkConf()
+      conf.set(DRIVER_CLASS_PATH, "${" + DRIVER_JAVA_OPTIONS.key + "}")
+      conf.set(DRIVER_JAVA_OPTIONS, "test")
+
+      val serializer = ser.newInstance()
+      val bytes = serializer.serialize(conf)
+      val deser = serializer.deserialize[SparkConf](bytes)
+
+      assert(conf.get(DRIVER_CLASS_PATH) === deser.get(DRIVER_CLASS_PATH))
+    }
+  }
+
 }
 
 class Class1 {}
