Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
hasnain-db committed Oct 11, 2023
1 parent 0d64bc4 commit 91ba3a8
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 74 deletions.
24 changes: 3 additions & 21 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ import org.apache.spark.network.util.MapConfigProvider
* requires certChain and keyFile arguments
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
* @param dangerouslyFallbackIfKeysNotPresent If SSL is enabled but key files are not present,
* fall back to unencrypted communication. This is an
* advanced option and not recommended for normal use.
*/
private[spark] case class SSLOptions(
namespace: Option[String] = None,
Expand All @@ -82,8 +79,7 @@ private[spark] case class SSLOptions(
trustStoreReloadIntervalMs: Int = 10000,
openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty,
dangerouslyFallbackIfKeysNotPresent: Boolean = false)
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {

/**
Expand Down Expand Up @@ -157,10 +153,6 @@ private[spark] case class SSLOptions(
supported
}

/**
*
* @return
*/
def createConfigProvider(conf: SparkConf): ConfigProvider = {
val nsp = namespace.getOrElse("spark.ssl")
val confMap: Map[String, String] = new HashMap[String, String]
Expand All @@ -178,8 +170,6 @@ private[spark] case class SSLOptions(
trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _))
protocol.foreach(confMap.put(s"$nsp.protocol", _))
confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(","))
confMap.put(
s"$nsp.dangerouslyFallbackIfKeysNotPresent", dangerouslyFallbackIfKeysNotPresent.toString)

new MapConfigProvider(confMap)
}
Expand All @@ -193,8 +183,7 @@ private[spark] case class SSLOptions(
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}, " +
s"dangerouslyFallbackIfKeysNotPresent=$dangerouslyFallbackIfKeysNotPresent"
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
}

private[spark] object SSLOptions extends Logging {
Expand Down Expand Up @@ -224,8 +213,6 @@ private[spark] object SSLOptions extends Logging {
* (if available on host system), requires certChain and keyFile arguments
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
* $ - `[ns].dangerouslyFallbackIfKeysNotPresent` - whether to fallback to unencrypted
* communication if keys are not present.
*
* For a list of protocols and ciphers supported by particular Java versions, you may go to
* <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">
Expand Down Expand Up @@ -315,10 +302,6 @@ private[spark] object SSLOptions extends Logging {
.orElse(defaults.map(_.enabledAlgorithms))
.getOrElse(Set.empty)

val dangerouslyFallbackIfKeysNotPresent =
conf.getBoolean(s"$ns.dangerouslyFallbackIfKeysNotPresent",
defaultValue = defaults.exists(_.dangerouslyFallbackIfKeysNotPresent))

new SSLOptions(
Some(ns),
enabled,
Expand All @@ -337,8 +320,7 @@ private[spark] object SSLOptions extends Logging {
trustStoreReloadIntervalMs,
openSslEnabled,
protocol,
enabledAlgorithms,
dangerouslyFallbackIfKeysNotPresent)
enabledAlgorithms)
}

// Config names and environment variables for propagating SSL passwords
Expand Down
35 changes: 11 additions & 24 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private[spark] class SecurityManager(

private var secretKey: String = _

private val isSslRPCEnabled = sparkConf.getBoolean(
private val sslRpcEnabled = sparkConf.getBoolean(
"spark.ssl.rpc.enabled", false)

logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
Expand All @@ -99,7 +99,7 @@ private[spark] class SecurityManager(
(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
"; groups with modify permissions: " +
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
"; RPC SSL " + (if (isSslRPCEnabled) "enabled" else "disabled"))
"; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
Expand Down Expand Up @@ -265,30 +265,11 @@ private[spark] class SecurityManager(
isUserInACL(user, modifyAcls, modifyAclsGroups)
}

/**
* Check to see if authentication is enabled and SSL for RPC is disabled (as they are mutually
* exclusive)
* @return Whether authentication is enabled and SSL for RPC is disabled
*/
private def isAuthenticationEnabledAndSslRpcDisabled(): Boolean = {
if (sparkConf.get(NETWORK_AUTH_ENABLED)) {
if (rpcSSLOptions.enabled) {
logWarning("Network auth disabled as RPC SSL encryption is enabled")
false
} else {
true
}
} else {
false
}
}


/**
* Check to see if authentication for the Spark communication protocols is enabled
* @return true if authentication is enabled, otherwise false
*/
def isAuthenticationEnabled(): Boolean = isAuthenticationEnabledAndSslRpcDisabled
def isAuthenticationEnabled(): Boolean = authOn

/**
* Checks whether network encryption should be enabled.
Expand All @@ -307,6 +288,12 @@ private[spark] class SecurityManager(
}
}

/**
* Checks whether RPC SSL is enabled or not
* @return Whether RPC SSL is enabled or not
*/
def isSslRpcEnabled(): Boolean = sslRpcEnabled

/**
* Gets the user used for authenticating SASL connections.
* For now use a single hardcoded user.
Expand All @@ -319,7 +306,7 @@ private[spark] class SecurityManager(
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = {
if (authOn) {
if (isAuthenticationEnabled) {
val creds = UserGroupInformation.getCurrentUser().getCredentials()
Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
.map { bytes => new String(bytes, UTF_8) }
Expand Down Expand Up @@ -353,7 +340,7 @@ private[spark] class SecurityManager(
def initializeAuth(): Unit = {
import SparkMasterRegex._

if (!authOn) {
if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
return
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ object SparkEnv extends Logging {
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
if (!(securityManager.isEncryptionEnabled() || securityManager.isSslRpcEnabled())) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,14 @@ object SparkTransportConf {
* This restriction will only occur if these properties are not already set.
* @param role optional role, could be driver, executor, worker and master. Default is
* [[None]], means no role specific configurations.
*/
def fromSparkConf(
_conf: SparkConf,
module: String,
numUsableCores: Int = 0,
role: Option[String] = None): TransportConf = {
SparkTransportConf.fromSparkConfWithSslOptions(
_conf, module, numUsableCores, role, sslOptions = None)
}
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param _conf the [[SparkConf]]
* @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
* @param role optional role, could be driver, executor, worker and master. Default is
* [[None]], means no role specific configurations.
* @param sslOptions SSL config options
*/
def fromSparkConfWithSslOptions(
def fromSparkConf(
_conf: SparkConf,
module: String,
numUsableCores: Int = 0,
role: Option[String] = None,
sslOptions: Option[SSLOptions]): TransportConf = {
sslOptions: Option[SSLOptions] = None): TransportConf = {
val conf = _conf.clone
// specify default thread configuration based on our JVM's allocation of cores (rather than
// necessarily assuming we have all the machine's cores).
Expand All @@ -77,14 +59,14 @@ object SparkTransportConf {
conf.set(s"spark.$module.io.$suffix", value)
}

val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse({
val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse(
new ConfigProvider {
override def get(name: String): String = conf.get(name)
override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
}})
override def get(name: String): String = conf.get(name)
override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
})
new TransportConf(module, configProvider)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTe
field => "-D" + field + "=" + secret
))

conf.set(SecurityManager.SPARK_AUTH_CONF, "true")
val cmd = CommandUtils invokePrivate buildLocalCommand(
command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.foreach(
Expand Down

0 comments on commit 91ba3a8

Please sign in to comment.