SPARK-1154: Clean up app folders in worker nodes #288

Closed · wants to merge 11 commits
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {

  case class KillDriver(driverId: String) extends DeployMessage

  // Worker internal

  case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

  // AppClient to Master

  case class RegisterApplication(appDescription: ApplicationDescription)
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,12 @@ private[spark] class Worker(
  val REGISTRATION_TIMEOUT = 20.seconds
  val REGISTRATION_RETRIES = 3

  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
  // How often worker will clean up old app folders
  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
  // TTL for app folders/data; after TTL expires it will be cleaned up
  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

  // Index into masterUrls that we're currently trying to register with.
  var masterIndex = 0

@@ -179,12 +185,28 @@ private[spark] class Worker(
      registered = true
      changeMaster(masterUrl, masterWebUiUrl)
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
      if (CLEANUP_ENABLED) {
        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
      }

    case SendHeartbeat =>
      masterLock.synchronized {
        if (connected) { master ! Heartbeat(workerId) }
      }

    case WorkDirCleanup =>
      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
      val cleanupFuture = concurrent.future {
        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
          .foreach(Utils.deleteRecursively)
      }
      cleanupFuture onFailure {
        case e: Throwable =>
          logError("App dir cleanup failed: " + e.getMessage, e)
      }

    case MasterChanged(masterUrl, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterUrl)
      changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
}

private[spark] object Worker {

  def main(argStrings: Array[String]) {
    val args = new WorkerArguments(argStrings)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
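For reference, a minimal, hypothetical sketch (not part of this PR) of how the three new cleanup settings are resolved from a SparkConf, using the same keys and defaults as the Worker code above; the object name and the overridden example values are assumptions for illustration only.

import org.apache.spark.SparkConf

// Hypothetical example: resolve the worker-cleanup settings with the same keys
// and defaults that the Worker code above uses. The values set here are made up.
object WorkerCleanupConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.worker.cleanup.interval", (15 * 60).toString)          // run cleanup every 15 minutes
      .set("spark.worker.cleanup.appDataTtl", (2 * 24 * 3600).toString)  // keep app dirs for 2 days

    val cleanupEnabled = conf.getBoolean("spark.worker.cleanup.enabled", true)
    val intervalMillis = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
    val retentionSecs = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

    println(s"cleanup enabled: $cleanupEnabled, interval: $intervalMillis ms, app data TTL: $retentionSecs s")
  }
}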
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -536,6 +536,21 @@ private[spark] object Utils extends Logging {
}
}

  /**
   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
   * @param cutoff measured in seconds. Files older than this are returned.
Review comment (Contributor): Don't need to rev this again just for this, but is there supposed to be one space or two spaces after the parameter name? It's inconsistent here.
   */
  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
    val currentTimeMillis = System.currentTimeMillis
    if (dir.isDirectory) {
      val files = listFilesSafely(dir)
      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
    } else {
      throw new IllegalArgumentException(dir + " is not a directory!")
    }
  }
Review comment (Contributor): Is there a reason for not making this deleteOldFiles directly? I don't see another use case for getting old files other than deleting them. Is this because we need it for testing?

Reply (Contributor, author): Easier to test, for sure. I just prefer smaller, more functional pieces of code in general, for easier testing and to limit side effects.
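To make that trade-off concrete, here is a hypothetical one-liner (not proposed in this PR) showing how a combined deleteOldFiles would simply compose the two existing pieces; keeping findOldFiles separate means the age-filtering logic can be asserted on directly in tests, while deletion stays a thin, side-effecting wrapper.

// Hypothetical sketch only: deletion as a thin wrapper over the side-effect-free
// findOldFiles, mirroring what the Worker does when it receives WorkDirCleanup.
def deleteOldFiles(dir: java.io.File, cutoffSeconds: Long): Unit = {
  Utils.findOldFiles(dir, cutoffSeconds).foreach(Utils.deleteRecursively)
}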


  /**
   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
   */
15 changes: 14 additions & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util

import scala.util.Random

-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.nio.{ByteBuffer, ByteOrder}

import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
    val iterator = Iterator.range(0, 5)
    assert(Utils.getIteratorSize(iterator) === 5L)
  }

  test("findOldFiles") {
    // create some temporary directories and files
    val parent: File = Utils.createTempDir()
    // the parent directory has two child directories
    val child1: File = Utils.createTempDir(parent.getCanonicalPath)
    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
    // set the last modified time of child1 to 10 secs old
    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))

    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
    assert(result.size.equals(1))
    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
  }
Review comment (Contributor): It would be good to also test the actual "delete recursively" part. Right now, if the deleting part fails for some reason, this test doesn't catch it.

Reply (Contributor, author): That's true. We could add a unit test for deleteRecursively. Actually, we noticed there isn't a test for Worker in general.
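A possible shape for such a test, sketched here as an assumption rather than code from this PR, reusing the same temp-directory helpers as the findOldFiles test above:

test("deleteRecursively") {
  // Hypothetical sketch: build a small directory tree, delete it recursively,
  // and verify nothing is left on disk.
  val parent = Utils.createTempDir()
  val child = Utils.createTempDir(parent.getCanonicalPath)
  val file = new File(child, "data.txt")
  assert(file.createNewFile())

  Utils.deleteRecursively(parent)

  assert(!file.exists())
  assert(!child.exists())
  assert(!parent.exists())
}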

}

50 changes: 38 additions & 12 deletions docs/configuration.md
@@ -161,13 +161,13 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.ui.acls.enable</td>
<td>false</td>
<td>
Whether spark web ui acls should be enabled. If enabled, this checks to see if the user has
access permissions to view the web ui. See <code>spark.ui.view.acls</code> for more details.
Also note this requires the user to be known, if the user comes across as null no checks
are done. Filters can be used to authenticate and set the user.
</td>
</tr>
<tr>
<td>spark.ui.view.acls</td>
<td>Empty</td>
<td>
@@ -276,10 +276,10 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.serializer.objectStreamReset</td>
<td>10000</td>
<td>
When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
objects to prevent writing redundant data, however that stops garbage collection of those
objects. By calling 'reset' you flush that info from the serializer, and allow old
objects to be collected. To turn off this periodic reset set it to a value of <= 0.
By default it will reset the serializer every 10,000 objects.
</td>
</tr>
@@ -333,6 +333,32 @@ Apart from these, the following properties are also available, and may be useful
receives no heartbeats.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.enabled</td>
<td>true</td>
<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.interval</td>
<td>1800 (30 minutes)</td>
<td>
Controls the interval, in seconds, at which the worker cleans up old application work dirs
on the local machine.
</td>
</tr>
<tr>
<td>spark.worker.cleanup.appDataTtl</td>
<td>7 * 24 * 3600 (7 days)</td>
<td>
The number of seconds to retain application work directories on each worker. This is a Time To Live
and should depend on the amount of available disk space you have. Application logs and jars are
downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space,
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td>spark.akka.frameSize</td>
<td>10</td>
@@ -375,7 +401,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.akka.heartbeat.interval</td>
<td>1000</td>
<td>
This is set to a larger value to disable the failure detector that comes built in to Akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensitive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.
</td>
</tr>
<tr>
@@ -430,7 +456,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.broadcast.blockSize</td>
<td>4096</td>
<td>
Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
@@ -555,28 +581,28 @@ Apart from these, the following properties are also available, and may be useful
the driver.
</td>
</tr>
<tr>
<td>spark.authenticate</td>
<td>false</td>
<td>
Whether spark authenticates its internal connections. See <code>spark.authenticate.secret</code> if not
running on Yarn.
</td>
</tr>
<tr>
<td>spark.authenticate.secret</td>
<td>None</td>
<td>
Set the secret key used for Spark to authenticate between components. This needs to be set if
not running on Yarn and authentication is enabled.
</td>
</tr>
<tr>
<td>spark.core.connection.auth.wait.timeout</td>
<td>30</td>
<td>
Number of seconds for the connection to wait for authentication to occur before timing
out and giving up.
</td>
</tr>
<tr>