Merge pull request alteryx#124 from tgravescs/sparkHadoopUtilFix
Pull SparkHadoopUtil out of SparkEnv (jira SPARK-886)

Having the logic that initializes the correct SparkHadoopUtil live in SparkEnv prevents it from being used until after the SparkContext is initialized. This causes issues like https://spark-project.atlassian.net/browse/SPARK-886. It also makes it hard to use from singleton objects; for instance, I want to use it in the security code.
mateiz committed Oct 30, 2013
2 parents 618c1f6 + f231aaa commit 33de11c
Showing 8 changed files with 43 additions and 38 deletions.
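
For context before the diff, here is a minimal before/after sketch of the access pattern this change enables; the call sites are illustrative, not lines from the diff below. Code that previously had to reach the Hadoop helper through a live SparkEnv can now get it directly from the new singleton, even before a SparkContext exists.

    import org.apache.spark.deploy.SparkHadoopUtil

    // Before this change: usable only after SparkEnv (and therefore SparkContext) is initialized.
    val confBefore = SparkEnv.get.hadoop.newConfiguration()

    // After this change: usable from any singleton object (e.g. security code), no SparkEnv required.
    val confAfter = SparkHadoopUtil.get.newConfiguration()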
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -245,7 +245,7 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -379,7 +379,7 @@ class SparkContext(
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkEnv.get.hadoop.addCredentials(conf)
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}

@@ -698,7 +698,7 @@ class SparkContext(
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
if (env.hadoop.isYarnMode()) {
if (SparkHadoopUtil.get.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
// the client to upload the file into the distributed cache to make it show up in the
// current working directory.
@@ -936,9 +936,8 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val env = SparkEnv.get
val path = new Path(dir)
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
17 changes: 4 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider

import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.api.python.PythonWorkerFactory

import com.google.common.collect.MapMaker

/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if(yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
} else {
new SparkHadoopUtil
}
}
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

import com.google.common.collect.MapMaker

import org.apache.spark.SparkException

/**
* Contains util methods to interact with Hadoop from Spark.
*/
private[spark]
class SparkHadoopUtil {
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }

}

object SparkHadoopUtil {
private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
} else {
new SparkHadoopUtil
}
}

def get: SparkHadoopUtil = {
hadoop
}
}
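
As a hedged illustration of why the companion object matters (the SecurityHelper object below is hypothetical and not part of this commit): any Spark-internal singleton can now resolve the YARN-aware implementation on its own, without going through SparkEnv.

    package org.apache.spark.deploy  // SparkHadoopUtil is private[spark], so this sketch lives inside the Spark tree

    import org.apache.hadoop.mapred.JobConf

    // Hypothetical helper object, shown only to sketch the intended usage.
    object SecurityHelper {
      // Safe to call before any SparkContext or SparkEnv exists; SparkHadoopUtil.get
      // selects the YARN or default implementation itself based on SPARK_YARN_MODE.
      def credentialedJobConf(): JobConf = {
        val jobConf = new JobConf(SparkHadoopUtil.get.newConfiguration())
        SparkHadoopUtil.get.addCredentials(jobConf)
        jobConf
      }
    }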
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())

val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {

def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}

@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)

def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)

def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
SparkEnv.get.hadoopJobMetadata.put(key, value)
}
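
A hedged sketch of how these cache helpers are typically used: look a JobConf up by key, and build and cache a new one only on a miss. The key, the base configuration argument, and the wrapping method are illustrative assumptions rather than lines from this commit, and the helpers are private to Spark, so this only applies to Spark-internal code.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    // Illustrative only: reuse a JobConf cached by an earlier split computation in this
    // process, falling back to creating and caching a fresh one.
    def cachedJobConf(key: String, base: Configuration): JobConf = {
      if (HadoopRDD.containsCachedMetadata(key)) {
        HadoopRDD.getCachedMetadata(key).asInstanceOf[JobConf]
      } else {
        val newJobConf = new JobConf(base)
        HadoopRDD.putCachedMetadata(key, newJobConf)
        newJobConf
      }
    }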
core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl

// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val conf = new JobConf(configuration)
env.hadoop.addCredentials(conf)
SparkHadoopUtil.get.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)

val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl

// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val jobConf = new JobConf(configuration)
env.hadoop.addCredentials(jobConf)
SparkHadoopUtil.get.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)

val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -280,9 +280,8 @@ private[spark] object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val env = SparkEnv.get
val uri = new URI(url)
val conf = env.hadoop.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -21,6 +21,7 @@ import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo

/**
@@ -51,7 +52,7 @@ object SparkHdfsLR {
System.exit(1)
}
val inputPath = args(1)
val conf = SparkEnv.get.hadoop.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations(
