Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into streaming
Browse files Browse the repository at this point in the history
Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
  • Loading branch information
davies committed Oct 7, 2014
2 parents 8380064 + 553737c commit 6db00da
Show file tree
Hide file tree
Showing 17 changed files with 270 additions and 165 deletions.
2 changes: 1 addition & 1 deletion bin/compute-classpath.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

if "x%SPARK_CONF_DIR%"!="x" (
if not "x%SPARK_CONF_DIR%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
) else (
set CLASSPATH=%CLASSPATH%;%FWDIR%conf
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ table.sortable thead {
cursor: pointer;
}

table.sortable td {
word-wrap: break-word;
max-width: 600px;
}

.progress {
margin-bottom: 0px; position: relative
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
Expand Down
28 changes: 8 additions & 20 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
* in a future release.
Expand Down Expand Up @@ -91,9 +90,6 @@ class SparkEnv (
// actorSystem.awaitTermination()

// Note that blockTransferService is stopped by BlockManager since it is started by it.

// clear all the references in ThreadLocal object
SparkEnv.reset()
}

private[spark]
Expand Down Expand Up @@ -122,36 +118,28 @@ class SparkEnv (
}

object SparkEnv extends Logging {
@volatile private var env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
@volatile private var env: SparkEnv = _

private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
}

// clear all the threadlocal references
private[spark] def reset(): Unit = {
env = new ThreadLocal[SparkEnv]
lastSetSparkEnv = null
env = e
}

/**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
* Returns the SparkEnv.
*/
def get: SparkEnv = {
Option(env.get()).getOrElse(lastSetSparkEnv)
env
}

/**
* Returns the ThreadLocal SparkEnv.
*/
@deprecated("Use SparkEnv.get instead", "1.2")
def getThreadLocal: SparkEnv = {
env.get()
env
}

private[spark] def create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ private[spark] class PythonRDD(

override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
Expand Down Expand Up @@ -246,6 +245,11 @@ private[spark] class PythonRDD(
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
worker.shutdownOutput()
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ private[spark] class Executor(

override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
Expand All @@ -158,7 +157,6 @@ private[spark] class Executor(
val startGCTime = gcTime

try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ private[spark] class PipedRDD[T: ClassTag](
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + command) {
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)

// input the pipe context firstly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ class DAGScheduler(
protected def runLocallyWithinThread(job: ActiveJob) {
var jobResult: JobResult = JobSucceeded
try {
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
val taskContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ private[spark] class TaskSchedulerImpl(
* that tasks are balanced across the cluster.
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)

// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
Expand Down
45 changes: 39 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
PutResult(res.size, res.data, droppedBlocks)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
logWarning(s"Not enough space to store block $blockId in memory! " +
s"Free memory is $freeMemory bytes.")
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
Expand Down Expand Up @@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
}

Expand Down Expand Up @@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Reserve additional memory for unrolling blocks used by this thread.
* Return whether the request is granted.
*/
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
accountingLock.synchronized {
val granted = freeMemory > currentUnrollMemory + memory
if (granted) {
Expand All @@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Release memory used by this thread for unrolling blocks.
* If the amount is not specified, remove the current thread's allocation altogether.
*/
private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
val threadId = Thread.currentThread().getId
accountingLock.synchronized {
if (memory < 0) {
Expand All @@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/**
* Return the amount of memory currently occupied for unrolling blocks across all threads.
*/
private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
def currentUnrollMemory: Long = accountingLock.synchronized {
unrollMemoryMap.values.sum
}

/**
* Return the amount of memory currently occupied for unrolling blocks by this thread.
*/
private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
}

/**
* Return the number of threads currently unrolling blocks.
*/
def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }

/**
* Log information about current memory usage.
*/
def logMemoryUsage(): Unit = {
val blocksMemory = currentMemory
val unrollMemory = currentUnrollMemory
val totalMemory = blocksMemory + unrollMemory
logInfo(
s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " +
s"Storage limit = ${Utils.bytesToString(maxMemory)}."
)
}

/**
* Log a warning for failing to unroll a block.
*
* @param blockId ID of the block we are trying to unroll.
* @param finalVectorSize Final size of the vector before unrolling failed.
*/
def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
logWarning(
s"Not enough space to cache $blockId in memory! " +
s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
)
logMemoryUsage()
}
}

private[spark] case class ResultWithDroppedBlocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
jobSet.handleJobStart(job)
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
SparkEnv.set(ssc.env)
}

private def handleJobCompletion(job: Job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
@transient val thread = new Thread() {
override def run() {
try {
SparkEnv.set(env)
startReceivers()
} catch {
case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
private var rpc: YarnRPC = null
private var resourceManager: AMRMProtocol = _
private var uiHistoryAddress: String = _
private var registered: Boolean = false

override def register(
conf: YarnConfiguration,
Expand All @@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress

resourceManager = registerWithResourceManager(conf)
registerApplicationMaster(uiAddress)
synchronized {
resourceManager = registerWithResourceManager(conf)
registerApplicationMaster(uiAddress)
registered = true
}

new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
preferredNodeLocations, securityMgr)
Expand All @@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
appAttemptId
}

override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(getAttemptId())
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
finishReq.setTrackingUrl(uiHistoryAddress)
resourceManager.finishApplicationMaster(finishReq)
override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(getAttemptId())
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
finishReq.setTrackingUrl(uiHistoryAddress)
resourceManager.finishApplicationMaster(finishReq)
}
}

override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
Expand Down
Loading

0 comments on commit 6db00da

Please sign in to comment.