Merge branch 'master' of https://github.com/apache/spark
zhzhan committed Sep 2, 2014
2 parents cb53a2c + 81b9d5b commit 789ea21
Showing 51 changed files with 439 additions and 162 deletions.
18 changes: 6 additions & 12 deletions core/pom.xml
@@ -306,26 +306,20 @@
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
<tasks>
<unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
</tasks>
</configuration>
</plugin>
<plugin>
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
case JobFailed(e: Exception) => scala.util.Failure(e)
}
}

/** Get the corresponding job id for this action. */
def jobId = jobWaiter.jobId
}


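The new jobId accessor is easiest to exercise through SparkContext.submitJob, which returns a SimpleFutureAction directly. A minimal sketch, assuming a live SparkContext named sc; the per-partition function and result handlers below are illustrative placeholders:

    val rdd = sc.parallelize(1 to 100, 4)
    val action = sc.submitJob(
      rdd,
      (it: Iterator[Int]) => it.sum,        // work done on each partition
      0 until rdd.partitions.size,          // run on every partition
      (_: Int, _: Int) => (),               // discard the per-partition results
      ())                                   // overall result value
    println(s"Submitted Spark job ${action.jobId}")   // accessor added above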
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -294,7 +294,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
def checkUIViewPermissions(user: String): Boolean = {
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
viewAcls.mkString(","))
if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
!aclsEnabled || user == null || viewAcls.contains(user)
}

/**
@@ -309,7 +309,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
def checkModifyPermissions(user: String): Boolean = {
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
modifyAcls.mkString(","))
if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
!aclsEnabled || user == null || modifyAcls.contains(user)
}


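Both permission checks above are rewritten as the De Morgan dual of the old condition: access is denied only when ACLs are enabled, the user is known, and the user is not in the ACL. A quick standalone equivalence check with hypothetical helper functions (not part of the patch):

    def oldCheck(aclsOn: Boolean, user: String, acls: Set[String]): Boolean =
      if (aclsOn && (user != null) && (!acls.contains(user))) false else true

    def newCheck(aclsOn: Boolean, user: String, acls: Set[String]): Boolean =
      !aclsOn || user == null || acls.contains(user)

    // Exhaustively compare the two forms over the interesting cases.
    for (aclsOn <- Seq(true, false); user <- Seq(null, "alice", "bob"))
      assert(oldCheck(aclsOn, user, Set("alice")) == newCheck(aclsOn, user, Set("alice")))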
27 changes: 4 additions & 23 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
ui.bind()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -815,7 +796,7 @@ class SparkContext(config: SparkConf) extends Logging {
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(path)` to find its download location.
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String) {
val uri = new URI(path)
@@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging {
addedFiles(key) = System.currentTimeMillis

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
@@ -1637,4 +1619,3 @@ private[spark] class WritableConverter[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
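The documentation fix above is not cosmetic: SparkFiles.get expects only the file name, not the path originally passed to addFile. A hedged usage sketch, assuming a live SparkContext named sc and a driver-local file /tmp/lookup.txt (both placeholders):

    import org.apache.spark.SparkFiles

    sc.addFile("/tmp/lookup.txt")              // path handed to addFile
    val local = SparkFiles.get("lookup.txt")   // resolved by file name only
    println(s"lookup.txt was fetched to $local")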

15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

def name(): String = rdd.name

/**
* :: Experimental ::
* The asynchronous version of the foreach action.
*
* @param f the function to apply to all the elements of the RDD
* @return a FutureAction for the action
*/
@Experimental
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
import org.apache.spark.SparkContext._
rdd.foreachAsync(x => f.call(x))
}

}
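A hedged sketch of calling the new Java-API method, written in Scala for consistency with the rest of the patch and assuming a live JavaSparkContext named jsc:

    import java.util.Arrays
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import org.apache.spark.api.java.function.VoidFunction

    val nums: java.util.List[java.lang.Integer] =
      Arrays.asList(Int.box(1), Int.box(2), Int.box(3), Int.box(4))
    val future = jsc.parallelize(nums).foreachAsync(new VoidFunction[java.lang.Integer] {
      def call(x: java.lang.Integer): Unit = println(x)   // runs on the executors
    })
    Await.ready(future, Duration.Inf)   // FutureAction is a scala.concurrent.Future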
@@ -545,7 +545,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(path)` to find its download location.
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String) {
sc.addFile(path)
@@ -28,6 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.io.ByteArrayChunkOutputStream

/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -201,29 +202,12 @@
}

def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
// TODO: Create a special ByteArrayOutputStream that splits the output directly into chunks
// so we don't need to do the extra memory copy.
val bos = new ByteArrayOutputStream()
val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
val byteArray = bos.toByteArray
val bais = new ByteArrayInputStream(byteArray)
val numBlocks = math.ceil(byteArray.length.toDouble / BLOCK_SIZE).toInt
val blocks = new Array[ByteBuffer](numBlocks)

var blockId = 0
for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {
val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)
val tempByteArray = new Array[Byte](thisBlockSize)
bais.read(tempByteArray, 0, thisBlockSize)

blocks(blockId) = ByteBuffer.wrap(tempByteArray)
blockId += 1
}
bais.close()
blocks
bos.toArrays.map(ByteBuffer.wrap)
}

def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
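The rewrite above drops the serialize-then-recopy round trip: serialized output is accumulated directly in BLOCK_SIZE chunks and each chunk is wrapped as a block via bos.toArrays.map(ByteBuffer.wrap). A simplified, hypothetical stand-in for the Spark-internal ByteArrayChunkOutputStream, shown only to illustrate the chunking idea:

    import java.io.OutputStream
    import scala.collection.mutable.ArrayBuffer

    // Accumulates output in fixed-size byte chunks instead of one growing array.
    class ChunkedByteOutput(chunkSize: Int) extends OutputStream {
      private val chunks = ArrayBuffer[Array[Byte]]()
      private var pos = chunkSize                     // force a chunk on first write
      override def write(b: Int): Unit = {
        if (pos == chunkSize) { chunks += new Array[Byte](chunkSize); pos = 0 }
        chunks.last(pos) = b.toByte
        pos += 1
      }
      // Return the chunks, trimming the partially filled last one.
      def toArrays: Array[Array[Byte]] =
        if (chunks.isEmpty) Array.empty
        else (chunks.init :+ chunks.last.take(pos)).toArray
    }

The real class also handles bulk write(Array[Byte], Int, Int) calls efficiently; this sketch covers only the single-byte path.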
39 changes: 35 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi

import scala.collection.JavaConversions._

/**
* :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)

/**
@@ -64,11 +67,39 @@ class SparkHadoopUtil extends Logging {
}
}

@Deprecated
def newConfiguration(): Configuration = newConfiguration(null)

/**
* Return an appropriate (subclass) of Configuration. Creating config can initialize some Hadoop
* subsystems.
*/
def newConfiguration(): Configuration = new Configuration()
def newConfiguration(conf: SparkConf): Configuration = {
val hadoopConf = new Configuration()

// Note: this null check is around more than just access to the "conf" object to maintain
// the behavior of the old implementation of this code, for backwards compatibility.
if (conf != null) {
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}

hadoopConf
}

/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +117,7 @@

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

def loginUserFromKeytab(principalName: String, keytabFilename: String) {
def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

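A hedged sketch of the new overload in use; the property keys mirror the ones handled in the patch above, while the HDFS address and buffer size are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")   // copied in as fs.defaultFS
      .set("spark.buffer.size", "131072")                         // becomes io.file.buffer.size
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode:8020")
    assert(hadoopConf.get("io.file.buffer.size") == "131072")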
@@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }

private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
SparkHadoopUtil.get.newConfiguration(conf))

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
@@ -28,8 +28,8 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

import org.apache.spark.Logging
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(

val jarPath = new Path(driverDesc.jarUrl)

val emptyConf = new Configuration()
val jarFileSystem = jarPath.getFileSystem(emptyConf)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val jarFileSystem = jarPath.getFileSystem(hadoopConf)

val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@

if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}

if (!localJarFile.exists()) { // Verify copy succeeded
@@ -257,7 +257,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
case Some(executor) =>
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -288,7 +288,7 @@

case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -294,9 +295,9 @@ private[spark] class Executor(
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
classOf[Boolean])
constructor.newInstance(classUri, parent, userClassPathFirst)
val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
classOf[ClassLoader], classOf[Boolean])
constructor.newInstance(conf, classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
@@ -313,16 +314,19 @@
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
(Diffs for the remaining changed files are not shown here.)