diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 298641f2684de..685051eeed9f1 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -68,14 +68,14 @@ else assembly_folder="$ASSEMBLY_DIR" fi -num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)" +num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)" if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $assembly_folder" echo "You need to build Spark before running this program." exit 1 fi if [ "$num_jars" -gt "1" ]; then - jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar") + jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$") echo "Found multiple Spark assembly jars in $assembly_folder:" echo "$jars_list" echo "Please remove all but one jar." @@ -108,7 +108,7 @@ else datanucleus_dir="$FWDIR"/lib_managed/jars fi -datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")" +datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")" datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)" if [ -n "$datanucleus_jars" ]; then diff --git a/bin/spark-shell b/bin/spark-shell index 4a0670fc6c8aa..cca5aa0676123 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh SUBMIT_USAGE_FUNCTION=usage gatherSparkSubmitOpts "$@" +# SPARK-4161: scala does not assume use of the java classpath, +# so we need to add the "-Dscala.usejavacp=true" flag mnually. We +# do this specifically for the Spark shell because the scala REPL +# has its own class loader, and any additional classpath specified +# through spark.driver.extraClassPath is not automatically propagated. +SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true" + function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 000bbd6b532ad..5f31bfba3f8d6 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import java.util.concurrent.atomic.AtomicLong +import java.lang.ThreadLocal import scala.collection.generic.Growable import scala.collection.mutable.Map @@ -278,10 +279,12 @@ object AccumulatorParam { // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right -private object Accumulators { +private[spark] object Accumulators { // TODO: Use soft references? => need to make readObject work properly then val originals = Map[Long, Accumulable[_, _]]() - val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]() + val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { + override protected def initialValue() = Map[Long, Accumulable[_, _]]() + } var lastId: Long = 0 def newId(): Long = synchronized { @@ -293,22 +296,21 @@ private object Accumulators { if (original) { originals(a.id) = a } else { - val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map()) - accums(a.id) = a + localAccums.get()(a.id) = a } } // Clear the local (non-original) accumulators for the current thread def clear() { synchronized { - localAccums.remove(Thread.currentThread) + localAccums.get.clear } } // Get the values of the local accumulators for the current thread (by ID) def values: Map[Long, Any] = synchronized { val ret = Map[Long, Any]() - for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) { + for ((id, accum) <- localAccums.get) { ret(id) = accum.localValue } return ret diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 79c9c451d273d..09eb9605fb799 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -34,7 +34,9 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) + // When spilling is enabled sorting will happen externally, but not necessarily with an + // ExternalSorter. + private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = @@ -42,7 +44,7 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { @@ -71,9 +73,9 @@ case class Aggregator[K, V, C] ( combineCombinersByKey(iter, null) def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext) - : Iterator[(K, C)] = + : Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null val update = (hadValue: Boolean, oldValue: C) => { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1a44e6f95dc40..556365a0c14e5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -357,8 +357,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } // Optionally scale number of executors dynamically based on workload. Exposed for testing. + private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = - if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + if (dynamicAllocationEnabled) { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { None @@ -989,6 +993,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1005,6 +1011,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 5fdc350cd8512..0d5dcfb1ddffe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -26,6 +26,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private val pageSize = 20 + private val plusOrMinus = 2 def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt @@ -39,6 +40,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val last = Math.min(actualFirst + pageSize, allApps.size) - 1 val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) + val secondPageFromLeft = 2 + val secondPageFromRight = pageCount - 1 + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val providerConfig = parent.getProviderConfig() val content = @@ -48,13 +52,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { + // This displays the indices of pages that are within `plusOrMinus` pages of + // the current page. Regardless of where the current page is, this also links + // to the first and last page. If the current page +/- `plusOrMinus` is greater + // than the 2nd page from the first page or less than the 2nd page from the last + // page, `...` will be displayed. if (allApps.size > 0) { + val leftSideIndices = + rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _) + val rightSideIndices = + rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount) +

    Showing {actualFirst + 1}-{last + 1} of {allApps.size} - - {if (actualPage > 1) <} - {if (actualPage < pageCount) >} - + + { + if (actualPage > 1) { + < + 1 + } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + {pageCount} + > + } + } +

    ++ appTable } else { @@ -81,6 +110,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Spark User", "Last Updated") + private def rangeIndices(range: Seq[Int], condition: Int => Boolean): Seq[Node] = { + range.filter(condition).map(nextPage => {nextPage} ) + } + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 835157fc520aa..52de6980ecbf8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -172,7 +172,6 @@ private[spark] class Executor( val startGCTime = gcTime try { - Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) @@ -278,6 +277,8 @@ private[spark] class Executor( env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for unrolling blocks env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() + // Release memory used by this thread for accumulators + Accumulators.clear() runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 9fab1d78abb04..b073eba8a1574 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -35,11 +35,10 @@ import org.apache.spark.util.Utils * @param preferredLocation the preferred location for this partition */ private[spark] case class CoalescedRDDPartition( - index: Int, - @transient rdd: RDD[_], - parentsIndices: Array[Int], - @transient preferredLocation: String = "" - ) extends Partition { + index: Int, + @transient rdd: RDD[_], + parentsIndices: Array[Int], + @transient preferredLocation: Option[String] = None) extends Partition { var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) @@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition( * @return locality of this coalesced partition between 0 and 1 */ def localFraction: Double = { - val loc = parents.count(p => - rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation)) - + val loc = parents.count { p => + val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host) + preferredLocation.exists(parentPreferredLocations.contains) + } if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } } @@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition( * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance */ private[spark] class CoalescedRDD[T: ClassTag]( - @transient var prev: RDD[T], - maxPartitions: Int, - balanceSlack: Double = 0.10) + @transient var prev: RDD[T], + maxPartitions: Int, + balanceSlack: Double = 0.10) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { @@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * @return the machine most preferred by split */ override def getPreferredLocations(partition: Partition): Seq[String] = { - List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation) + partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq } } @@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * */ -private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { +private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = @@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc } } -private[spark] case class PartitionGroup(prefLoc: String = "") { +private case class PartitionGroup(prefLoc: Option[String] = None) { var arr = mutable.ArrayBuffer[Partition]() - def size = arr.size } + +private object PartitionGroup { + def apply(prefLoc: String): PartitionGroup = { + require(prefLoc != "", "Preferred location must not be empty") + PartitionGroup(Some(prefLoc)) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 315327c3c6b7c..d970fa30c1c35 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -181,7 +181,9 @@ private[spark] object UIUtils extends Logging { } val helpButton: Seq[Node] = helpText.map { helpText => - (?) + + (?) + }.getOrElse(Seq.empty) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ce804f94f3267..c817f6dcede75 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -35,6 +35,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { .setMaster("local") .setAppName("test-executor-allocation-manager") .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") intercept[SparkException] { new SparkContext(conf) } SparkEnv.get.stop() // cleanup the created environment SparkContext.clearActiveContext() diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 46fcb80fa1845..6836e9ab0fd6b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("coalesced RDDs with locality") { val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b")))) val coal3 = data3.coalesce(3) - val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation) + val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation) assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped") // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ae7b81d5bb71f..5c6084fb46255 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -257,7 +257,7 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. -# Launching Compiled Spark Applications +# Launching Spark Applications The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to submit a compiled Spark application to the cluster. For standalone clusters, Spark currently @@ -272,6 +272,15 @@ should specify them through the `--jars` flag using comma as a delimiter (e.g. ` To control the application's configuration or execution environment, see [Spark Configuration](configuration.html). +Additionally, standalone `cluster` mode supports restarting your application automatically if it +exited with non-zero exit code. To use this feature, you may pass in the `--supervise` flag to +`spark-submit` when launching your application. Then, if you wish to kill an application that is +failing repeatedly, you may do so through: + + ./bin/spark-class org.apache.spark.deploy.Client kill + +You can find the driver ID through the standalone Master web UI at `http://:8080`. + # Resource Scheduling The standalone cluster mode currently only supports a simple FIFO scheduler across applications. diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f60573998f7ae..13b37f96f8ce2 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public boolean preferDirectBufs() { return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. **/ public int numConnectionsPerPeer() { - return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2); + return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public int numConnectionsPerPeer() { public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); } + public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public int numConnectionsPerPeer() { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries > 0. */ - public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e375371f..33aa1344345ff 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public void doBootstrap(TransportClient client) { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); - byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); + byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index f8a1a266863bb..4bb0498e5d5aa 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -106,7 +106,7 @@ public RetryingBlockFetcher( this.fetchStarter = fetchStarter; this.listener = listener; this.maxRetries = conf.maxIORetries(); - this.retryWaitTime = conf.ioRetryWaitTime(); + this.retryWaitTime = conf.ioRetryWaitTimeMs(); this.outstandingBlocksIds = Sets.newLinkedHashSet(); Collections.addAll(outstandingBlocksIds, blockIds); this.currentListener = new RetryingBlockFetchListener();