Merge branch 'master' of github.com:apache/spark into configure-ports
Conflicts:
	docs/spark-standalone.md
andrewor14 committed Aug 5, 2014
2 parents de1b207 + 1c5555a commit bfbab28
Showing 24 changed files with 1,108 additions and 193 deletions.
107 changes: 90 additions & 17 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -41,10 +41,19 @@ import org.apache.spark.deploy.SparkHadoopUtil
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
- * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
+ * authorized to view the UI. The configs 'spark.acls.enable' and 'spark.ui.view.acls'
* control the behavior of the acls. Note that the person who started the application
* always has view access to the UI.
+ *
+ * Spark has a set of modify acls (`spark.modify.acls`) that controls which users have permission
+ * to modify a single application. This would include things like killing the application. By
+ * default the person who started the application has modify access. For modify access through
+ * the UI, you must have a filter that does authentication in place for the modify acls to work
+ * properly.
+ *
+ * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
+ * who always have permission to view or modify the Spark application.
*
* Spark does not currently support encryption after authentication.
*
* At this point spark has multiple communication protocols that need to be secured and
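Taken together, the new scaladoc describes four related properties. A minimal sketch of how they combine (the user names are hypothetical; the keys all come from this diff):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.acls.enable", "true")       // successor to spark.ui.acls.enable
  .set("spark.ui.view.acls", "alice,bob") // alice and bob may view the UI
  .set("spark.modify.acls", "alice")      // only alice may e.g. kill the app
  .set("spark.admin.acls", "ops")         // ops may always view and modify
```

The user who started the application is always added to the view and modify acls, so these lists only need to name additional users.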
@@ -137,18 +146,32 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private val sparkSecretLookupKey = "sparkCookie"

private val authOn = sparkConf.getBoolean("spark.authenticate", false)
-  private var uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
+  // keep spark.ui.acls.enable for backwards compatibility with 1.0
+  private var aclsOn = sparkConf.getOption("spark.acls.enable").getOrElse(
+    sparkConf.get("spark.ui.acls.enable", "false")).toBoolean

+  // admin acls should be set before view or modify acls
+  private var adminAcls: Set[String] =
+    stringToSet(sparkConf.get("spark.admin.acls", ""))

private var viewAcls: Set[String] = _

+  // list of users who have permission to modify the application. This should
+  // apply to both UI and CLI for things like killing the application.
+  private var modifyAcls: Set[String] = _

// always add the current user and SPARK_USER to the viewAcls
-  private val defaultAclUsers = Seq[String](System.getProperty("user.name", ""),
+  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Option(System.getenv("SPARK_USER")).getOrElse(""))

setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
+  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

private val secretKey = generateSecretKey()
  logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
-    "; ui acls " + (if (uiAclsOn) "enabled" else "disabled") +
-    "; users with view permissions: " + viewAcls.toString())
+    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
+    "; users with view permissions: " + viewAcls.toString() +
+    "; users with modify permissions: " + modifyAcls.toString())

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
@@ -169,18 +192,51 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
)
}

-  private[spark] def setViewAcls(defaultUsers: Seq[String], allowedUsers: String) {
-    viewAcls = (defaultUsers ++ allowedUsers.split(',')).map(_.trim()).filter(!_.isEmpty).toSet
+  /**
+   * Split a comma separated String, filter out any empty items, and return a Set of strings
+   */
+  private def stringToSet(list: String): Set[String] = {
+    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
+  }
+
+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
+    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+    logInfo("Changing view acls to: " + viewAcls.mkString(","))
}

-  private[spark] def setViewAcls(defaultUser: String, allowedUsers: String) {
-    setViewAcls(Seq[String](defaultUser), allowedUsers)
+  def setViewAcls(defaultUser: String, allowedUsers: String) {
+    setViewAcls(Set[String](defaultUser), allowedUsers)
}

def getViewAcls: String = viewAcls.mkString(",")

+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
+    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
+  }
+
+  def getModifyAcls: String = modifyAcls.mkString(",")

+  /**
+   * Admin acls should be set before the view or modify acls. If you modify the admin
+   * acls you should also set the view and modify acls again to pick up the changes.
+   */
+  def setAdminAcls(adminUsers: String) {
+    adminAcls = stringToSet(adminUsers)
+    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
+  }

-  private[spark] def setUIAcls(aclSetting: Boolean) {
-    uiAclsOn = aclSetting
-    logInfo("Changing acls enabled to: " + uiAclsOn)
+  def setAcls(aclSetting: Boolean) {
+    aclsOn = aclSetting
+    logInfo("Changing acls enabled to: " + aclsOn)
}
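The repeated scaladoc caveat matters because setViewAcls and setModifyAcls fold the current adminAcls into the sets they build. A sketch of the required call order (`sm` is an assumed SecurityManager instance):

```scala
sm.setAdminAcls("ops")              // must come first
sm.setViewAcls(Set("alice"), "bob") // viewAcls:   ops, alice, bob
sm.setModifyAcls(Set("alice"), "")  // modifyAcls: ops, alice
// Calling setAdminAcls after the others would leave "ops" out of both
// sets until setViewAcls/setModifyAcls were invoked again.
```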

/**
@@ -224,22 +280,39 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
* Check to see if Acls for the UI are enabled
* @return true if UI authentication is enabled, otherwise false
*/
-  def uiAclsEnabled(): Boolean = uiAclsOn
+  def aclsEnabled(): Boolean = aclsOn

/**
* Checks the given user against the view acl list to see if they have
- * authorization to view the UI. If the UI acls must are disabled
- * via spark.ui.acls.enable, all users have view access.
+ * authorization to view the UI. If the UI acls are disabled
+ * via spark.acls.enable, all users have view access. If the user is null
+ * it is assumed authentication is off and all users have access.
*
* @param user the user to check for authorization
* @return true if the user has permission, otherwise false
*/
def checkUIViewPermissions(user: String): Boolean = {
-    logDebug("user=" + user + " uiAclsEnabled=" + uiAclsEnabled() + " viewAcls=" +
+    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
      viewAcls.mkString(","))
-    if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
+    if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
}

+  /**
+   * Checks the given user against the modify acl list to see if they have
+   * authorization to modify the application. If the acls are disabled
+   * via spark.acls.enable, all users have modify access. If the user is null
+   * it is assumed authentication isn't turned on and all users have access.
+   *
+   * @param user the user to check for authorization
+   * @return true if the user has permission, otherwise false
+   */
+  def checkModifyPermissions(user: String): Boolean = {
+    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
+      modifyAcls.mkString(","))
+    if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
+  }


/**
* Check to see if authentication for the Spark communication protocols is enabled
* @return true if authentication is enabled, otherwise false
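The two check methods above share the same permissive-by-default shape: access is denied only when acls are enabled, the user is non-null, and the user is absent from the list. A sketch of the resulting behavior (constructing SecurityManager directly is for illustration only; the class is private[spark], and the names are made up):

```scala
val conf = new SparkConf()
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "alice")
val sm = new SecurityManager(conf)

sm.checkUIViewPermissions("alice")   // true: listed in the view acls
sm.checkUIViewPermissions("mallory") // false: acls on, user not listed
sm.checkUIViewPermissions(null)      // true: no authenticated user
```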
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -238,6 +238,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}

+    // Validate memory fractions
+    val memoryKeys = Seq(
+      "spark.storage.memoryFraction",
+      "spark.shuffle.memoryFraction",
+      "spark.shuffle.safetyFraction",
+      "spark.storage.unrollFraction",
+      "spark.storage.safetyFraction")
+    for (key <- memoryKeys) {
+      val value = getDouble(key, 0.5)
+      if (value > 1 || value < 0) {
+        throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
+      }
+    }

// Check for legacy configs
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
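With the validation block above, a fraction outside [0, 1] now fails fast instead of causing confusing runtime behavior. For example (assuming, as the hunk context suggests, that this runs during conf validation at SparkContext creation):

```scala
val conf = new SparkConf().set("spark.shuffle.memoryFraction", "1.5")
// Creating a SparkContext with this conf would throw:
// java.lang.IllegalArgumentException:
//   spark.shuffle.memoryFraction should be between 0 and 1 (was '1.5').
```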
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,7 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}

@@ -65,12 +65,9 @@ class SparkEnv (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
+    val shuffleMemoryManager: ShuffleMemoryManager,
val conf: SparkConf) extends Logging {

-  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
-  // All accesses should be manually synchronized
-  val shuffleMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
@@ -252,6 +249,8 @@ object SparkEnv extends Logging {
val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")

+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)

// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
@@ -273,6 +272,7 @@ object SparkEnv extends Logging {
httpFileServer,
sparkFilesDir,
metricsSystem,
+      shuffleMemoryManager,
conf)
}

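The new ShuffleMemoryManager replaces the manually synchronized shuffleMemoryMap that SparkEnv used to expose. A rough usage sketch: only releaseMemoryForThisThread() appears in this diff, and tryToAcquire is an assumed acquisition method, not confirmed by the hunks shown here:

```scala
// Inside a task thread (env is the current SparkEnv):
val granted = env.shuffleMemoryManager.tryToAcquire(5 * 1024 * 1024) // assumed API
try {
  // ... build spillable shuffle structures within `granted` bytes ...
} finally {
  // one call replaces the old synchronized-map bookkeeping
  env.shuffleMemoryManager.releaseMemoryForThisThread()
}
```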
@@ -189,7 +189,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

if (ui != null) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+      ui.getSecurityManager.setAcls(uiAclsEnabled)
+      // make sure to set admin acls before view acls so they are properly picked up
+      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
}
(appInfo, ui)
@@ -38,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
-    if (conf.contains("master.ui.port")) {
-      webUiPort = conf.get("master.ui.port").toInt
+    if (conf.contains("spark.master.ui.port")) {
+      webUiPort = conf.get("spark.master.ui.port").toInt
}

parse(args.toList)
@@ -59,6 +59,6 @@ private[spark] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR

def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
-    requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
+    requestedPort.getOrElse(conf.getInt("spark.worker.ui.port", WorkerWebUI.DEFAULT_PORT))
}
}
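Both web UI port keys gain the spark. prefix here, matching the configure-ports theme of this branch. For reference (port values are hypothetical):

```scala
val conf = new SparkConf()
  .set("spark.master.ui.port", "8085") // was master.ui.port
  .set("spark.worker.ui.port", "8086") // was worker.ui.port
```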
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -277,10 +277,7 @@ private[spark] class Executor(
}
} finally {
// Release memory used by this thread for shuffles
-        val shuffleMemoryMap = env.shuffleMemoryMap
-        shuffleMemoryMap.synchronized {
-          shuffleMemoryMap.remove(Thread.currentThread().getId)
-        }
+        env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
runningTasks.remove(taskId)
@@ -29,7 +29,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
var startTime = -1L
var endTime = -1L
var viewAcls = ""
-  var enableViewAcls = false
+  var adminAcls = ""

def applicationStarted = startTime != -1

@@ -55,7 +55,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
val environmentDetails = environmentUpdate.environmentDetails
val allProperties = environmentDetails("Spark Properties").toMap
viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-    enableViewAcls = allProperties.getOrElse("spark.ui.acls.enable", "false").toBoolean
+    adminAcls = allProperties.getOrElse("spark.admin.acls", "")
}
}
}
@@ -47,7 +47,9 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {

-  private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val bufferSize =
+    (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt

private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
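Reading the buffer size as a double makes sub-megabyte buffers expressible and shrinks the default from 2 MB to 0.064 MB. The arithmetic, for reference:

```scala
(0.064 * 1024 * 1024).toInt // 67108 bytes, roughly 64 KB
// and users may now write fractional values, e.g.
// conf.set("spark.kryoserializer.buffer.mb", "0.25") // 256 KB
```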
