
Commit

Merge branch 'master' of github.com:apache/spark into rest
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Andrew Or committed Feb 2, 2015
2 parents 9e0d1af + 62a93a1 commit 581f7bf
Showing 55 changed files with 1,490 additions and 349 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -26,7 +26,7 @@ To build Spark and its example programs, run:

(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at
["Building Spark with Maven"](http://spark.apache.org/docs/latest/building-spark.html).
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).

## Interactive Scala Shell

4 changes: 3 additions & 1 deletion bin/compute-classpath.sh
@@ -50,8 +50,8 @@ fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
"classes ahead of assembly." >&2
# Spark classes
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
@@ -63,6 +63,8 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
# Jars for shaded deps in their original form (copied here during build)
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
fi

# Use spark-assembly jar from either RELEASE or assembly directory
22 changes: 20 additions & 2 deletions core/pom.xml
@@ -94,22 +94,35 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>

<!-- Jetty dependencies promoted to compile here so they are shaded
and inlined into spark-core jar -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -356,19 +369,24 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<!-- When using SPARK_PREPEND_CLASSES Spark classes compiled locally don't use
shaded deps. So here we store jars in their original form which are added
when the classpath is computed. -->
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>guava</includeArtifactIds>
<includeArtifactIds>
guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server
</includeArtifactIds>
<silent>true</silent>
</configuration>
</execution>
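The maven-dependency-plugin comment above is the key to this part of the change: classes built with SPARK_PREPEND_CLASSES still reference the original org.eclipse.jetty packages, so the un-relocated jetty jars are copied under the build directory and picked up by compute-classpath.sh. A throwaway Scala sketch of the idea being guarded against, using only the JDK (the object name is illustrative, not part of the change):

```scala
object JettyClasspathCheck {
  // Throwaway check, not part of the build: locally compiled classes still
  // reference the original org.eclipse.jetty packages, which is why the
  // un-relocated jetty jars copied by the execution above must be visible.
  def jettyOnClasspath(): Boolean =
    try {
      Class.forName("org.eclipse.jetty.server.Server") // class from jetty-server
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit =
    println(s"original jetty packages visible: ${jettyOnClasspath()}")
}
```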
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(

import ExecutorAllocationManager._

// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
if (minNumExecutors == 0 || maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
if (maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
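The hunks above drop the hard requirement on spark.dynamicAllocation.{min,max}Executors: minExecutors now defaults to 0, maxExecutors to Integer.MAX_VALUE, and only an explicit maxExecutors of 0 is rejected. A minimal sketch of the resulting bounds resolution (the object and method names are illustrative; only the property keys and defaults come from the diff):

```scala
import org.apache.spark.SparkConf

// Sketch only: mirrors the relaxed defaults and validation in the hunks above.
object DynamicAllocationBounds {
  def resolve(conf: SparkConf): (Int, Int) = {
    // Neither bound is required any longer; these defaults come from the diff.
    val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val max = conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE)
    if (min < 0 || max < 0) sys.error("min/max executors must not be negative")
    if (max == 0) sys.error("spark.dynamicAllocation.maxExecutors cannot be 0")
    if (min > max) sys.error(s"minExecutors ($min) must not exceed maxExecutors ($max)")
    (min, max)
  }
}
```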
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}

/**
* A main class used by spark-submit to launch Python applications. It executes python as a
* A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
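The updated comment above describes PythonRunner's job: start python as a subprocess and let it call back into the JVM. A toy sketch of that launch pattern, not Spark's actual implementation (PYSPARK_PYTHON is the usual interpreter override; EXAMPLE_CALLBACK_PORT is a made-up variable standing in for the real JVM-callback handshake):

```scala
import scala.collection.JavaConverters._

object TinyPythonLauncher {
  def main(args: Array[String]): Unit = {
    val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", "python")
    val builder = new ProcessBuilder((Seq(pythonExec) ++ args).asJava)
    builder.redirectErrorStream(true)                     // merge stdout and stderr
    builder.environment().put("EXAMPLE_CALLBACK_PORT", "0") // hypothetical handshake value
    val process = builder.start()
    sys.exit(process.waitFor())                           // propagate the child's exit code
  }
}
```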
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -141,7 +141,7 @@ class SparkHadoopUtil extends Logging {
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
case e: NoSuchMethodException => {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
}
@@ -163,7 +163,7 @@ class SparkHadoopUtil extends Logging {
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e: NoSuchMethodException => {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
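Both hunks above share two patterns: capture a baseline once and return a closure that reports the delta, and widen recovery to bind either a NoSuchMethodException or a ClassNotFoundException in a single catch case. A stripped-down sketch of that shape, with println standing in for Spark's logDebug (names here are illustrative):

```scala
object CounterSketch {
  // Capture a baseline once, hand back a closure reporting growth since then;
  // recover from either of two exception types in one catch case.
  def deltaCounter(f: () => Long): Option[() => Long] = {
    try {
      val baseline = f()                    // read the starting value once
      Some(() => f() - baseline)            // later calls report growth since the baseline
    } catch {
      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
        println(s"counter unavailable: $e") // stand-in for Spark's logDebug
        None
    }
  }
}
```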
58 changes: 51 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,8 @@ import java.net.URL

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.rest._
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
@@ -203,21 +205,38 @@ object SparkSubmit {
}
}

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER

// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
}
}

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}

// If we're running a python app, set the main class to our specific python runner
if (args.isPython) {
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -234,6 +253,13 @@
}
}

// In yarn-cluster mode for a python app, add primary resource and pyFiles to files
// that can be distributed with the job
if (args.isPython && isYarnCluster) {
args.files = mergeFileLists(args.files, args.primaryResource)
args.files = mergeFileLists(args.files, args.pyFiles)
}

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

@@ -311,7 +337,6 @@
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
@@ -337,10 +362,22 @@
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
if (args.isPython) {
val mainPyFile = new Path(args.primaryResource).getName
childArgs += ("--primary-py-file", mainPyFile)
if (args.pyFiles != null) {
// These files will be distributed to each machine's working directory, so strip the
// path prefix
val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
childArgs += ("--py-files", pyFilesNames)
}
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else {
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
@@ -504,6 +541,13 @@ object SparkSubmit {
mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}

/**
* Return whether the given main class represents a thrift server.
*/
private[spark] def isThriftServer(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
}

/**
* Return whether the given primary resource requires running python.
*/
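In the yarn-cluster python branch above, the primary resource and --py-files are merged into args.files for distribution, and only their base names are handed to PythonRunner because YARN places the files in each container's working directory. A small sketch of that path handling (mergeCommaLists is a stand-in for SparkSubmit's private mergeFileLists helper, not its actual implementation):

```scala
import org.apache.hadoop.fs.Path

object PyFilesSketch {
  // Combine comma-separated file lists, dropping null or empty entries.
  def mergeCommaLists(lists: String*): String =
    lists.filter(s => s != null && s.nonEmpty).flatMap(_.split(",")).mkString(",")

  // Keep only the base name of each file, as the yarn-cluster branch does,
  // since the files end up in each container's working directory.
  def stripPathPrefixes(pyFiles: String): String =
    pyFiles.split(",").map(p => new Path(p).getName).mkString(",")
}

// e.g. PyFilesSketch.stripPathPrefixes("hdfs:///tmp/lib1.zip,/home/me/lib2.egg")
//      returns "lib1.zip,lib2.egg"
```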
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -207,18 +207,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}

// Require all python files to be local, so we can add them to the PYTHONPATH
if (isPython) {
if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
SparkSubmit.printErrorAndExit(
s"Only local additional python files are supported: $nonLocalPyFiles")
}
}

if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {

