Skip to content

Commit

Permalink
SPARK-1676 Cache Hadoop UGIs by default to prevent FileSystem leak
Browse files Browse the repository at this point in the history
UserGroupInformation objects (UGIs) are used for Hadoop security. A relatively
recent PR (apache#29) makes Spark always use UGIs when executing tasks. Unfortunately,
this causes HDFS-3545, which causes the FileSystem cache to continuously create
new FileSystems, as the UGIs look different (even though they're logically
identical). This causes a memory and sometimes file descriptor leak for FileSystems
(like S3N) which maintain open connections.

This solution is to introduce a config option (enabled by default) which reuses a
single Spark user UGI, rather than creating new ones for each task. The downside
to this approach is that UGIs cannot be safely cached (see the notes in HDFS-3545).
For example, if a token expires, it will never be cleared from the UGI but may
be used anyway (usage of a particular token on a UGI is nondeterministic as it is
backed by a Set).

This setting is enabled by default because the memory leak can become serious
very quickly. In one benchmark, attempting to read 10k files from an S3 directory
caused 45k connections to remain open to S3 after the job completed. These file
descriptors are never cleaned up, nor the memory used by the associated
FileSystems.

Conflicts:
	docs/configuration.md
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
  • Loading branch information
aarondav committed May 2, 2014
1 parent 54c3b7e commit cca6a8a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 14 deletions.
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,28 @@ class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
/** Creates a UserGroupInformation for Spark based on SPARK_USER environment variable. */
def createSparkUser(): Option[UserGroupInformation] = {
val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
Some(UserGroupInformation.createRemoteUser(user))
} else {
func()
None
}
}

/**
* If a user is specified, we will run the function as that user. We additionally transfer
* Spark's tokens to the given UGI to ensure it has access to data written by Spark.
*/
def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
user match {
case Some(ugi) => {
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})}
case None => func()
}
}

Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
// NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem
// objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed,
// mainly because expired tokens cannot be removed from the UGI.
val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true)

val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
Expand Down Expand Up @@ -173,7 +179,7 @@ private[spark] class Executor(
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,18 @@ Apart from these, the following properties are also available, and may be useful
applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
</td>
</tr>
<tr>
<td>spark.user.cacheUserGroupInformation</td>
<td>true</td>
<td>
Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676)
for users who are not using security in a very serious manner. Caching UGIs can produce
security-related exceptions when tokens have an expiry, or are shared between users. On the other
hand, not caching UGIs means that every FileSystem.get() call can potentially create and cache a
new FileSystem object, which leads to leaked memory and file descriptors.
</td>
</tr>

</table>

## Viewing Spark Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to yarn approved directories rather
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
math.max(args.numWorkers * 2, 3))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to YARN approved directories rather
Expand Down

0 comments on commit cca6a8a

Please sign in to comment.