Commit

Merge branch 'master' of github.com:apache/spark into rest
Andrew Or committed Jan 28, 2015
2 parents e42c131 + d743732 commit efa5e18
Showing 104 changed files with 4,388 additions and 2,015 deletions. (Only the first several file diffs are reproduced below.)
16 changes: 6 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
- private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
-   : Option[() => Long] = {
+ private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
try {
- val threadStats = getFileSystemThreadStatistics(path, conf)
+ val threadStats = getFileSystemThreadStatistics()
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
@@ -156,10 +155,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
- private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
-   : Option[() => Long] = {
+ private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
try {
- val threadStats = getFileSystemThreadStatistics(path, conf)
+ val threadStats = getFileSystemThreadStatistics()
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
@@ -172,10 +170,8 @@ }
}
}

- private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
-   val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-   val scheme = qualifiedPath.toUri().getScheme()
-   val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+ private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
+   val stats = FileSystem.getAllStatistics()
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

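The callbacks above use a baseline-and-delta pattern: the FileSystem counters are snapshotted when the callback is created, and each later call reports only the bytes accumulated since. A minimal sketch of that pattern, assuming the process-wide Hadoop counters instead of the thread-local statistics the real code reaches via reflection (Hadoop 2.5+, HADOOP-10688); bytesReadCallbackSketch is a hypothetical name, not Spark code:

    import org.apache.hadoop.fs.FileSystem
    import scala.collection.JavaConverters._

    def bytesReadCallbackSketch(): Option[() => Long] = {
      try {
        // Sum bytes read across every registered FileSystem (process-wide counters).
        val readAll = () => FileSystem.getAllStatistics.asScala.map(_.getBytesRead).sum
        val baseline = readAll()          // snapshot taken before any records are read
        Some(() => readAll() - baseline)  // later calls report only the delta
      } catch {
        case _: Throwable => None         // mirror the "None if unavailable" contract
      }
    }
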
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -292,6 +292,18 @@ object SparkSubmit {
}
}

+ // Add the application jar automatically so the user doesn't have to call sc.addJar
+ // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+ // For python files, the primary resource is already distributed as a regular file
+ val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+ if (!isYarnCluster && !args.isPython) {
+   var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
+   if (isUserJar(args.primaryResource)) {
+     jars = jars ++ Seq(args.primaryResource)
+   }
+   sysProps.put("spark.jars", jars.mkString(","))
+ }

// In standalone-cluster mode, use Client as a wrapper around the user class
// Note that we won't actually launch this class if we're using the stable REST protocol
if (args.isStandaloneCluster && !args.isRestEnabled) {
@@ -306,18 +318,6 @@ }
}
}

- // Add the application jar automatically so the user doesn't have to call sc.addJar
- // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
- // For python files, the primary resource is already distributed as a regular file
- val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
- if (!isYarnCluster && !args.isPython) {
-   var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
-   if (isUserJar(args.primaryResource)) {
-     jars = jars ++ Seq(args.primaryResource)
-   }
-   sysProps.put("spark.jars", jars.mkString(","))
- }

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
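The relocated block above folds the application jar into spark.jars so users do not have to call sc.addJar themselves. A standalone sketch of that merge, where sysProps, primaryResource, and isUserJar stand in for SparkSubmit's internal state and addPrimaryResourceToJars is a hypothetical name:

    import scala.collection.mutable

    def addPrimaryResourceToJars(sysProps: mutable.Map[String, String],
                                 primaryResource: String,
                                 isUserJar: String => Boolean): Unit = {
      // Existing comma-separated jar list, if any.
      var jars = sysProps.get("spark.jars").map(_.split(",").toSeq).getOrElse(Seq.empty)
      if (isUserJar(primaryResource)) {
        jars = jars :+ primaryResource  // append the app jar so sc.addJar is unnecessary
      }
      sysProps("spark.jars") = jars.mkString(",")
    }

For example, with spark.jars set to "libA.jar,libB.jar" and a primary resource app.jar, the property becomes "libA.jar,libB.jar,app.jar".
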
@@ -74,13 +74,12 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<ul class="unstyled">
<li><strong>URL:</strong> {state.uri}</li>
{
- state.stableUri
-   .map { uri =>
-     <li>
-       <strong>Stable URL:</strong> {uri}
-       <span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
-     </li> }
-   .getOrElse { Seq.empty }
+ state.stableUri.map { uri =>
+   <li>
+     <strong>Stable URL:</strong> {uri}
+     <span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
+   </li>
+ }.getOrElse { Seq.empty }
}
<li><strong>Workers:</strong> {state.workers.size}</li>
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
@@ -19,7 +19,6 @@ package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

- import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
+ import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
@@ -218,13 +219,13 @@ class HadoopRDD[K, V](

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
- case split: FileSplit =>
-   SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+ case _: FileSplit | _: CombineFileSplit =>
+   SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
- )
+ }
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
@@ -254,7 +255,8 @@
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
- } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+ } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
+            split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
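The change above widens byte counting from plain FileSplits to CombineFileSplits; any other split type still yields no callback. A sketch of just that selection step, where fsCallback stands in for SparkHadoopUtil.get.getFSBytesReadOnThreadCallback and chooseBytesReadCallback is an illustrative name:

    import org.apache.hadoop.mapred.{FileSplit, InputSplit}
    import org.apache.hadoop.mapred.lib.CombineFileSplit

    def chooseBytesReadCallback(
        split: InputSplit,
        fsCallback: () => Option[() => Long]): Option[() => Long] = {
      split match {
        // Only file-based splits can be measured through FileSystem statistics.
        case _: FileSplit | _: CombineFileSplit => fsCallback()
        case _ => None
      }
    }
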
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
- import org.apache.hadoop.mapreduce.lib.input.FileSplit
+ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
@@ -34,7 +34,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
- import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+ import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
@@ -114,13 +114,13 @@ class NewHadoopRDD[K, V](

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.serializableHadoopSplit.value match {
- case split: FileSplit =>
-   SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+ case _: FileSplit | _: CombineFileSplit =>
+   SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
- )
+ }
inputMetrics.setBytesReadCallback(bytesReadCallback)

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
@@ -163,7 +163,8 @@
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
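As in HadoopRDD, the close() path above now also falls back to the split's declared length for CombineFileSplits when the FileSystem-statistics callback is unavailable. A sketch of that fallback under the new-API split types; recordBytesRead is a stand-in for updating the task's InputMetrics, and reportBytesRead is an illustrative name:

    import java.io.IOException

    import org.apache.hadoop.mapreduce.InputSplit
    import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}

    def reportBytesRead(
        split: InputSplit,
        bytesReadCallback: Option[() => Long],
        recordBytesRead: Long => Unit): Unit = {
      bytesReadCallback match {
        case Some(callback) => recordBytesRead(callback())  // exact bytes from FS statistics
        case None =>
          split match {
            case _: FileSplit | _: CombineFileSplit =>
              // Fall back to the split size, which may be inaccurate.
              try recordBytesRead(split.getLength) catch { case _: IOException => () }
            case _ => ()  // nothing reliable to report for other split types
          }
      }
    }
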
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -990,7 +990,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)

- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
@@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1086,11 +1086,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}

- private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
-   : (OutputMetrics, Option[() => Long]) = {
-   val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
-     .map(new Path(_))
-     .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+ private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
+   val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
if (bytesWrittenCallback.isDefined) {
context.taskMetrics.outputMetrics = Some(outputMetrics)
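For context on the removed lines: the old helper resolved "mapreduce.output.fileoutputformat.outputdir" to a Path and restricted the FileSystem statistics to that path's scheme, whereas the new callback aggregates across all registered FileSystems. A sketch of the dropped scheme-filtering step using only public Hadoop APIs; statisticsForScheme is an illustrative name:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.JavaConverters._

    def statisticsForScheme(path: Path, conf: Configuration): Seq[FileSystem.Statistics] = {
      // Qualify the path against its FileSystem to learn the scheme (e.g. "hdfs" or "file").
      val scheme = path.getFileSystem(conf).makeQualified(path).toUri.getScheme
      // Keep only the statistics registered for that scheme.
      FileSystem.getAllStatistics.asScala.filter(_.getScheme == scheme).toSeq
    }
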
@@ -53,7 +53,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
val resultsFile = File.createTempFile("test-submit", ".txt")
val numbers = Seq(1, 2, 3)
val size = 500
- val driverId = submitApp(resultsFile, numbers, size)
+ val driverId = submitApplication(resultsFile, numbers, size)
waitUntilFinished(driverId)
validateResult(resultsFile, numbers, size)
}
@@ -68,7 +68,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
val resultsFile = File.createTempFile("test-kill", ".txt")
val numbers = Seq(1, 2, 3)
val size = 500
- val driverId = submitApp(resultsFile, numbers, size)
+ val driverId = submitApplication(resultsFile, numbers, size)
val killResponse = client.killDriver(masterRestUrl, driverId)
val killSuccess = killResponse.getFieldNotNull(KillDriverResponseField.SUCCESS)
waitUntilFinished(driverId)
@@ -90,6 +90,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
/**
* Start a local cluster containing one Master and a few Workers.
* Do not use org.apache.spark.deploy.LocalCluster here because we want the REST URL.
+ * Return the Master's REST URL to which applications should be submitted.
*/
private def startLocalCluster(): String = {
val conf = new SparkConf(false)
@@ -111,10 +112,8 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
masterRestUrl
}

- /**
-  * Submit an application through the stable gateway and return the corresponding driver ID.
-  */
- private def submitApp(resultsFile: File, numbers: Seq[Int], size: Int): String = {
+ /** Submit the StandaloneRestApp and return the corresponding driver ID. */
+ private def submitApplication(resultsFile: File, numbers: Seq[Int], size: Int): String = {
val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) ++ Seq(size.toString)
val commandLineArgs = Array(
"--deploy-mode", "cluster",
Expand All @@ -129,10 +128,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
submitResponse.getFieldNotNull(SubmitDriverResponseField.DRIVER_ID)
}

- /**
-  * Wait until the given driver has finished running,
-  * up to the specified maximum number of seconds.
-  */
+ /** Wait until the given driver has finished running up to the specified timeout. */
private def waitUntilFinished(driverId: String, maxSeconds: Int = 10): Unit = {
var finished = false
val expireTime = System.currentTimeMillis + maxSeconds * 1000
@@ -189,11 +185,11 @@ private object StandaloneRestProtocolSuite {

/**
* Return a list of class files compiled for StandaloneRestApp.
-  * This includes all the anonymous classes used in StandaloneRestApp#main.
+  * This includes all the anonymous classes used in the application.
*/
private def getClassFiles: Seq[File] = {
- val clazz = StandaloneRestApp.getClass
val className = Utils.getFormattedClassName(StandaloneRestApp)
+ val clazz = StandaloneRestApp.getClass
val basePath = clazz.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
val baseDir = new File(basePath + "/" + pathPrefix)
baseDir.listFiles().filter(_.getName.contains(className))
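The class files collected by getClassFiles are later packaged into a jar at run time and submitted with the application (see the StandaloneRestApp doc comment below). That packaging helper is not shown in this diff; purely as an illustration of the standard java.util.jar recipe, a hypothetical packager might look like this:

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.util.jar.{JarEntry, JarOutputStream}

    def packageClassFilesIntoJar(classFiles: Seq[File], jarFile: File): Unit = {
      val out = new JarOutputStream(new FileOutputStream(jarFile))
      try {
        classFiles.foreach { f =>
          // A flat entry name only suits classes in the default package; real code would
          // reconstruct the package directory structure inside the jar.
          out.putNextEntry(new JarEntry(f.getName))
          val in = new FileInputStream(f)
          try {
            val buf = new Array[Byte](8192)
            Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(out.write(buf, 0, _))
          } finally {
            in.close()
          }
          out.closeEntry()
        }
      } finally {
        out.close()
      }
    }
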
@@ -202,22 +198,21 @@

/**
* Sample application to be submitted to the cluster using the stable gateway.
-  * All relevant classes will be packaged into a jar dynamically and submitted to the cluster.
+  * All relevant classes will be packaged into a jar at run time.
*/
object StandaloneRestApp {
// Usage: [path to results file] [num1] [num2] [num3] [rddSize]
// The first line of the results file should be (num1 + num2 + num3)
// The second line should be (rddSize / 2) + 1
def main(args: Array[String]) {
- assert(args.size == 5)
+ assert(args.size == 5, s"Expected exactly 5 arguments: ${args.mkString(",")}")
val resultFile = new File(args(0))
val writer = new PrintWriter(resultFile)
try {
- val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
- val rddSize = args(4).toInt
val conf = new SparkConf()
val sc = new SparkContext(conf)
- val secondLine = sc.parallelize(1 to rddSize)
+ val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
+ val secondLine = sc.parallelize(1 to args(4).toInt)
.map { i => (i / 2, i) }
.reduceByKey(_ + _)
.count()
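The expected "second line" follows from the keys produced by i / 2: with the rddSize of 500 used in these tests, i ranges over 1 to 500, so i / 2 takes the values 0 through 250, giving 500 / 2 + 1 = 251 distinct keys, which is exactly what reduceByKey followed by count() returns. A plain-Scala check of that arithmetic (not part of the suite):

    // Expected second line for an even rddSize such as the 500 used above.
    val rddSize = 500
    val distinctKeys = (1 to rddSize).map(_ / 2).distinct.size
    assert(distinctKeys == rddSize / 2 + 1)  // 251
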