diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 00a43673e5cd3..71650cd773bcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File,
+    val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
     var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
    */
  def fetchAndRunExecutor() {
    try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
      // Launch the process
      val command = getCommandSeq
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0c454e4138c96..3b13f43a1868c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.deploy.worker

 import java.io.File
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date

+import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.language.postfixOps

 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
       if (CLEANUP_ENABLED) {
+        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
         context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
           CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
       }
@@ -201,10 +205,23 @@ private[spark] class Worker(
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
       val cleanupFuture = concurrent.future {
-        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
-        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
-          .foreach(Utils.deleteRecursively)
+        val appDirs = workDir.listFiles()
+        if (appDirs == null) {
+          throw new IOException("ERROR: Failed to list files in " + workDir)
+        }
+        appDirs.filter { dir =>
+          // the directory is used by an application - check that the application is not running
+          // when cleaning up
+          val appIdFromDir = dir.getName
+          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          dir.isDirectory && !isAppStillRunning &&
+            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+        }.foreach { dir =>
+          logInfo(s"Removing directory: ${dir.getPath}")
+          Utils.deleteRecursively(dir)
+        }
       }
+
       cleanupFuture onFailure {
         case e: Throwable => logError("App dir cleanup failed: " + e.getMessage, e)
       }
@@ -233,8 +250,15 @@ private[spark] class Worker(
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+
+          // Create the executor's working directory
+          val executorDir = new File(workDir, appId + "/" + execId)
+          if (!executorDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + executorDir)
+          }
+
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -242,12 +266,13 @@ private[spark] class Worker(
           master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
-            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+              Some(e.toString), None)
           }
         }
       }
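Reviewer note: the rewritten `WorkDirCleanup` handler only deletes a top-level entry of `workDir` when it is a directory, no executor currently running on this worker belongs to the application of the same name, and nothing anywhere in the directory tree is newer than `APP_DATA_RETENTION_SECS` (the whole path remains gated on `CLEANUP_ENABLED`, i.e. `spark.worker.cleanup.enabled`). Below is a self-contained sketch of that filter, not the patch's own code; `staleAppDirs`, `runningAppIds`, and `retentionSecs` are illustrative names:

```scala
import java.io.File

object CleanupFilterSketch {
  // Returns the app directories under workDir that are safe to delete: directories
  // whose app has no live executor on this worker and whose entire tree is older
  // than the retention window.
  def staleAppDirs(workDir: File, runningAppIds: Set[String], retentionSecs: Long): Seq[File] = {
    val cutoffMillis = System.currentTimeMillis - retentionSecs * 1000
    // A directory "contains new files" if it, or anything under it, was modified
    // after the cutoff; this is the recursive check the patch adds to Utils.
    def containsNewFiles(dir: File): Boolean = {
      val children = Option(dir.listFiles).getOrElse(Array.empty[File])
      dir.lastModified > cutoffMillis ||
        children.exists(c => c.lastModified > cutoffMillis || (c.isDirectory && containsNewFiles(c)))
    }
    Option(workDir.listFiles).getOrElse(Array.empty[File]).toSeq.filter { dir =>
      dir.isDirectory && !runningAppIds.contains(dir.getName) && !containsNewFiles(dir)
    }
  }
}
```

Called with `runningAppIds = executors.values.map(_.appId).toSet` and `retentionSecs = APP_DATA_RETENTION_SECS`, this mirrors the handler above; the actual patch delegates the recursive check to `Utils.doesDirectoryContainAnyNewFiles` in the next file.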
%s".format(appId, execId, appDesc.name)) + + // Create the executor's working directory + val executorDir = new File(workDir, appId + "/" + execId) + if (!executorDir.mkdirs()) { + throw new IOException("Failed to create directory " + executorDir) + } + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING) + self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -242,12 +266,13 @@ private[spark] class Worker( master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } catch { case e: Exception => { - logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } - master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) + master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, + Some(e.toString), None) } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9399ddab76331..a67124140f9da 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.commons.io.FileUtils +import org.apache.commons.io.filefilter.TrueFileFilter import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.log4j.PropertyConfigurator @@ -705,17 +707,20 @@ private[spark] object Utils extends Logging { } /** - * Finds all the files in a directory whose last modified time is older than cutoff seconds. - * @param dir must be the path to a directory, or IllegalArgumentException is thrown - * @param cutoff measured in seconds. Files older than this are returned. + * Determines if a directory contains any files newer than cutoff seconds. + * + * @param dir must be the path to a directory, or IllegalArgumentException is thrown + * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this. 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a04d..e63d9d085e385 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }

-  test("findOldFiles") {
+  test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
     val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
     val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    // set the last modified time of child1 to 10 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+    // set the last modified time of child1 to 30 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

-    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
-    assert(result.size.equals(1))
-    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+    // although child1 is old, child2 is still new so return true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+    // although parent and its immediate children are old, child3 is still new;
+    // we expect a full recursive search for new files to find it.
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }

   test("resolveURI") {
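The invariant the new test pins down, restated as a compact sketch: a directory keeps "containing new files", and is therefore protected from cleanup, until every file and directory in its tree, itself included, is older than the cutoff. This assumes the snippet is compiled under the `org.apache.spark` package so the `private[spark]` Utils members are visible; the object name is illustrative.

```scala
package org.apache.spark.sketch // any package under org.apache.spark sees private[spark] members

import org.apache.spark.util.Utils

object NewFilesSemanticsSketch extends App {
  // A freshly created directory is itself "new", so cleanup must leave it alone.
  val dir = Utils.createTempDir()
  assert(Utils.doesDirectoryContainAnyNewFiles(dir, 5))

  // Once everything in the tree (here, just the empty directory itself) is older
  // than the cutoff, the check flips to false and the worker may delete it.
  dir.setLastModified(System.currentTimeMillis - 30 * 1000)
  assert(!Utils.doesDirectoryContainAnyNewFiles(dir, 5))
}
```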