From 9bd9334f588dbb44d01554f9f4ca68a153a48993 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 9 Dec 2014 19:29:09 -0800 Subject: [PATCH 01/11] Config updates for the new shuffle transport. Author: Reynold Xin Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport. --- .../java/org/apache/spark/network/util/TransportConf.java | 8 ++++---- .../apache/spark/network/sasl/SaslClientBootstrap.java | 2 +- .../spark/network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f60573998f7ae..13b37f96f8ce2 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public boolean preferDirectBufs() { return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. **/ public int numConnectionsPerPeer() { - return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2); + return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public int numConnectionsPerPeer() { public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); } + public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public int numConnectionsPerPeer() { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries > 0. */ - public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e375371f..33aa1344345ff 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public void doBootstrap(TransportClient client) { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); - byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); + byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index f8a1a266863bb..4bb0498e5d5aa 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -106,7 +106,7 @@ public RetryingBlockFetcher( this.fetchStarter = fetchStarter; this.listener = listener; this.maxRetries = conf.maxIORetries(); - this.retryWaitTime = conf.ioRetryWaitTime(); + this.retryWaitTime = conf.ioRetryWaitTimeMs(); this.outstandingBlocksIds = Sets.newLinkedHashSet(); Collections.addAll(outstandingBlocksIds, blockIds); this.currentListener = new RetryingBlockFetchListener(); From f79c1cfc997c1a7ddee480ca3d46f5341b69d3b7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 9 Dec 2014 23:47:05 -0800 Subject: [PATCH 02/11] [Minor] Use tag for help icon in web UI page header This small commit makes the `(?)` web UI help link into a superscript, which should address feedback that the current design makes it look like an error occurred or like information is missing. Before: ![image](https://cloud.githubusercontent.com/assets/50748/5370611/a3ed0034-7fd9-11e4-870f-05bd9faad5b9.png) After: ![image](https://cloud.githubusercontent.com/assets/50748/5370602/6c5ca8d6-7fd9-11e4-8d1a-568d71290aa7.png) Author: Josh Rosen Closes #3659 from JoshRosen/webui-help-sup and squashes the following commits: bd72899 [Josh Rosen] Use tag for help icon in web UI page header. --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 315327c3c6b7c..d970fa30c1c35 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -181,7 +181,9 @@ private[spark] object UIUtils extends Logging { } val helpButton: Seq[Node] = helpText.map { helpText => - (?) + + (?) + }.getOrElse(Seq.empty) From 94b377f94487109a1cc3e07dd230b1df7a96e28d Mon Sep 17 00:00:00 2001 From: Nathan Kronenfeld Date: Tue, 9 Dec 2014 23:53:17 -0800 Subject: [PATCH 03/11] [SPARK-4772] Clear local copies of accumulators as soon as we're done with them Accumulators keep thread-local copies of themselves. These copies were only cleared at the beginning of a task. This meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker. This PR clears the thread-local copies of accumulators at the end of each task, in the tasks finally block, to make sure they are cleaned up between tasks. It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up. Author: Nathan Kronenfeld Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits: a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude to get around false positive in mima tests b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version incompatibility should be irrelevent, as it will only surface if a master is talking to a worker running a different version of spark. 537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions for ThreadLocal localAccums, and keeping clear(), but also calling it in tasks' finally block, rather than just at the beginning of the task. 39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with them --- .../main/scala/org/apache/spark/Accumulators.scala | 14 ++++++++------ .../scala/org/apache/spark/executor/Executor.scala | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 000bbd6b532ad..5f31bfba3f8d6 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import java.util.concurrent.atomic.AtomicLong +import java.lang.ThreadLocal import scala.collection.generic.Growable import scala.collection.mutable.Map @@ -278,10 +279,12 @@ object AccumulatorParam { // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right -private object Accumulators { +private[spark] object Accumulators { // TODO: Use soft references? => need to make readObject work properly then val originals = Map[Long, Accumulable[_, _]]() - val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]() + val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { + override protected def initialValue() = Map[Long, Accumulable[_, _]]() + } var lastId: Long = 0 def newId(): Long = synchronized { @@ -293,22 +296,21 @@ private object Accumulators { if (original) { originals(a.id) = a } else { - val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map()) - accums(a.id) = a + localAccums.get()(a.id) = a } } // Clear the local (non-original) accumulators for the current thread def clear() { synchronized { - localAccums.remove(Thread.currentThread) + localAccums.get.clear } } // Get the values of the local accumulators for the current thread (by ID) def values: Map[Long, Any] = synchronized { val ret = Map[Long, Any]() - for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) { + for ((id, accum) <- localAccums.get) { ret(id) = accum.localValue } return ret diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 835157fc520aa..52de6980ecbf8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -172,7 +172,6 @@ private[spark] class Executor( val startGCTime = gcTime try { - Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) @@ -278,6 +277,8 @@ private[spark] class Executor( env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for unrolling blocks env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() + // Release memory used by this thread for accumulators + Accumulators.clear() runningTasks.remove(taskId) } } From 742e7093eca8865225c29bacf4344f2e89bfea41 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Wed, 10 Dec 2014 12:24:04 -0800 Subject: [PATCH 04/11] [SPARK-4161]Spark shell class path is not correctly set if "spark.driver.extraClassPath" is set in defaults.conf Author: GuoQiang Li Closes #3050 from witgo/SPARK-4161 and squashes the following commits: abb6fa4 [GuoQiang Li] move usejavacp opt to spark-shell 89e39e7 [GuoQiang Li] review commit c2a6f04 [GuoQiang Li] Spark shell class path is not correctly set if "spark.driver.extraClassPath" is set in defaults.conf --- bin/spark-shell | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bin/spark-shell b/bin/spark-shell index 4a0670fc6c8aa..cca5aa0676123 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh SUBMIT_USAGE_FUNCTION=usage gatherSparkSubmitOpts "$@" +# SPARK-4161: scala does not assume use of the java classpath, +# so we need to add the "-Dscala.usejavacp=true" flag mnually. We +# do this specifically for the Spark shell because the scala REPL +# has its own class loader, and any additional classpath specified +# through spark.driver.extraClassPath is not automatically propagated. +SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true" + function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin From 0fc637b4c27f9afdf5c829d26c7a86efd8681490 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 10 Dec 2014 12:29:00 -0800 Subject: [PATCH 05/11] [SPARK-4329][WebUI] HistoryPage pagenation Current HistoryPage have links only to previous page or next page. I suggest to add index to access history pages easily. I implemented like following pics. If there are many pages, current page +/- N pages, head page and last page are indexed. ![2014-11-10 16 13 25](https://cloud.githubusercontent.com/assets/4736016/4986246/9c7bbac4-6937-11e4-8695-8634d039d5b6.png) ![2014-11-10 16 03 21](https://cloud.githubusercontent.com/assets/4736016/4986210/3951bb74-6937-11e4-8b4e-9f90d266d736.png) ![2014-11-10 16 03 39](https://cloud.githubusercontent.com/assets/4736016/4986211/3b196ad8-6937-11e4-9f81-74bc0a6dad5b.png) ![2014-11-10 16 03 49](https://cloud.githubusercontent.com/assets/4736016/4986213/40686138-6937-11e4-86c0-41100f0404f6.png) ![2014-11-10 16 04 04](https://cloud.githubusercontent.com/assets/4736016/4986215/4326c9b4-6937-11e4-87ac-0f30c86ec6e3.png) Author: Kousuke Saruta Closes #3194 from sarutak/history-page-indexing and squashes the following commits: 15d3d2d [Kousuke Saruta] Simplified code c93932e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing 1c2f605 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing 76b05e3 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing b2240f8 [Kousuke Saruta] Fixed style ec7922e [Kousuke Saruta] Simplified code 755a004 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing cfa242b [Kousuke Saruta] Added index to HistoryPage --- .../spark/deploy/history/HistoryPage.scala | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 5fdc350cd8512..0d5dcfb1ddffe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -26,6 +26,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private val pageSize = 20 + private val plusOrMinus = 2 def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt @@ -39,6 +40,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val last = Math.min(actualFirst + pageSize, allApps.size) - 1 val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) + val secondPageFromLeft = 2 + val secondPageFromRight = pageCount - 1 + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val providerConfig = parent.getProviderConfig() val content = @@ -48,13 +52,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { + // This displays the indices of pages that are within `plusOrMinus` pages of + // the current page. Regardless of where the current page is, this also links + // to the first and last page. If the current page +/- `plusOrMinus` is greater + // than the 2nd page from the first page or less than the 2nd page from the last + // page, `...` will be displayed. if (allApps.size > 0) { + val leftSideIndices = + rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _) + val rightSideIndices = + rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount) +

    Showing {actualFirst + 1}-{last + 1} of {allApps.size} - - {if (actualPage > 1) <} - {if (actualPage < pageCount) >} - + + { + if (actualPage > 1) { + < + 1 + } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + {pageCount} + > + } + } +

    ++ appTable } else { @@ -81,6 +110,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Spark User", "Last Updated") + private def rangeIndices(range: Seq[Int], condition: Int => Boolean): Seq[Node] = { + range.filter(condition).map(nextPage => {nextPage} ) + } + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) From 56212831c6436e287a19908e82c26117cbcb16b0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 12:41:36 -0800 Subject: [PATCH 06/11] [SPARK-4771][Docs] Document standalone cluster supervise mode tdas looks like streaming already refers to the supervise mode. The link from there is broken though. Author: Andrew Or Closes #3627 from andrewor14/document-supervise and squashes the following commits: 9ca0908 [Andrew Or] Wording changes 2b55ed2 [Andrew Or] Document standalone cluster supervise mode --- docs/spark-standalone.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ae7b81d5bb71f..5c6084fb46255 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -257,7 +257,7 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. -# Launching Compiled Spark Applications +# Launching Spark Applications The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to submit a compiled Spark application to the cluster. For standalone clusters, Spark currently @@ -272,6 +272,15 @@ should specify them through the `--jars` flag using comma as a delimiter (e.g. ` To control the application's configuration or execution environment, see [Spark Configuration](configuration.html). +Additionally, standalone `cluster` mode supports restarting your application automatically if it +exited with non-zero exit code. To use this feature, you may pass in the `--supervise` flag to +`spark-submit` when launching your application. Then, if you wish to kill an application that is +failing repeatedly, you may do so through: + + ./bin/spark-class org.apache.spark.deploy.Client kill + +You can find the driver ID through the standalone Master web UI at `http://:8080`. + # Resource Scheduling The standalone cluster mode currently only supports a simple FIFO scheduler across applications. From faa8fd8178642ef8fce14186abc45a189042efd4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 12:48:24 -0800 Subject: [PATCH 07/11] [SPARK-4215] Allow requesting / killing executors only in YARN mode Currently this doesn't do anything in other modes, so we might as well just disable it rather than having the user mistakenly rely on it. Author: Andrew Or Closes #3615 from andrewor14/dynamic-allocation-yarn-only and squashes the following commits: ce6487a [Andrew Or] Allow requesting / killing executors only in YARN mode --- .../src/main/scala/org/apache/spark/SparkContext.scala | 10 +++++++++- .../apache/spark/ExecutorAllocationManagerSuite.scala | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aded7c12e274e..8e5378ecc08de 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -357,8 +357,12 @@ class SparkContext(config: SparkConf) extends Logging { } // Optionally scale number of executors dynamically based on workload. Exposed for testing. + private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = - if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + if (dynamicAllocationEnabled) { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this)) } else { None @@ -989,6 +993,8 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def requestExecutors(numAdditionalExecutors: Int): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1005,6 +1011,8 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def killExecutors(executorIds: Seq[String]): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ce804f94f3267..c817f6dcede75 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -35,6 +35,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { .setMaster("local") .setAppName("test-executor-allocation-manager") .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") intercept[SparkException] { new SparkContext(conf) } SparkEnv.get.stop() // cleanup the created environment SparkContext.clearActiveContext() From e230da18f8c354b4b80416aa5277420397acf4f2 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Wed, 10 Dec 2014 13:29:27 -0800 Subject: [PATCH 08/11] [SPARK-4793] [Deploy] ensure .jar at end of line sometimes I switch between different version and do not want to rebuild spark, so I rename assembly.jar into .jar.bak, but still caught by `compute-classpath.sh` Author: Daoyuan Wang Closes #3641 from adrian-wang/jar and squashes the following commits: 45cbfd0 [Daoyuan Wang] ensure .jar at end of line --- bin/compute-classpath.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 298641f2684de..685051eeed9f1 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -68,14 +68,14 @@ else assembly_folder="$ASSEMBLY_DIR" fi -num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)" +num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)" if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $assembly_folder" echo "You need to build Spark before running this program." exit 1 fi if [ "$num_jars" -gt "1" ]; then - jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar") + jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$") echo "Found multiple Spark assembly jars in $assembly_folder:" echo "$jars_list" echo "Please remove all but one jar." @@ -108,7 +108,7 @@ else datanucleus_dir="$FWDIR"/lib_managed/jars fi -datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")" +datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")" datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)" if [ -n "$datanucleus_jars" ]; then From 447ae2de5d4c2af865fdb63f8b876b865de60f74 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 10 Dec 2014 14:19:37 -0800 Subject: [PATCH 09/11] [SPARK-4569] Rename 'externalSorting' in Aggregator Hi all - I've renamed the unhelpfully named variable and added a comment clarifying what's actually happening. Author: Ilya Ganelin Closes #3666 from ilganeli/SPARK-4569B and squashes the following commits: 1810394 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator e2d2092 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator d7cefec [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator 5b3f39c [Ilya Ganelin] [SPARK-4569] Rename in Aggregator --- core/src/main/scala/org/apache/spark/Aggregator.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 79c9c451d273d..09eb9605fb799 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -34,7 +34,9 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) + // When spilling is enabled sorting will happen externally, but not necessarily with an + // ExternalSorter. + private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = @@ -42,7 +44,7 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { @@ -71,9 +73,9 @@ case class Aggregator[K, V, C] ( combineCombinersByKey(iter, null) def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext) - : Iterator[(K, C)] = + : Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null val update = (hadValue: Boolean, oldValue: C) => { From 4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 14:27:53 -0800 Subject: [PATCH 10/11] [SPARK-4759] Fix driver hanging from coalescing partitions The driver hangs sometimes when we coalesce RDD partitions. See JIRA for more details and reproduction. This is because our use of empty string as default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (empty string). The intended semantics here, however, is that the partition does not have a preferred location, and the TSM should schedule the corresponding task accordingly. Author: Andrew Or Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits: e520d6b [Andrew Or] Oops 3ebf8bd [Andrew Or] A few comments f370a4e [Andrew Or] Fix tests 2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location --- .../org/apache/spark/rdd/CoalescedRDD.scala | 36 +++++++++++-------- .../scala/org/apache/spark/rdd/RDDSuite.scala | 2 +- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 9fab1d78abb04..b073eba8a1574 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -35,11 +35,10 @@ import org.apache.spark.util.Utils * @param preferredLocation the preferred location for this partition */ private[spark] case class CoalescedRDDPartition( - index: Int, - @transient rdd: RDD[_], - parentsIndices: Array[Int], - @transient preferredLocation: String = "" - ) extends Partition { + index: Int, + @transient rdd: RDD[_], + parentsIndices: Array[Int], + @transient preferredLocation: Option[String] = None) extends Partition { var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) @@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition( * @return locality of this coalesced partition between 0 and 1 */ def localFraction: Double = { - val loc = parents.count(p => - rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation)) - + val loc = parents.count { p => + val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host) + preferredLocation.exists(parentPreferredLocations.contains) + } if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } } @@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition( * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance */ private[spark] class CoalescedRDD[T: ClassTag]( - @transient var prev: RDD[T], - maxPartitions: Int, - balanceSlack: Double = 0.10) + @transient var prev: RDD[T], + maxPartitions: Int, + balanceSlack: Double = 0.10) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { @@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * @return the machine most preferred by split */ override def getPreferredLocations(partition: Partition): Seq[String] = { - List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation) + partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq } } @@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * */ -private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { +private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = @@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc } } -private[spark] case class PartitionGroup(prefLoc: String = "") { +private case class PartitionGroup(prefLoc: Option[String] = None) { var arr = mutable.ArrayBuffer[Partition]() - def size = arr.size } + +private object PartitionGroup { + def apply(prefLoc: String): PartitionGroup = { + require(prefLoc != "", "Preferred location must not be empty") + PartitionGroup(Some(prefLoc)) + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 46fcb80fa1845..6836e9ab0fd6b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("coalesced RDDs with locality") { val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b")))) val coal3 = data3.coalesce(3) - val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation) + val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation) assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped") // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 From 36bdb5b748ff670a9bafd787e40c9e142c9a85b9 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 10 Dec 2014 14:41:16 -0800 Subject: [PATCH 11/11] MAINTENANCE: Automated closing of pull requests. This commit exists to close the following pull requests on Github: Closes #2883 (close requested by 'pwendell') Closes #3364 (close requested by 'pwendell') Closes #4458 (close requested by 'pwendell') Closes #1574 (close requested by 'andrewor14') Closes #2546 (close requested by 'andrewor14') Closes #2516 (close requested by 'andrewor14') Closes #154 (close requested by 'andrewor14')