
Commit

Merge branch 'master' into SPARK-1149
liguoqiang committed Mar 11, 2014
2 parents 8425395 + 2a2c964 commit 3dcdcaf
Showing 32 changed files with 298 additions and 171 deletions.
48 changes: 28 additions & 20 deletions bin/spark-class
@@ -40,34 +40,46 @@ if [ -z "$1" ]; then
exit 1
fi

# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
# values for that; it doesn't need a lot
if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
# Do not overwrite SPARK_JAVA_OPTS environment variable in this script
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default
else
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
if [ -n "$SPARK_MEM" ]; then
echo "Warning: SPARK_MEM is deprecated, please use a more specific config option"
echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)."
fi

# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
DEFAULT_MEM=${SPARK_MEM:-512m}

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

# Add java opts for master, worker, executor. The opts maybe null
# Add java opts and memory settings for master, worker, executors, and repl.
case "$1" in
# Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;

# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;
'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;

# All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
'org.apache.spark.repl.Main')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
;;
*)
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
;;
esac

@@ -83,14 +95,10 @@ else
fi
fi

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
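With SPARK_MEM deprecated, spark-class now picks the heap for each process type from its own variable, falling back to SPARK_MEM and then 512m. A minimal sketch of the resulting selection, with illustrative values (not defaults) and trailing arguments elided:

export SPARK_DAEMON_MEMORY=1g      # Master and Worker daemons
export SPARK_EXECUTOR_MEMORY=4g    # executor backends
export SPARK_DRIVER_MEMORY=2g      # repl and other driver classes

bin/spark-class org.apache.spark.deploy.worker.Worker ...                   # launched with -Xms1g -Xmx1g
bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend ...  # launched with -Xms4g -Xmx4g
bin/spark-class org.apache.spark.repl.Main ...                              # launched with -Xms2g -Xmx2g
# With none of these set, the script falls back to SPARK_MEM, then to 512m.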
47 changes: 35 additions & 12 deletions bin/spark-class2.cmd
@@ -34,22 +34,45 @@ if not "x%1"=="x" goto arg_given
goto exit
:arg_given

set RUNNING_DAEMON=0
if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
if not "x%SPARK_MEM%"=="x" (
echo Warning: SPARK_MEM is deprecated, please use a more specific config option
echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
)

rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
set OUR_JAVA_MEM=%SPARK_MEM%
if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script
if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%

rem Figure out how much memory to use per executor and set it as an environment
rem variable so that our process sees it and can report it to Mesos
if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
rem Add java opts and memory settings for master, worker, executors, and repl.
rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%

rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
) else if "%1"=="org.apache.spark.repl.Main" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
) else (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
)

rem Set JAVA_OPTS to be able to load native libraries and to set heap size
set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!

rem Test whether the user has built Spark
28 changes: 14 additions & 14 deletions bin/spark-shell
@@ -45,13 +45,11 @@ if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then
exit
fi

SPARK_SHELL_OPTS=""

for o in "$@"; do
if [ "$1" = "-c" -o "$1" = "--cores" ]; then
shift
if [[ "$1" =~ $CORE_PATTERN ]]; then
SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.cores.max=$1"
SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.cores.max=$1"
shift
else
echo "ERROR: wrong format for -c/--cores"
@@ -61,7 +59,7 @@ for o in "$@"; do
if [ "$1" = "-em" -o "$1" = "--execmem" ]; then
shift
if [[ $1 =~ $MEM_PATTERN ]]; then
SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.executor.memory=$1"
SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.executor.memory=$1"
shift
else
echo "ERROR: wrong format for --execmem/-em"
@@ -71,7 +69,7 @@ for o in "$@"; do
if [ "$1" = "-dm" -o "$1" = "--drivermem" ]; then
shift
if [[ $1 =~ $MEM_PATTERN ]]; then
export SPARK_MEM=$1
export SPARK_DRIVER_MEMORY=$1
shift
else
echo "ERROR: wrong format for --drivermem/-dm"
@@ -125,16 +123,18 @@ if [[ ! $? ]]; then
fi

if $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
$FWDIR/bin/spark-class -Djline.terminal=unix $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
$FWDIR/bin/spark-class $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
export SPARK_REPL_OPTS
$FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
fi

# record the exit status lest it be overwritten:
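The shell's option parsing now feeds SPARK_REPL_OPTS and SPARK_DRIVER_MEMORY rather than the old SPARK_SHELL_OPTS and SPARK_MEM. A hedged usage sketch, with illustrative values:

bin/spark-shell --cores 8 --execmem 4g --drivermem 2g
# is roughly equivalent to setting the variables by hand:
SPARK_REPL_OPTS="-Dspark.cores.max=8 -Dspark.executor.memory=4g" \
SPARK_DRIVER_MEMORY=2g bin/spark-shell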
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -102,10 +102,6 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

/**
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -101,9 +102,11 @@ trait Logging {
}

private def initializeLogging() {
// If Log4j doesn't seem initialized, load a default properties file
// If Log4j is being used, but is not initialized, load a default properties file
val binder = StaticLoggerBinder.getSingleton
val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
if (!log4jInitialized && usingLog4j) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
val classLoader = this.getClass.getClassLoader
Option(classLoader.getResource(defaultLogProps)) match {
31 changes: 16 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -162,19 +162,20 @@ class SparkContext(
jars.foreach(addJar)
}

def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
}

private[spark] val executorMemory = conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_MEM")))
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(512)

if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, instead use spark.executor.memory")
}

// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
value <- Option(System.getenv(key))) {
executorEnvs(key) = value
@@ -185,8 +186,9 @@ class SparkContext(
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
// Since memory can be set with a system property too, use that
executorEnvs("SPARK_MEM") = executorMemory + "m"
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= conf.getExecutorEnv

// Set SPARK_USER for user who is running SparkContext.
@@ -830,13 +832,12 @@ class SparkContext(
setLocalProperty("externalCallSite", null)
}

/**
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
*/
private[spark] def getCallSite(): String = {
val callSite = getLocalProperty("externalCallSite")
if (callSite == null) {
Utils.formatSparkCallSite
} else {
callSite
}
Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
}

/**
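Executor memory is now resolved in this order: spark.executor.memory, then SPARK_EXECUTOR_MEMORY, then the deprecated SPARK_MEM (which only triggers a warning), then a 512 MB default. A hedged sketch of the options, with illustrative values:

SPARK_MEM=2g bin/spark-shell              # still honored, but logs the deprecation warning
SPARK_EXECUTOR_MEMORY=2g bin/spark-shell  # preferred environment variable, no warning
bin/spark-shell -em 2g                    # sets spark.executor.memory, which wins over both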
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))

def generator: String = rdd.generator

override def toString = rdd.toString

/** Assign a name to this RDD */
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java

import java.util.{Comparator, List => JList}

import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

def name(): String = rdd.name

/** Reset generator */
def setGenerator(_generator: String) = {
rdd.setGenerator(_generator)
}
}
18 changes: 4 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
this
}

/** User-defined generator of this RDD*/
@transient var generator = Utils.getCallSiteInfo.firstUserClass

/** Reset generator*/
def setGenerator(_generator: String) = {
generator = _generator
}

/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](

private var storageLevel: StorageLevel = StorageLevel.NONE

/** Record user function generating this RDD. */
@transient private[spark] val origin = sc.getCallSite()
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)

private[spark] def elementClassTag: ClassTag[T] = classTag[T]

@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
}

override def toString: String = "%s%s[%d] at %s".format(
Option(name).map(_ + " ").getOrElse(""),
getClass.getSimpleName,
id,
origin)
Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)

def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
* @param indent Indent number before info
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
val rddInfo =
if (rdd.getStorageLevel != StorageLevel.NONE) {
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
rdd.origin + " " + rdd.generator
} else {
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
rdd.origin + " " + rdd.generator
}
s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
id
}

val name = callSite.getOrElse(rdd.origin)
val name = callSite.getOrElse(rdd.getCreationSite)

override def toString = "Stage " + id

