Commit

Merge remote-tracking branch 'upstream/master'
bilna committed Jan 9, 2015
2 parents ae56514 + 167a5ab commit 5718d66
Showing 39 changed files with 1,755 additions and 1,304 deletions.
20 changes: 20 additions & 0 deletions assembly/pom.xml
@@ -354,5 +354,25 @@
</dependency>
</dependencies>
</profile>

<!-- Profiles that disable inclusion of certain dependencies. -->
<profile>
<id>hadoop-provided</id>
<properties>
<hadoop.deps.scope>provided</hadoop.deps.scope>
</properties>
</profile>
<profile>
<id>hive-provided</id>
<properties>
<hive.deps.scope>provided</hive.deps.scope>
</properties>
</profile>
<profile>
<id>parquet-provided</id>
<properties>
<parquet.deps.scope>provided</parquet.deps.scope>
</properties>
</profile>
</profiles>
</project>
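These profiles only flip the *.deps.scope properties to "provided"; presumably the matching dependency declarations elsewhere in the build reference those properties as their <scope>, so activating a profile keeps the corresponding jars out of the assembly. A minimal build sketch, with the profile ids taken from the hunk above and everything else illustrative:

    # Package Spark without bundling the Hadoop/Hive/Parquet classes; combine profiles as needed.
    mvn -Phadoop-provided -Phive-provided -Pparquet-provided -DskipTests clean package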
4 changes: 0 additions & 4 deletions bagel/pom.xml
@@ -40,10 +40,6 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
7 changes: 7 additions & 0 deletions bin/compute-classpath.cmd
@@ -109,6 +109,13 @@ if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
:no_yarn_conf_dir

rem To allow for distributions to append needed libraries to the classpath (e.g. when
rem using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
rem append it to the final classpath.
if not "x%SPARK_DIST_CLASSPATH%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_DIST_CLASSPATH%
)

rem A bit of a hack to allow calling this script within run2.cmd without seeing output
if "%DONT_PRINT_CLASSPATH%"=="1" goto exit

7 changes: 7 additions & 0 deletions bin/compute-classpath.sh
@@ -146,4 +146,11 @@ if [ -n "$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi

# To allow for distributions to append needed libraries to the classpath (e.g. when
# using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
# append it to the final classpath.
if [ -n "$SPARK_DIST_CLASSPATH" ]; then
CLASSPATH="$CLASSPATH:$SPARK_DIST_CLASSPATH"
fi

echo "$CLASSPATH"
5 changes: 5 additions & 0 deletions bin/spark-submit
@@ -38,11 +38,16 @@ while (($#)); do
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
fi
shift
done

DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
if [ "$MASTER" == "yarn-cluster" ]; then
SPARK_SUBMIT_DEPLOY_MODE=cluster
fi
export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
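With this change the launcher script itself recognizes --master yarn-cluster as implying cluster deploy mode, instead of assuming client for its own bookkeeping. An illustrative invocation (the application class and jar are placeholders):

    # SPARK_SUBMIT_DEPLOY_MODE resolves to "cluster" because --master is yarn-cluster.
    ./bin/spark-submit --master yarn-cluster \
      --class com.example.MyApp \
      path/to/my-app.jar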

6 changes: 6 additions & 0 deletions bin/spark-submit2.cmd
@@ -45,11 +45,17 @@ if [%1] == [] goto continue
set SPARK_SUBMIT_CLASSPATH=%2
) else if [%1] == [--driver-java-options] (
set SPARK_SUBMIT_OPTS=%2
) else if [%1] == [--master] (
set MASTER=%2
)
shift
goto loop
:continue

if [%MASTER%] == [yarn-cluster] (
set SPARK_SUBMIT_DEPLOY_MODE=cluster
)

rem For client mode, the driver will be launched in the same JVM that launches
rem SparkSubmit, so we may need to read the properties file for any extra class
rem paths, library paths, java options and memory early on. Otherwise, it will
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

conf.set("spark.executor.id", "driver")
conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
39 changes: 20 additions & 19 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -720,26 +720,27 @@ private[spark] class Master(
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogFile = app.desc.eventLogDir
.map { dir => EventLoggingListener.getLogPath(dir, app.id) }
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
}
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
try {
val eventLogFile = app.desc.eventLogDir
.map { dir => EventLoggingListener.getLogPath(dir, app.id) }
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
}

val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)

if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
// Event logging is enabled for this application, but the application is still in progress
val title = s"Application history not found (${app.id})"
var msg = s"Application $appName is still in progress."
logWarning(msg)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}
if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
// Event logging is enabled for this application, but the application is still in progress
val title = s"Application history not found (${app.id})"
var msg = s"Application $appName is still in progress."
logWarning(msg)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
Expand All @@ -758,7 +759,7 @@ private[spark] class Master(
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogFile."
var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir}."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -437,6 +437,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
@@ -458,6 +461,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
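The added note is the key caveat: groupByKey buffers every value for a key in memory before handing back the Iterable. When the end goal is a per-key aggregate, reduceByKey combines partial results map-side and avoids that buffering. A small Scala comparison (the data and variable names are made up):

    // Hypothetical pair RDD of (word, 1) pairs built from an existing SparkContext `sc`.
    val pairs = sc.parallelize(Seq("a", "b", "a")).map(w => (w, 1))

    // Buffers all values for each key, then sums; a very hot key can trigger an OutOfMemoryError.
    val countsViaGroup = pairs.groupByKey().mapValues(_.sum)

    // Combines partial sums on the map side before the shuffle; preferred for aggregations.
    val countsViaReduce = pairs.reduceByKey(_ + _)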
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -55,19 +55,26 @@ private[spark] class SparkDeploySchedulerBackend(
"{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathEntries =
sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}

// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir)
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -166,7 +166,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Only perform cleanup if an external service is not serving our shuffle files.
if (!blockManager.externalShuffleServiceEnabled) {
if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -990,11 +990,12 @@ private[spark] object Utils extends Logging {
for ((key, value) <- extraEnvironment) {
environment.put(key, value)
}

val process = builder.start()
new Thread("read stderr for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
System.err.println(line)
logInfo(line)
}
}
}.start()
@@ -1089,7 +1090,7 @@
var firstUserLine = 0
var insideSpark = true
var callStack = new ArrayBuffer[String]() :+ "<unknown>"

Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -35,7 +35,7 @@ class DriverSuite extends FunSuite with Timeouts {
forAll(masters) { (master: String) =>
failAfter(60 seconds) {
Utils.executeAndGetOutput(
Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}