Merge branch 'master' into SPARK-8103
squito committed Jun 11, 2015
2 parents 89a59b6 + 424b007 commit 8c29707
Showing 79 changed files with 1,137 additions and 495 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -28,6 +28,7 @@ spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
log4j-defaults-repl.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
d3.min.js
12 changes: 12 additions & 0 deletions core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties
@@ -0,0 +1,12 @@
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
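
The REPL profile above sets the root logger to WARN; the Logging.scala change further down prints a hint that the level can be raised again from the running shell. A minimal sketch of that call in a spark-shell session (sc is the SparkContext the shell provides):

// Restore more verbose logging for the current session only.
sc.setLogLevel("INFO")   // also accepts e.g. "WARN", "ERROR", "DEBUG"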
26 changes: 19 additions & 7 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -121,13 +121,25 @@ trait Logging {
if (usingLog4j12) {
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4j12Initialized) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
if (Utils.isInInterpreter) {
val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
case None =>
System.err.println(s"Spark was unable to load $replDefaultLogProps")
}
} else {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
}
}
49 changes: 48 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, Map}
import scala.collection.mutable.{HashMap, HashSet, Map}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

/**
* Return the locations that each hold a fraction of the map output for the given reducer
* that is at least the specified threshold.
*
* @param shuffleId id of the shuffle
* @param reducerId id of the reduce task
* @param numReducers total number of reducers in the shuffle
* @param fractionThreshold fraction of the total map output size that a location must hold
* for it to be considered large.
*
* This method is not thread-safe.
*/
def getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {

if (mapStatuses.contains(shuffleId)) {
val statuses = mapStatuses(shuffleId)
if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
val locs = new HashMap[BlockManagerId, Long]
var totalOutputSize = 0L
var mapIdx = 0
while (mapIdx < statuses.length) {
val status = statuses(mapIdx)
val blockSize = status.getSizeForBlock(reducerId)
if (blockSize > 0) {
locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
totalOutputSize += blockSize
}
mapIdx = mapIdx + 1
}
val topLocs = locs.filter { case (loc, size) =>
size.toDouble / totalOutputSize >= fractionThreshold
}
// Return if we have any locations which satisfy the required threshold
if (topLocs.nonEmpty) {
return Some(topLocs.map(_._1).toArray)
}
}
}
None
}

def incrementEpoch() {
epochLock.synchronized {
epoch += 1
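A self-contained sketch of the aggregation getLocationsWithLargestOutputs performs, with the (location, size) pairs that would come from the shuffle's MapStatus array stood in by a plain sequence; the helper name and the tiny example data are illustrative, not part of the patch:

import scala.collection.mutable.HashMap

// Sum the block sizes reported for one reducer per location, then keep the
// locations that hold at least `fractionThreshold` of the total output.
def largeOutputLocations[Loc](
    blockSizesByLocation: Seq[(Loc, Long)],
    fractionThreshold: Double): Seq[Loc] = {
  val locs = new HashMap[Loc, Long]
  var totalOutputSize = 0L
  for ((loc, size) <- blockSizesByLocation if size > 0) {
    locs(loc) = locs.getOrElse(loc, 0L) + size
    totalOutputSize += size
  }
  locs.filter { case (_, size) =>
    size.toDouble / totalOutputSize >= fractionThreshold
  }.keys.toSeq
}

// Host "a" holds 700 of 1000 bytes (70%), host "b" holds 300 (30%):
largeOutputLocations(Seq(("a", 700L), ("b", 300L)), 0.5)  // Seq("a")
largeOutputLocations(Seq(("a", 700L), ("b", 300L)), 0.2)  // contains both "a" and "b"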
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -29,7 +29,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}

/**
* Netty-based backend server that is used to communicate between R and Java.
@@ -41,7 +41,8 @@ private[spark] class RBackend {
private[this] var bossGroup: EventLoopGroup = null

def init(): Int = {
bossGroup = new NioEventLoopGroup(2)
val conf = new SparkConf()
bossGroup = new NioEventLoopGroup(conf.getInt("spark.r.numRBackendThreads", 2))
val workerGroup = bossGroup
val handler = new RBackendHandler(this)

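The change above replaces the hard-coded thread count (2) with a lookup of spark.r.numRBackendThreads. A minimal sketch of the lookup-with-default semantics; the value 4 is arbitrary, and in practice the property would typically be supplied via --conf or spark-defaults.conf:

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.r.numRBackendThreads", "4")
conf.getInt("spark.r.numRBackendThreads", 2)   // 4; falls back to 2 when the key is unset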
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io.{DataInputStream, DataOutputStream}
import java.sql.{Date, Time}
import java.sql.{Timestamp, Date, Time}

import scala.collection.JavaConversions._

@@ -107,9 +107,12 @@ private[spark] object SerDe {
Date.valueOf(readString(in))
}

def readTime(in: DataInputStream): Time = {
val t = in.readDouble()
new Time((t * 1000L).toLong)
def readTime(in: DataInputStream): Timestamp = {
val seconds = in.readDouble()
val sec = Math.floor(seconds).toLong
val t = new Timestamp(sec * 1000L)
t.setNanos(((seconds - sec) * 1e9).toInt)
t
}

def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
case "java.sql.Time" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Time])
case "java.sql.Timestamp" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Timestamp])
case "[B" =>
writeType(dos, "raw")
writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@
out.writeDouble(value.getTime.toDouble / 1000.0)
}

def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
}

// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {
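A small round-trip check of the encoding used above: a Timestamp travels to R as fractional seconds in a single double and is rebuilt from whole seconds plus nanoseconds on the way back. The harness is illustrative; only the arithmetic mirrors writeTime/readTime.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.sql.Timestamp

val ts = new Timestamp(1434000000L * 1000L)   // whole seconds since the epoch
ts.setNanos(250000000)                        // plus 0.25 s

// writeTime: seconds as a double, fractional part taken from the nanos field
val buf = new ByteArrayOutputStream()
val out = new DataOutputStream(buf)
out.writeDouble((ts.getTime / 1000).toDouble + ts.getNanos.toDouble / 1e9)

// readTime: split the double back into whole seconds and nanoseconds
val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
val seconds = in.readDouble()
val sec = Math.floor(seconds).toLong
val back = new Timestamp(sec * 1000L)
back.setNanos(((seconds - sec) * 1e9).toInt)
// back now equals ts; in general the double limits precision to roughly sub-microsecond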
77 changes: 17 additions & 60 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -324,55 +324,20 @@ object SparkSubmit {
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
args.files = mergeFileLists(args.files, args.pyFiles)
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

// In yarn mode for a python app, add pyspark archives to files
// that can be distributed with the job
if (args.isPython && clusterManager == YARN) {
var pyArchives: String = null
val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
if (pyArchivesEnvOpt.isDefined) {
pyArchives = pyArchivesEnvOpt.get
} else {
if (!sys.env.contains("SPARK_HOME")) {
printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
}
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
if (!pyArchivesFile.exists()) {
printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
}
val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
if (!py4jFile.exists()) {
printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
"in yarn mode.")
}
pythonPath += pyArchivesFile.getAbsolutePath()
pythonPath += py4jFile.getAbsolutePath()
}
pyArchives = pythonPath.mkString(",")
}

pyArchives = pyArchives.split(",").map { localPath =>
val localURI = Utils.resolveURI(localPath)
if (localURI.getScheme != "local") {
args.files = mergeFileLists(args.files, localURI.toString)
new Path(localPath).getName
} else {
localURI.getPath
}
}.mkString(File.pathSeparator)
sysProps("spark.submit.pyArchives") = pyArchives
}

// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
}
}

if (isYarnCluster) {
// In yarn-cluster mode for a python app, add primary resource and pyFiles to files
// that can be distributed with the job
if (args.isPython) {
args.files = mergeFileLists(args.files, args.primaryResource)
args.files = mergeFileLists(args.files, args.pyFiles)
}

if (isYarnCluster && args.isR) {
// In yarn-cluster mode for an R app, add primary resource to files
// that can be distributed with the job
if (args.isR) {
args.files = mergeFileLists(args.files, args.primaryResource)
}
args.files = mergeFileLists(args.files, args.primaryResource)
}

// Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@
}
}

// Let YARN know it's a pyspark app, so it distributes needed libraries.
if (clusterManager == YARN && args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
val mainPyFile = new Path(args.primaryResource).getName
childArgs += ("--primary-py-file", mainPyFile)
childArgs += ("--primary-py-file", args.primaryResource)
if (args.pyFiles != null) {
// These files will be distributed to each machine's working directory, so strip the
// path prefix
val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
childArgs += ("--py-files", pyFilesNames)
childArgs += ("--py-files", args.pyFiles)
}
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
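For reference, the mergeFileLists calls above combine comma-separated file lists while skipping entries that are null or empty. A minimal sketch of that behaviour (the real helper is a private method of SparkSubmit and may differ in detail):

def mergeFileLists(lists: String*): String =
  lists.filter(l => l != null && l.nonEmpty).flatMap(_.split(",")).mkString(",")

mergeFileLists("a.py,b.py", null, "deps.zip")   // "a.py,b.py,deps.zip"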
core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -95,10 +95,9 @@ private[spark] object RDDOperationScope extends Logging {
private[spark] def withScope[T](
sc: SparkContext,
allowNesting: Boolean = false)(body: => T): T = {
val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
// Climb upwards to find the first method that's called something different
val callerMethodName = stackTrace
val ourMethodName = "withScope"
val callerMethodName = Thread.currentThread.getStackTrace()
.dropWhile(_.getMethodName != ourMethodName)
.find(_.getMethodName != ourMethodName)
.map(_.getMethodName)
.getOrElse {
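The rewrite above stops relying on "withScope" sitting at a fixed stack depth and instead scans the trace for the first frame whose method name differs from it. The same lookup as a standalone sketch:

// Walk the current stack: skip frames until the withScope frames begin, then
// take the first method named something else -- that is the caller we want.
def callerOf(ourMethodName: String): Option[String] =
  Thread.currentThread.getStackTrace
    .dropWhile(_.getMethodName != ourMethodName)
    .find(_.getMethodName != ourMethodName)
    .map(_.getMethodName)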
37 changes: 34 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -137,6 +137,22 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

// Flag to control if reduce tasks are assigned preferred locations
private val shuffleLocalityEnabled =
sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
// Number of map and reduce tasks above which we do not assign preferred locations
// based on map output sizes. We limit the size of jobs for which we assign preferred locations,
// as computing the top locations by size becomes expensive.
private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

// Fraction of the total map output that must be at a location for it to be considered a preferred
// location for a reduce task.
// Making this larger will focus on fewer locations where most data can be read locally, but
// may lead to more delay in scheduling if those locations are busy.
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2

// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1399,17 +1415,32 @@ class DAGScheduler(
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.

rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case s: ShuffleDependency[_, _, _] =>
// For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
// of data as preferred locations
if (shuffleLocalityEnabled &&
rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
// Get the preferred map output locations for this reducer
val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
if (topLocsForReducer.nonEmpty) {
return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
}
}

case _ =>
}
Nil
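The new preferred-location logic is gated by the flags introduced at the top of DAGScheduler: it only runs when spark.shuffle.reduceLocality.enabled is true (the default) and both the map and reduce sides are below the 1000-partition thresholds. A minimal sketch of turning the behaviour off, e.g. for comparison runs; the setting name comes from the diff:

import org.apache.spark.SparkConf

// Disable locality-aware placement of reduce tasks entirely.
val conf = new SparkConf().set("spark.shuffle.reduceLocality.enabled", "false")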