From 8ad486add941c9686dfb39309adaf5b7ca66345d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 7 Mar 2014 23:23:59 -0800 Subject: [PATCH 01/14] Allow sbt to use more than 1G of heap. There was a mistake in the sbt build file (introduced by 012bd5fbc97dc40bb61e0e2b9cc97ed0083f37f6) in which we set the default to 2048 and then immediately reset it to 1024. Without this, building Spark can run out of permgen space on my machine. Author: Reynold Xin Closes #103 from rxin/sbt and squashes the following commits: 8829c34 [Reynold Xin] Allow sbt to use more than 1G of heap. --- sbt/sbt-launch-lib.bash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash index 00a6b41013e5f..64e40a88206be 100755 --- a/sbt/sbt-launch-lib.bash +++ b/sbt/sbt-launch-lib.bash @@ -105,7 +105,7 @@ get_mem_opts () { local mem=${1:-2048} local perm=$(( $mem / 4 )) (( $perm > 256 )) || perm=256 - (( $perm < 1024 )) || perm=1024 + (( $perm < 4096 )) || perm=4096 local codecache=$(( $perm / 2 )) echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m" From 0b7b7fd45cd9037d23cb090e62be3ff075214fe7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 7 Mar 2014 23:26:46 -0800 Subject: [PATCH 02/14] [SPARK-1194] Fix the same-RDD rule for cache replacement SPARK-1194: https://spark-project.atlassian.net/browse/SPARK-1194 In the current implementation, when selecting candidate blocks to be swapped out, once we find a block from the same RDD that the block to be stored belongs to, cache eviction fails and aborts. In this PR, we keep selecting blocks *not* from the RDD that the block to be stored belongs to until either enough free space can be ensured (cache eviction succeeds) or all such blocks are checked (cache eviction fails).
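For illustration, the selection rule described above can be sketched as a small, self-contained Scala program. This is a hedged sketch only, not the actual MemoryStore code: block metadata is reduced to an id, an optional RDD id, and a size, and the real implementation additionally synchronizes on the block manager and actually drops the selected blocks.

    // Minimal sketch of the SPARK-1194 selection rule (illustrative, not the real MemoryStore).
    object SameRddEvictionSketch {
      final case class Block(id: String, rddId: Option[Int], size: Long)

      /** Picks victims until enough space is freed; returns None if eviction must fail. */
      def selectBlocksToEvict(
          cached: Seq[Block],          // cached blocks in eviction order (e.g. LRU)
          incomingRddId: Option[Int],  // RDD of the block we want to store, if any
          spaceNeeded: Long): Option[Seq[Block]] = {
        val selected = scala.collection.mutable.ArrayBuffer.empty[Block]
        var freed = 0L
        val it = cached.iterator
        while (freed < spaceNeeded && it.hasNext) {
          val block = it.next()
          // Skip -- rather than abort on -- blocks that belong to the same RDD as the incoming block.
          if (incomingRddId.isEmpty || incomingRddId != block.rddId) {
            selected += block
            freed += block.size
          }
        }
        if (freed >= spaceNeeded) Some(selected.toSeq) else None
      }

      def main(args: Array[String]): Unit = {
        val cached = Seq(Block("rdd_0_0", Some(0), 400L), Block("rdd_1_0", Some(1), 400L))
        // Storing rdd_0_1 (RDD 0): rdd_0_0 is skipped and rdd_1_0 is selected for eviction.
        println(selectBlocksToEvict(cached, incomingRddId = Some(0), spaceNeeded = 400L))
      }
    }

The toy run mirrors the regression test added to BlockManagerSuite below, where rdd_1_0 is dropped in favour of rdd_0_1 even though it is not the least recently used block.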
Author: Cheng Lian Closes #96 from liancheng/fix-spark-1194 and squashes the following commits: 2524ab9 [Cheng Lian] Added regression test case for SPARK-1194 6e40c22 [Cheng Lian] Remove redundant comments 40cdcb2 [Cheng Lian] Bug fix, and addressed PR comments from @mridulm 62c92ac [Cheng Lian] Fixed SPARK-1194 https://spark-project.atlassian.net/browse/SPARK-1194 --- .../org/apache/spark/storage/MemoryStore.scala | 11 +++++------ .../apache/spark/storage/BlockManagerSuite.scala | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index b89212eaabf6c..38836d44b04e8 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -236,13 +236,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey - if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) { - logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + - "block from the same RDD") - return false + if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { + selectedBlocks += blockId + selectedMemory += pair.getValue.size } - selectedBlocks += blockId - selectedMemory += pair.getValue.size } } @@ -264,6 +261,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } return true } else { + logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + + "from the same RDD") return false } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 121e47c7b1b41..1036b9f34e9dd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -662,4 +662,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") == None, "a1 should not be in store") } } + + test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr) + store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // Access rdd_1_0 to ensure it's not least recently used. + assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") + // According to the same-RDD rule, rdd_1_0 should be replaced here. + store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // rdd_1_0 should have been replaced, even it's not least recently used. + assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store") + assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store") + assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") + } } From c2834ec081df392ca501a75b5af06efaa5448509 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 8 Mar 2014 12:40:26 -0800 Subject: [PATCH 03/14] Update junitxml plugin to the latest version to avoid recompilation in every SBT command. 
Author: Reynold Xin Closes #104 from rxin/junitxml and squashes the following commits: 67ef7bf [Reynold Xin] Update junitxml plugin to the latest version to avoid recompilation in every SBT command. --- project/plugins.sbt | 1 + project/project/SparkPluginBuild.scala | 26 -------------------------- 2 files changed, 1 insertion(+), 26 deletions(-) delete mode 100644 project/project/SparkPluginBuild.scala diff --git a/project/plugins.sbt b/project/plugins.sbt index 914f2e05a402a..32bc044a93221 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -19,3 +19,4 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0") +addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0") diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala deleted file mode 100644 index a88a5e14539ec..0000000000000 --- a/project/project/SparkPluginBuild.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt._ - -object SparkPluginDef extends Build { - lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) - /* This is not published in a Maven repository, so we get it from GitHub directly */ - lazy val junitXmlListener = uri( - "https://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016" - ) -} From e59a3b6c415b95e8137f5a154716b12653a8aed0 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 8 Mar 2014 16:02:42 -0800 Subject: [PATCH 04/14] SPARK-1190: Do not initialize log4j if slf4j log4j backend is not being used Author: Patrick Wendell Closes #107 from pwendell/logging and squashes the following commits: be21c11 [Patrick Wendell] Logging fix --- core/src/main/scala/org/apache/spark/Logging.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index b749e5414dab6..7423082e34f47 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -19,6 +19,7 @@ package org.apache.spark import org.apache.log4j.{LogManager, PropertyConfigurator} import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.impl.StaticLoggerBinder /** * Utility trait for classes that want to log data. 
Creates a SLF4J logger for the class and allows @@ -101,9 +102,11 @@ trait Logging { } private def initializeLogging() { - // If Log4j doesn't seem initialized, load a default properties file + // If Log4j is being used, but is not initialized, load a default properties file + val binder = StaticLoggerBinder.getSingleton + val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory") val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements - if (!log4jInitialized) { + if (!log4jInitialized && usingLog4j) { val defaultLogProps = "org/apache/spark/log4j-defaults.properties" val classLoader = this.getClass.getClassLoader Option(classLoader.getResource(defaultLogProps)) match { From 52834d761b059264214dfc6a1f9c70b8bc7ec089 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 9 Mar 2014 11:08:39 -0700 Subject: [PATCH 05/14] SPARK-929: Fully deprecate usage of SPARK_MEM (Continued from old repo, prior discussion at https://github.com/apache/incubator-spark/pull/615) This patch cements our deprecation of the SPARK_MEM environment variable by replacing it with three more specialized variables: SPARK_DAEMON_MEMORY, SPARK_EXECUTOR_MEMORY, and SPARK_DRIVER_MEMORY The creation of the latter two variables means that we can safely set driver/job memory without accidentally setting the executor memory. Neither is public. SPARK_EXECUTOR_MEMORY is only used by the Mesos scheduler (and set within SparkContext). The proper way of configuring executor memory is through the "spark.executor.memory" property. SPARK_DRIVER_MEMORY is the new way of specifying the amount of memory run by jobs launched by spark-class, without possibly affecting executor memory. Other memory considerations: - The repl's memory can be set through the "--drivermem" command-line option, which really just sets SPARK_DRIVER_MEMORY. - run-example doesn't use spark-class, so the only way to modify examples' memory is actually an unusual use of SPARK_JAVA_OPTS (which is normally overriden in all cases by spark-class). This patch also fixes a lurking bug where spark-shell misused spark-class (the first argument is supposed to be the main class name, not java options), as well as a bug in the Windows spark-class2.cmd. I have not yet tested this patch on either Windows or Mesos, however. 
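For illustration only, the executor-memory precedence described above can be condensed into the following stand-alone Scala sketch. It is not the actual SparkContext/Utils code: the memory-string parser is reduced to the "m"/"g" cases, and the real resolution reads a SparkConf and System.getenv (see the SparkContext.scala diff further below).

    object ExecutorMemorySketch {
      // Simplified stand-in for Utils.memoryStringToMb; accepts values such as "512m" or "2g".
      def memoryStringToMb(str: String): Int = {
        val lower = str.toLowerCase
        if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
        else if (lower.endsWith("m")) lower.dropRight(1).toInt
        else lower.toInt
      }

      def warnSparkMem(value: String): String = {
        Console.err.println("Warning: SPARK_MEM is deprecated, please use spark.executor.memory instead.")
        value
      }

      /** Executor memory in MB: property, then SPARK_EXECUTOR_MEMORY, then deprecated SPARK_MEM, then 512. */
      def resolveExecutorMemoryMb(executorMemoryProp: Option[String], env: Map[String, String]): Int =
        executorMemoryProp                                  // 1. spark.executor.memory
          .orElse(env.get("SPARK_EXECUTOR_MEMORY"))         // 2. SPARK_EXECUTOR_MEMORY
          .orElse(env.get("SPARK_MEM").map(warnSparkMem))   // 3. SPARK_MEM (deprecated, warns)
          .map(memoryStringToMb)
          .getOrElse(512)                                   // 4. default

      def main(args: Array[String]): Unit = {
        println(resolveExecutorMemoryMb(None, Map("SPARK_MEM" -> "1g")))       // 1024, with a warning
        println(resolveExecutorMemoryMb(Some("2g"), Map("SPARK_MEM" -> "1g"))) // 2048, the property wins
      }
    }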
Author: Aaron Davidson Closes #99 from aarondav/sparkmem and squashes the following commits: 9df4c68 [Aaron Davidson] SPARK-929: Fully deprecate usage of SPARK_MEM --- bin/spark-class | 48 +++++++++++-------- bin/spark-class2.cmd | 47 +++++++++++++----- bin/spark-shell | 28 +++++------ .../scala/org/apache/spark/SparkContext.scala | 20 ++++---- .../scala/org/apache/spark/util/Utils.scala | 2 - docs/tuning.md | 2 +- python/pyspark/java_gateway.py | 2 +- 7 files changed, 90 insertions(+), 59 deletions(-) diff --git a/bin/spark-class b/bin/spark-class index c4225a392d6da..229ae2cebbab3 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -40,34 +40,46 @@ if [ -z "$1" ]; then exit 1 fi -# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable -# values for that; it doesn't need a lot -if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then - SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} - SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" - # Do not overwrite SPARK_JAVA_OPTS environment variable in this script - OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default -else - OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" +if [ -n "$SPARK_MEM" ]; then + echo "Warning: SPARK_MEM is deprecated, please use a more specific config option" + echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." fi +# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options +DEFAULT_MEM=${SPARK_MEM:-512m} + +SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" -# Add java opts for master, worker, executor. The opts maybe null +# Add java opts and memory settings for master, worker, executors, and repl. case "$1" in + # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS" + OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} ;; 'org.apache.spark.deploy.worker.Worker') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS" + OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} ;; + + # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY. 'org.apache.spark.executor.CoarseGrainedExecutorBackend') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM} ;; 'org.apache.spark.executor.MesosExecutorBackend') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM} ;; + + # All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS. 
'org.apache.spark.repl.Main') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS" + OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM} + ;; + *) + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" + OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM} ;; esac @@ -83,14 +95,10 @@ else fi fi -# Set SPARK_MEM if it isn't already set since we also use it for this process -SPARK_MEM=${SPARK_MEM:-512m} -export SPARK_MEM - # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="$OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" -JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" +JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM" # Load extra JAVA_OPTS from conf/java-opts, if it exists if [ -e "$FWDIR/conf/java-opts" ] ; then JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd index 80818c78ec24b..f488cfdbeceb6 100755 --- a/bin/spark-class2.cmd +++ b/bin/spark-class2.cmd @@ -34,22 +34,45 @@ if not "x%1"=="x" goto arg_given goto exit :arg_given -set RUNNING_DAEMON=0 -if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1 -if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1 -if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m +if not "x%SPARK_MEM%"=="x" ( + echo Warning: SPARK_MEM is deprecated, please use a more specific config option + echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY. +) + +rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options +set OUR_JAVA_MEM=%SPARK_MEM% +if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m + set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true -if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY% -rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script -if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% -if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -rem Figure out how much memory to use per executor and set it as an environment -rem variable so that our process sees it and can report it to Mesos -if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m +rem Add java opts and memory settings for master, worker, executors, and repl. +rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. +if "%1"=="org.apache.spark.deploy.master.Master" ( + set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS% + if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY% +) else if "%1"=="org.apache.spark.deploy.worker.Worker" ( + set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS% + if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY% + +rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY. +) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" ( + set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS% + if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY% +) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" ( + set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS% + if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY% + +rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS. 
+) else if "%1"=="org.apache.spark.repl.Main" ( + set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS% + if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY% +) else ( + set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% + if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY% +) rem Set JAVA_OPTS to be able to load native libraries and to set heap size -set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% +set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM% rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! rem Test whether the user has built Spark diff --git a/bin/spark-shell b/bin/spark-shell index 2bff06cf70051..7d3fe3aca7f1d 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -45,13 +45,11 @@ if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then exit fi -SPARK_SHELL_OPTS="" - for o in "$@"; do if [ "$1" = "-c" -o "$1" = "--cores" ]; then shift if [[ "$1" =~ $CORE_PATTERN ]]; then - SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.cores.max=$1" + SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.cores.max=$1" shift else echo "ERROR: wrong format for -c/--cores" @@ -61,7 +59,7 @@ for o in "$@"; do if [ "$1" = "-em" -o "$1" = "--execmem" ]; then shift if [[ $1 =~ $MEM_PATTERN ]]; then - SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.executor.memory=$1" + SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.executor.memory=$1" shift else echo "ERROR: wrong format for --execmem/-em" @@ -71,7 +69,7 @@ for o in "$@"; do if [ "$1" = "-dm" -o "$1" = "--drivermem" ]; then shift if [[ $1 =~ $MEM_PATTERN ]]; then - export SPARK_MEM=$1 + export SPARK_DRIVER_MEMORY=$1 shift else echo "ERROR: wrong format for --drivermem/-dm" @@ -125,16 +123,18 @@ if [[ ! $? ]]; then fi if $cygwin; then - # Workaround for issue involving JLine and Cygwin - # (see http://sourceforge.net/p/jline/bugs/40/). - # If you're using the Mintty terminal emulator in Cygwin, may need to set the - # "Backspace sends ^H" setting in "Keys" section of the Mintty options - # (see https://github.com/sbt/sbt/issues/562). - stty -icanon min 1 -echo > /dev/null 2>&1 - $FWDIR/bin/spark-class -Djline.terminal=unix $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@" - stty icanon echo > /dev/null 2>&1 + # Workaround for issue involving JLine and Cygwin + # (see http://sourceforge.net/p/jline/bugs/40/). + # If you're using the Mintty terminal emulator in Cygwin, may need to set the + # "Backspace sends ^H" setting in "Keys" section of the Mintty options + # (see https://github.com/sbt/sbt/issues/562). 
+ stty -icanon min 1 -echo > /dev/null 2>&1 + export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix" + $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@" + stty icanon echo > /dev/null 2>&1 else - $FWDIR/bin/spark-class $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@" + export SPARK_REPL_OPTS + $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@" fi # record the exit status lest it be overwritten: diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ce25573834829..cdc0e5a34240e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -162,19 +162,20 @@ class SparkContext( jars.foreach(addJar) } + def warnSparkMem(value: String): String = { + logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " + + "deprecated, please use spark.executor.memory instead.") + value + } + private[spark] val executorMemory = conf.getOption("spark.executor.memory") - .orElse(Option(System.getenv("SPARK_MEM"))) + .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) + .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(512) - if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) { - logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " + - "deprecated, instead use spark.executor.memory") - } - // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() - // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS"); value <- Option(System.getenv(key))) { executorEnvs(key) = value @@ -185,8 +186,9 @@ class SparkContext( value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } - // Since memory can be set with a system property too, use that - executorEnvs("SPARK_MEM") = executorMemory + "m" + // The Mesos scheduler backend relies on this environment variable to set executor memory. + // TODO: Set this only in the Mesos scheduler. + executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" executorEnvs ++= conf.getExecutorEnv // Set SPARK_USER for user who is running SparkContext. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0eb2f78b730f6..53458b6660fab 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -532,8 +532,6 @@ private[spark] object Utils extends Logging { /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. - * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM - * environment variable. */ def memoryStringToMb(str: String): Int = { val lower = str.toLowerCase diff --git a/docs/tuning.md b/docs/tuning.md index 26ff1325bb59c..093df3187a789 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -163,7 +163,7 @@ their work directories), *not* on your driver program. **Cache Size Tuning** One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. 
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to +By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index c15add5237507..6a16756e0576d 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -29,7 +29,7 @@ def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the - # proper classpath and SPARK_MEM settings from spark-env.sh + # proper classpath and settings from spark-env.sh on_windows = platform.system() == "Windows" script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", From f6f9d02e85d17da2f742ed0062f1648a9293e73c Mon Sep 17 00:00:00 2001 From: Jiacheng Guo Date: Sun, 9 Mar 2014 11:37:44 -0700 Subject: [PATCH 06/14] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set. Author: Jiacheng Guo Closes #98 from guojc/master and squashes the following commits: abfe698 [Jiacheng Guo] add space according request 2a37c34 [Jiacheng Guo] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set. --- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++++ docs/configuration.md | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 53458b6660fab..ac376fc403ada 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -278,6 +278,10 @@ private[spark] object Utils extends Logging { uc = new URL(url).openConnection() } + val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 + uc.setConnectTimeout(timeout) + uc.setReadTimeout(timeout) + uc.connect() val in = uc.getInputStream(); val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) diff --git a/docs/configuration.md b/docs/configuration.md index 913c653b0dac4..8f6cb02911de5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -523,6 +523,15 @@ Apart from these, the following properties are also available, and may be useful Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source. + + + spark.files.fetchTimeout + false + + Communication timeout to use when fetching files added through SparkContext.addFile() from + the driver. 
+ + spark.authenticate false From faf4cad1debb76148facc008e0a3308ac96eee7a Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 9 Mar 2014 11:57:06 -0700 Subject: [PATCH 07/14] Fix markup errors introduced in #33 (SPARK-1189) These were causing errors on the configuration page. Author: Patrick Wendell Closes #111 from pwendell/master and squashes the following commits: 8467a86 [Patrick Wendell] Fix markup errors introduced in #33 (SPARK-1189) --- docs/configuration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 8f6cb02911de5..a006224d5080c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -147,13 +147,13 @@ Apart from these, the following properties are also available, and may be useful How many stages the Spark UI remembers before garbage collecting. - + spark.ui.filters None Comma separated list of filter class names to apply to the Spark web ui. The filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a - java system property of spark..params='param1=value1,param2=value2' + java system property of spark.<class name of filter>.params='param1=value1,param2=value2' (e.g.-Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing') @@ -515,7 +515,7 @@ Apart from these, the following properties are also available, and may be useful the whole cluster by default.
Note: this setting needs to be configured in the standalone cluster master, not in individual applications; you can set it through SPARK_JAVA_OPTS in spark-env.sh. - + spark.files.overwrite From b9be160951b9e7a7e801009e9d6ee6c2b5d2d47e Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 9 Mar 2014 13:17:07 -0700 Subject: [PATCH 08/14] SPARK-782 Clean up for ASM dependency. This makes two changes. 1) Spark uses the shaded version of asm that is (conveniently) published with Kryo. 2) Existing exclude rules around asm are updated to reflect the new groupId of `org.ow2.asm`. This made all of the old rules not work with newer Hadoop versions that pull in new asm versions. Author: Patrick Wendell Closes #100 from pwendell/asm and squashes the following commits: 9235f3f [Patrick Wendell] SPARK-782 Clean up for ASM dependency. --- core/pom.xml | 4 -- .../apache/spark/util/ClosureCleaner.scala | 4 +- .../spark/graphx/util/BytecodeUtils.scala | 4 +- pom.xml | 41 ++++++++++++++++--- project/SparkBuild.scala | 20 ++++----- .../spark/repl/ExecutorClassLoader.scala | 5 ++- 6 files changed, 53 insertions(+), 25 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 3e6e98cd2cf92..4d7d41a9714d7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -102,10 +102,6 @@ org.xerial.snappy snappy-java - - org.ow2.asm - asm - com.twitter chill_${scala.binary.version} diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 681d0a30cb3f8..a8d20ee332355 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -22,8 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import scala.collection.mutable.Map import scala.collection.mutable.Set -import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} -import org.objectweb.asm.Opcodes._ +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ import org.apache.spark.Logging diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala index d1528e2f07cf2..014a7335f85cc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala @@ -23,8 +23,8 @@ import scala.collection.mutable.HashSet import org.apache.spark.util.Utils -import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor} -import org.objectweb.asm.Opcodes._ +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor} +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ /** diff --git a/pom.xml b/pom.xml index 3b863856e4634..f0c877dcfe7b2 100644 --- a/pom.xml +++ b/pom.xml @@ -221,11 +221,6 @@ snappy-java 1.0.5 - - org.ow2.asm - asm - 4.0 - com.clearspring.analytics stream @@ -245,11 +240,31 @@ com.twitter chill_${scala.binary.version} 0.3.1 + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + com.twitter chill-java 0.3.1 + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + ${akka.group} @@ -435,6 +450,10 @@ asm asm + + org.ow2.asm + asm + org.jboss.netty netty @@ -474,6 +493,10 @@ asm asm + + org.ow2.asm + asm + org.jboss.netty netty @@ -489,6 +512,10 @@ asm asm + + org.ow2.asm + asm + 
org.jboss.netty netty @@ -505,6 +532,10 @@ asm asm + + org.ow2.asm + asm + org.jboss.netty netty diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 138aad7561043..8fa220c413291 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -257,7 +257,8 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.5" val excludeNetty = ExclusionRule(organization = "org.jboss.netty") - val excludeAsm = ExclusionRule(organization = "asm") + val excludeAsm = ExclusionRule(organization = "org.ow2.asm") + val excludeOldAsm = ExclusionRule(organization = "asm") val excludeCommonsLogging = ExclusionRule(organization = "commons-logging") val excludeSLF4J = ExclusionRule(organization = "org.slf4j") val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap") @@ -280,7 +281,6 @@ object SparkBuild extends Build { "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 "com.ning" % "compress-lzf" % "1.0.0", "org.xerial.snappy" % "snappy-java" % "1.0.5", - "org.ow2.asm" % "asm" % "4.0", "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", @@ -291,15 +291,15 @@ object SparkBuild extends Build { "commons-net" % "commons-net" % "2.2", "net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging), "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J), + "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm), "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", "com.codahale.metrics" % "metrics-graphite" % "3.0.0", - "com.twitter" %% "chill" % "0.3.1", - "com.twitter" % "chill-java" % "0.3.1", + "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm), + "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm), "com.clearspring.analytics" % "stream" % "2.5.1" ), libraryDependencies ++= maybeAvro @@ -320,7 +320,7 @@ object SparkBuild extends Build { name := "spark-examples", libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % "0.1.11", - "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging), + "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging), "org.apache.cassandra" % "cassandra-all" % "1.2.6" exclude("com.google.guava", "guava") exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") @@ -397,10 +397,10 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? 
- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeNetty, excludeAsm) + "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm) ) ) diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index 1aa94079fd0ae..ee972887feda6 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -27,8 +27,9 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkEnv import org.apache.spark.util.Utils -import org.objectweb.asm._ -import org.objectweb.asm.Opcodes._ + +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._ +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ /** From 5d98cfc1c8fb17fbbeacc7192ac21c0b038cbd16 Mon Sep 17 00:00:00 2001 From: Chen Chao Date: Sun, 9 Mar 2014 22:42:12 -0700 Subject: [PATCH 09/14] maintain arbitrary state data for each key RT Author: Chen Chao Closes #114 from CrazyJvm/patch-1 and squashes the following commits: dcb0df5 [Chen Chao] maintain arbitrary state data for each key --- docs/streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 2a56cf07d0cfc..f9904d45013f6 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -539,7 +539,7 @@ common ones are as follows. updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be - used to maintain arbitrary state data for each ket. + used to maintain arbitrary state data for each key. 
From e1e09e0ef6b18e034727403d81747d899b042219 Mon Sep 17 00:00:00 2001 From: Prabin Banka Date: Mon, 10 Mar 2014 13:27:00 -0700 Subject: [PATCH 10/14] SPARK-977 Added Python RDD.zip function was raised earlier as a part of apache/incubator-spark#486 Author: Prabin Banka Closes #76 from prabinb/python-api-zip and squashes the following commits: b1a31a0 [Prabin Banka] Added Python RDD.zip function --- python/pyspark/rdd.py | 20 +++++++++++++++++++- python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e72f57d9d1ab0..5ab27ff4029d8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -30,7 +30,7 @@ import warnings from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, pack_long + BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -1081,6 +1081,24 @@ def coalesce(self, numPartitions, shuffle=False): jrdd = self._jrdd.coalesce(numPartitions) return RDD(jrdd, self.ctx, self._jrdd_deserializer) + def zip(self, other): + """ + Zips this RDD with another one, returning key-value pairs with the first element in each RDD + second element in each RDD, etc. Assumes that the two RDDs have the same number of + partitions and the same number of elements in each partition (e.g. one was made through + a map on the other). + + >>> x = sc.parallelize(range(0,5)) + >>> y = sc.parallelize(range(1000, 1005)) + >>> x.zip(y).collect() + [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] + """ + pairRDD = self._jrdd.zip(other._jrdd) + deserializer = PairDeserializer(self._jrdd_deserializer, + other._jrdd_deserializer) + return RDD(pairRDD, self.ctx, deserializer) + + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 8c6ad79059c23..12c63f186a2b7 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -204,7 +204,7 @@ def __init__(self, key_ser, val_ser): self.key_ser = key_ser self.val_ser = val_ser - def load_stream(self, stream): + def prepare_keys_values(self, stream): key_stream = self.key_ser._load_stream_without_unbatching(stream) val_stream = self.val_ser._load_stream_without_unbatching(stream) key_is_batched = isinstance(self.key_ser, BatchedSerializer) @@ -212,6 +212,10 @@ def load_stream(self, stream): for (keys, vals) in izip(key_stream, val_stream): keys = keys if key_is_batched else [keys] vals = vals if val_is_batched else [vals] + yield (keys, vals) + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): for pair in product(keys, vals): yield pair @@ -224,6 +228,29 @@ def __str__(self): (str(self.key_ser), str(self.val_ser)) +class PairDeserializer(CartesianDeserializer): + """ + Deserializes the JavaRDD zip() of two PythonRDDs. 
+ """ + + def __init__(self, key_ser, val_ser): + self.key_ser = key_ser + self.val_ser = val_ser + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): + for pair in izip(keys, vals): + yield pair + + def __eq__(self, other): + return isinstance(other, PairDeserializer) and \ + self.key_ser == other.key_ser and self.val_ser == other.val_ser + + def __str__(self): + return "PairDeserializer<%s, %s>" % \ + (str(self.key_ser), str(self.val_ser)) + + class NoOpSerializer(FramedSerializer): def loads(self, obj): return obj From f5518989b67a0941ca79368e73811895a5fa8669 Mon Sep 17 00:00:00 2001 From: jyotiska Date: Mon, 10 Mar 2014 13:34:49 -0700 Subject: [PATCH 11/14] [SPARK-972] Added detailed callsite info for ValueError in context.py (resubmitted) Author: jyotiska Closes #34 from jyotiska/pyspark_code and squashes the following commits: c9439be [jyotiska] replaced dict with namedtuple a6bf4cd [jyotiska] added callsite info for context.py --- python/pyspark/context.py | 16 +++++++++++++++- python/pyspark/rdd.py | 21 ++++++++++++++------- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c9f42d3aacb58..bf2454fd7e38e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -20,6 +20,7 @@ import sys from threading import Lock from tempfile import NamedTemporaryFile +from collections import namedtuple from pyspark import accumulators from pyspark.accumulators import Accumulator @@ -29,6 +30,7 @@ from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel +from pyspark import rdd from pyspark.rdd import RDD from py4j.java_collections import ListConverter @@ -83,6 +85,11 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, ... ValueError:... 
""" + if rdd._extract_concise_traceback() is not None: + self._callsite = rdd._extract_concise_traceback() + else: + tempNamedTuple = namedtuple("Callsite", "function file linenum") + self._callsite = tempNamedTuple(function=None, file=None, linenum=None) SparkContext._ensure_initialized(self, gateway=gateway) self.environment = environment or {} @@ -169,7 +176,14 @@ def _ensure_initialized(cls, instance=None, gateway=None): if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: - raise ValueError("Cannot run multiple SparkContexts at once") + currentMaster = SparkContext._active_spark_context.master + currentAppName = SparkContext._active_spark_context.appName + callsite = SparkContext._active_spark_context._callsite + + # Raise error if there is already a running Spark context + raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \ + " created by %s at %s:%s " \ + % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum)) else: SparkContext._active_spark_context = instance diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 5ab27ff4029d8..e1043ad564611 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -18,6 +18,7 @@ from base64 import standard_b64encode as b64enc import copy from collections import defaultdict +from collections import namedtuple from itertools import chain, ifilter, imap import operator import os @@ -42,12 +43,14 @@ __all__ = ["RDD"] def _extract_concise_traceback(): + """ + This function returns the traceback info for a callsite, returns a dict + with function name, file name and line number + """ tb = traceback.extract_stack() + callsite = namedtuple("Callsite", "function file linenum") if len(tb) == 0: - return "I'm lost!" - # HACK: This function is in a file called 'rdd.py' in the top level of - # everything PySpark. Just trim off the directory name and assume - # everything in that tree is PySpark guts. + return None file, line, module, what = tb[len(tb) - 1] sparkpath = os.path.dirname(file) first_spark_frame = len(tb) - 1 @@ -58,16 +61,20 @@ def _extract_concise_traceback(): break if first_spark_frame == 0: file, line, fun, what = tb[0] - return "%s at %s:%d" % (fun, file, line) + return callsite(function=fun, file=file, linenum=line) sfile, sline, sfun, swhat = tb[first_spark_frame] ufile, uline, ufun, uwhat = tb[first_spark_frame-1] - return "%s at %s:%d" % (sfun, ufile, uline) + return callsite(function=sfun, file=ufile, linenum=uline) _spark_stack_depth = 0 class _JavaStackTrace(object): def __init__(self, sc): - self._traceback = _extract_concise_traceback() + tb = _extract_concise_traceback() + if tb is not None: + self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) + else: + self._traceback = "Error! Could not extract traceback info" self._context = sc def __enter__(self): From a59419c27e45f06be5143c58d48affb0a5158bdf Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 10 Mar 2014 13:37:11 -0700 Subject: [PATCH 12/14] SPARK-1168, Added foldByKey to pyspark. Author: Prashant Sharma Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits: db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark. 
--- python/pyspark/rdd.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e1043ad564611..39916d21c76c5 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -946,7 +946,21 @@ def _mergeCombiners(iterator): combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() return shuffled.mapPartitions(_mergeCombiners) + + def foldByKey(self, zeroValue, func, numPartitions=None): + """ + Merge the values for each key using an associative function "func" and a neutral "zeroValue" + which may be added to the result an arbitrary number of times, and must not change + the result (e.g., 0 for addition, or 1 for multiplication.). + >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) + >>> from operator import add + >>> rdd.foldByKey(0, add).collect() + [('a', 2), ('b', 1)] + """ + return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) + + # TODO: support variant with custom partitioner def groupByKey(self, numPartitions=None): """ From 2a5161708f4d2f743c7bd69ed3d98bb7bff46460 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 10 Mar 2014 16:28:41 -0700 Subject: [PATCH 13/14] SPARK-1205: Clean up callSite/origin/generator. This patch removes the `generator` field and simplifies + documents the tracking of callsites. There are two places where we care about call sites: when a job is run and when an RDD is created. This patch retains both of those features but does a slight refactoring and renaming to make things less confusing. There was another feature of an RDD called the `generator` which was by default the user class in which the RDD was created. This is used exclusively in the JobLogger. It has been subsumed by the ability to name a job group. The job logger can later be refactored to read the job group directly (will require some work) but for now this just preserves the default logged value of the user class. I'm not sure any users ever used the ability to override this. Author: Patrick Wendell Closes #106 from pwendell/callsite and squashes the following commits: fc1d009 [Patrick Wendell] Compile fix e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite 62e77ef [Patrick Wendell] Review feedback 576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator. --- .../scala/org/apache/spark/SparkContext.scala | 11 +++++------ .../org/apache/spark/api/java/JavaRDD.scala | 2 -- .../apache/spark/api/java/JavaRDDLike.scala | 5 ----- .../main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++-------------- .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/JobLogger.scala | 10 +++------- .../org/apache/spark/scheduler/Stage.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 4 ++-- 8 files changed, 16 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cdc0e5a34240e..745e3fa4e85f6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -832,13 +832,12 @@ class SparkContext( setLocalProperty("externalCallSite", null) } + /** + * Capture the current user callsite and return a formatted version for printing. If the user + * has overridden the call site, this will return the user's version.
+ */ private[spark] def getCallSite(): String = { - val callSite = getLocalProperty("externalCallSite") - if (callSite == null) { - Utils.formatSparkCallSite - } else { - callSite - } + Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo()) } /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 91bf404631f49..01d9357a2556d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) - def generator: String = rdd.generator - override def toString = rdd.toString /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index af0114bee3f49..a89419bbd10e7 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -19,7 +19,6 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} -import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name - /** Reset generator */ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3fe56963e0008..4afa7523dd802 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag]( this } - /** User-defined generator of this RDD*/ - @transient var generator = Utils.getCallSiteInfo.firstUserClass - - /** Reset generator*/ - def setGenerator(_generator: String) = { - generator = _generator - } - /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not @@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE - /** Record user function generating this RDD. */ - @transient private[spark] val origin = sc.getCallSite() + /** User code that created this RDD (e.g. `textFile`, `parallelize`). 
*/ + @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo + private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo) private[spark] def elementClassTag: ClassTag[T] = classTag[T] @@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag]( } override def toString: String = "%s%s[%d] at %s".format( - Option(name).map(_ + " ").getOrElse(""), - getClass.getSimpleName, - id, - origin) + Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite) def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dc5b25d845dc2..d83d0341c61ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -279,7 +279,7 @@ class DAGScheduler( } else { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size) } stage diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 80f9ec7d03007..01cbcc390c6cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String) * @param indent Indent number before info */ protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { + val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE" val rddInfo = - if (rdd.getStorageLevel != StorageLevel.NONE) { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " + - rdd.origin + " " + rdd.generator - } else { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " + - rdd.origin + " " + rdd.generator - } + s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " + + s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}" jobLogInfo(jobID, indentString(indent) + rddInfo, false) rdd.dependencies.foreach { case shufDep: ShuffleDependency[_, _] => diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a78b0186b9eab..5c1fc30e4a557 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -100,7 +100,7 @@ private[spark] class Stage( id } - val name = callSite.getOrElse(rdd.origin) + val name = callSite.getOrElse(rdd.getCreationSite) override def toString = "Stage " + id diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ac376fc403ada..38a275d438959 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -719,8 +719,8 @@ private[spark] object Utils extends Logging { new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass) } - def formatSparkCallSite = { - val callSiteInfo = getCallSiteInfo + /** Returns a printable version of the call site info suitable for logs. 
*/ + def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = { "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile, callSiteInfo.firstUserLine) } From 2a2c9645e4ea08cd1408151a33d2d52f6752404a Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 10 Mar 2014 17:42:33 -0700 Subject: [PATCH 14/14] SPARK-1211. In ApplicationMaster, set spark.master system property to "y... ...arn-cluster" Author: Sandy Ryza Closes #118 from sryza/sandy-spark-1211 and squashes the following commits: d4001c7 [Sandy Ryza] SPARK-1211. In ApplicationMaster, set spark.master system property to "yarn-cluster" --- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3 +++ .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bb574f415293a..87785cdc60c52 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -79,6 +79,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // other spark processes running on the same box System.setProperty("spark.ui.port", "0") + // when running the AM, the Spark master is always "yarn-cluster" + System.setProperty("spark.master", "yarn-cluster") + // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index b48a2d50db5ef..57d15774290dd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -82,6 +82,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // other spark processes running on the same box System.setProperty("spark.ui.port", "0") + // when running the AM, the Spark master is always "yarn-cluster" + System.setProperty("spark.master", "yarn-cluster") + // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)