Commit
Merge branch 'master' of git://git.apache.org/spark into fix-assembly-jarname
sarutak committed Oct 11, 2014
2 parents ad1f96e + 0e8203f commit c81806b
Showing 157 changed files with 3,762 additions and 1,977 deletions.
2 changes: 1 addition & 1 deletion bin/compute-classpath.cmd
@@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

if "x%SPARK_CONF_DIR%"!="x" (
if not "x%SPARK_CONF_DIR%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
) else (
set CLASSPATH=%CLASSPATH%;%FWDIR%conf
51 changes: 38 additions & 13 deletions bin/pyspark
@@ -50,22 +50,47 @@ fi

. "$FWDIR"/bin/load-spark-env.sh

# Figure out which Python executable to use
# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
#
# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
# and executor Python executables.
#
# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.

# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
if hash python2.7 2>/dev/null; then
# Attempt to use Python 2.7, if installed:
DEFAULT_PYTHON="python2.7"
else
DEFAULT_PYTHON="python"
fi

# Determine the Python executable to use for the driver:
if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
# If IPython options are specified, assume user wants to run IPython
# (for backwards-compatibility)
PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
PYSPARK_DRIVER_PYTHON="ipython"
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON="ipython"
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
exit 1
else
PYSPARK_PYTHON="python"
PYSPARK_PYTHON="$DEFAULT_PYTHON"
fi
fi
export PYSPARK_PYTHON

if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
exec "$PYSPARK_PYTHON" $1
exec "$PYSPARK_DRIVER_PYTHON" $1
fi
exit
fi
@@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
fi
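
For illustration only, a minimal Scala sketch (not part of this commit) of the resolution order the new bin/pyspark encodes; Resolve, env, and defaultPython are hypothetical stand-ins for the shell variables of the same names, and the IPython/python2.7 sanity check is omitted:

    object Resolve {
      // Mirrors the shell logic above: the legacy IPYTHON/IPYTHON_OPTS variables force ipython
      // for the driver; otherwise PYSPARK_DRIVER_PYTHON wins, then PYSPARK_PYTHON, then the default.
      def driverPython(env: Map[String, String], defaultPython: String): String =
        if (env.contains("IPYTHON_OPTS") || env.get("IPYTHON").exists(_ == "1")) "ipython"
        else env.getOrElse("PYSPARK_DRIVER_PYTHON",
          env.getOrElse("PYSPARK_PYTHON", defaultPython))

      // Executors only consult PYSPARK_PYTHON, falling back to python2.7/python.
      def executorPython(env: Map[String, String], defaultPython: String): String =
        env.getOrElse("PYSPARK_PYTHON", defaultPython)
    }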
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -51,6 +51,11 @@ table.sortable thead {
cursor: pointer;
}

table.sortable td {
word-wrap: break-word;
max-width: 600px;
}

.progress {
margin-bottom: 0px; position: relative
}
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
19 changes: 8 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
* in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
}

object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
@volatile private var env: SparkEnv = _

private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
env = e
}

/**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
* Returns the SparkEnv.
*/
def get: SparkEnv = {
Option(env.get()).getOrElse(lastSetSparkEnv)
env
}

/**
* Returns the ThreadLocal SparkEnv.
*/
@deprecated("Use SparkEnv.get instead", "1.2")
def getThreadLocal: SparkEnv = {
env.get()
env
}

private[spark] def create(
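
A minimal usage sketch of the access pattern the updated scaladoc describes (it assumes a SparkContext has already been created, which populates the environment): SparkEnv is now a single process-wide reference, so every thread sees the same instance and no per-thread SparkEnv.set is needed.

    import org.apache.spark.SparkEnv

    // Any thread can read the same environment; no per-thread set() is required.
    val env: SparkEnv = SparkEnv.get
    val serializer = env.serializer.newInstance()  // e.g. pick up the configured serializer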
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import com.google.common.io.Files

import org.apache.spark.util.Utils

/**
* Utilities for tests. Included in main codebase since it's used by multiple
* projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val tempDir = Utils.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(

override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
@@ -248,6 +247,11 @@
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
worker.shutdownOutput()
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
}
}
}
@@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
val worker = pb.start()

// Redirect worker stdout and stderr
@@ -149,10 +151,12 @@

try {
// Create and start the daemon
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
daemon = pb.start()

val in = new DataInputStream(daemon.getInputStream)
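
A self-contained sketch (not taken from the patch) of the technique used above: instead of passing -u to the interpreter, PYTHONUNBUFFERED is set in the child's environment, which CPython honors for any non-empty value and which also works when the executable is ipython; the "python" executable and module name here are placeholders.

    import scala.collection.JavaConverters._

    val pb = new ProcessBuilder(Seq("python", "-m", "pyspark.worker").asJava)
    pb.environment().put("PYTHONUNBUFFERED", "YES")  // equivalent to "python -u": unbuffered stdout/stderr
    val worker = pb.start()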
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)

case AssociationErrorEvent(cause, _, remoteAddress, _) =>
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
@@ -34,7 +34,8 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
@@ -57,6 +58,7 @@ object PythonRunner {
val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
@@ -154,7 +154,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()

case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")

case StopAppClient =>
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")

case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
if isWorker(remoteAddress) =>
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,7 +148,6 @@ private[spark] class Executor(

override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
@@ -158,7 +157,6 @@ private[spark] class Executor(
val startGCTime = gcTime

try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

override def size: Long = length

override def nioByteBuffer(): ByteBuffer = {
var channel: FileChannel = null
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
val buf = ByteBuffer.allocate(length.toInt)
channel.read(buf, offset)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, offset, length)
}
} catch {
case e: IOException =>
Try(channel.size).toOption match {
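
The decision above in isolation, as a hedged sketch (readSegment and the 2 MB threshold are illustrative, not a public API): small segments are copied into a regular heap buffer, since each memory-mapped region carries a fixed setup cost and is only reclaimed once the buffer is garbage collected, while large segments keep the cheaper zero-copy mapping.

    import java.io.{File, RandomAccessFile}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    def readSegment(file: File, offset: Long, length: Long,
                    mmapThreshold: Long = 2L * 1024 * 1024): ByteBuffer = {
      val channel = new RandomAccessFile(file, "r").getChannel
      try {
        if (length < mmapThreshold) {
          val buf = ByteBuffer.allocate(length.toInt)
          channel.read(buf, offset)  // positional read into a heap buffer
          buf.flip()                 // make the copied bytes readable to callers
          buf
        } else {
          channel.map(MapMode.READ_ONLY, offset, length)  // mapping stays valid after close
        }
      } finally {
        channel.close()
      }
    }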