Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
hasnain-db committed Oct 11, 2023
1 parent c47a920 commit 0d64bc4
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 23 deletions.
132 changes: 126 additions & 6 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package org.apache.spark

import java.io.File
import java.security.NoSuchAlgorithmException
import java.util.HashMap
import java.util.Map
import javax.net.ssl.SSLContext

import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ConfigProvider
import org.apache.spark.network.util.MapConfigProvider

/**
* SSLOptions class is a common container for SSL configuration options. It offers methods to
Expand All @@ -33,34 +37,53 @@ import org.apache.spark.internal.Logging
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for.
*
* @param namespace the configuration namespace
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param port the port where to bind the SSL server; if not defined, it will be
* based on the non-SSL port for the same service.
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param privateKey a PKCS#8 private key file in PEM format
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param certChain an X.509 certificate chain file in PEM format
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
* its configuration when the trust-store file on disk changes
* @param trustStoreReloadIntervalMs the interval, in milliseconds,
* when the trust-store will reload its configuration
* @param openSslEnabled enables or disables using an OpenSSL implementation (if available),
* requires the certChain and privateKey arguments
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
* @param dangerouslyFallbackIfKeysNotPresent If SSL is enabled but key files are not present,
* fall back to unencrypted communication. This is an
* advanced option and not recommended for normal use.
*/
private[spark] case class SSLOptions(
namespace: Option[String] = None,
enabled: Boolean = false,
port: Option[Int] = None,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
privateKey: Option[File] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
certChain: Option[File] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
trustStoreReloadingEnabled: Boolean = false,
trustStoreReloadIntervalMs: Int = 10000,
openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
enabledAlgorithms: Set[String] = Set.empty,
dangerouslyFallbackIfKeysNotPresent: Boolean = false)
extends Logging {

/**
Expand Down Expand Up @@ -134,12 +157,44 @@ private[spark] case class SSLOptions(
supported
}

/**
 * Builds a [[ConfigProvider]] exposing this object's SSL settings under the resolved
 * namespace, alongside every entry already present in `conf`.
 *
 * @param conf the Spark configuration whose entries are copied into the provider
 * @return a [[MapConfigProvider]] backed by the combined conf entries and SSL settings
 */
def createConfigProvider(conf: SparkConf): ConfigProvider = {
  // Fall back to the global SSL namespace when this options object carries none of its own.
  val ns = namespace.getOrElse("spark.ssl")
  val settings: Map[String, String] = new HashMap[String, String]
  // Seed with all existing conf entries so non-SSL lookups still resolve through the provider.
  conf.getAll.foreach { case (key, value) => settings.put(key, value) }

  // Flags and numeric settings always have a value, so write them unconditionally.
  settings.put(s"$ns.enabled", enabled.toString)
  settings.put(s"$ns.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString)
  settings.put(s"$ns.openSslEnabled", openSslEnabled.toString)
  settings.put(s"$ns.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString)

  // Optional settings are written only when present; file options use absolute paths.
  keyStore.foreach(file => settings.put(s"$ns.keyStore", file.getAbsolutePath))
  keyStorePassword.foreach(pw => settings.put(s"$ns.keyStorePassword", pw))
  privateKey.foreach(file => settings.put(s"$ns.privateKey", file.getAbsolutePath))
  keyPassword.foreach(pw => settings.put(s"$ns.keyPassword", pw))
  certChain.foreach(file => settings.put(s"$ns.certChain", file.getAbsolutePath))
  trustStore.foreach(file => settings.put(s"$ns.trustStore", file.getAbsolutePath))
  trustStorePassword.foreach(pw => settings.put(s"$ns.trustStorePassword", pw))
  protocol.foreach(p => settings.put(s"$ns.protocol", p))
  settings.put(s"$ns.enabledAlgorithms", enabledAlgorithms.mkString(","))
  settings.put(
    s"$ns.dangerouslyFallbackIfKeysNotPresent", dangerouslyFallbackIfKeysNotPresent.toString)

  new MapConfigProvider(settings)
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
s"certChain=$certChain, trustStore=$trustStore, " +
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}, " +
s"dangerouslyFallbackIfKeysNotPresent=$dangerouslyFallbackIfKeysNotPresent"
}

private[spark] object SSLOptions extends Logging {
Expand All @@ -152,15 +207,25 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].port` - the port where to bind the SSL server
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
* that reloads its configuration when the trust-store file on disk changes
* $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, the
* trust-store will reload its configuration
* $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
* (if available on host system), requires the certChain and privateKey arguments
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
* $ - `[ns].dangerouslyFallbackIfKeysNotPresent` - whether to fall back to unencrypted
* communication if keys are not present.
*
* For a list of protocols and ciphers supported by particular Java versions, you may go to
* <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">
Expand All @@ -180,7 +245,15 @@ private[spark] object SSLOptions extends Logging {
hadoopConf: Configuration,
ns: String,
defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

// RPC does not inherit the default enabled setting due to backwards compatibility reasons
val enabledDefault = if (ns == "spark.ssl.rpc") {
false
} else {
defaults.exists(_.enabled)
}

val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault)
if (!enabled) {
return new SSLOptions()
}
Expand All @@ -194,15 +267,23 @@ private[spark] object SSLOptions extends Logging {

val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyStorePassword))

val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
.orElse(defaults.flatMap(_.privateKey))

val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
.orElse(defaults.flatMap(_.keyStoreType))

val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
.orElse(defaults.flatMap(_.certChain))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

Expand All @@ -211,11 +292,21 @@ private[spark] object SSLOptions extends Logging {

val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled",
defaultValue = defaults.exists(_.trustStoreReloadingEnabled))

val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs",
defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000))

val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled",
defaultValue = defaults.exists(_.openSslEnabled))

val protocol = conf.getWithSubstitution(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -224,20 +315,49 @@ private[spark] object SSLOptions extends Logging {
.orElse(defaults.map(_.enabledAlgorithms))
.getOrElse(Set.empty)

val dangerouslyFallbackIfKeysNotPresent =
conf.getBoolean(s"$ns.dangerouslyFallbackIfKeysNotPresent",
defaultValue = defaults.exists(_.dangerouslyFallbackIfKeysNotPresent))

new SSLOptions(
Some(ns),
enabled,
port,
keyStore,
keyStorePassword,
privateKey,
keyPassword,
keyStoreType,
needClientAuth,
certChain,
trustStore,
trustStorePassword,
trustStoreType,
trustStoreReloadingEnabled,
trustStoreReloadIntervalMs,
openSslEnabled,
protocol,
enabledAlgorithms)
enabledAlgorithms,
dangerouslyFallbackIfKeysNotPresent)
}

// Config names and environment variables for propagating SSL passwords

// Spark conf keys that hold RPC SSL passwords. Password-bearing spark.ssl.* keys are
// filtered out of the configs propagated to subprocesses (see the SparkConf filter that
// excludes names containing "Password"); the passwords travel via the env vars below instead.
val SPARK_RPC_SSL_KEY_PASSWORD_CONF = "spark.ssl.rpc.keyPassword"
val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword"
val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword"
// All password conf keys, for callers that need to treat them uniformly.
val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq(
SPARK_RPC_SSL_KEY_PASSWORD_CONF,
SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF,
SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF
)

// Environment variables used to hand the passwords to subprocesses. SSLOptions.parse reads
// each of these as a fallback when the corresponding conf entry / Hadoop credential is absent
// (blank values are ignored via a nonEmpty filter).
val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD"
val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD"
val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD"
// All password env var names, for callers that need to treat them uniformly.
val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq(
ENV_RPC_SSL_KEY_PASSWORD,
ENV_RPC_SSL_KEY_STORE_PASSWORD,
ENV_RPC_SSL_TRUST_STORE_PASSWORD
)
}

66 changes: 61 additions & 5 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ private[spark] class SecurityManager(
setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))

private var secretKey: String = _

private val isSslRPCEnabled = sparkConf.getBoolean(
"spark.ssl.rpc.enabled", false)

logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " +
Expand All @@ -94,12 +98,15 @@ private[spark] class SecurityManager(
"; users with modify permissions: " +
(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
"; groups with modify permissions: " +
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
"; RPC SSL " + (if (isSslRPCEnabled) "enabled" else "disabled"))

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions =
SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)
// the SSL configuration for RPCs
private val rpcSSLOptions = getSSLOptions("rpc")

def getSSLOptions(module: String): SSLOptions = {
val opts =
Expand Down Expand Up @@ -258,18 +265,46 @@ private[spark] class SecurityManager(
isUserInACL(user, modifyAcls, modifyAclsGroups)
}

/**
 * Reports whether secret-based RPC authentication should be in effect. Authentication and
 * RPC SSL are mutually exclusive; when both are requested, SSL wins and auth is suppressed
 * (with a warning).
 *
 * @return true when network auth is enabled and SSL for RPC is disabled
 */
private def isAuthenticationEnabledAndSslRpcDisabled(): Boolean = {
  val authRequested = sparkConf.get(NETWORK_AUTH_ENABLED)
  if (authRequested && rpcSSLOptions.enabled) {
    // Both features were requested; SSL takes precedence, so report auth as disabled.
    logWarning("Network auth disabled as RPC SSL encryption is enabled")
    false
  } else {
    authRequested
  }
}


/**
* Check to see if authentication for the Spark communication protocols is enabled
* @return true if authentication is enabled, otherwise false
*/
def isAuthenticationEnabled(): Boolean = authOn
def isAuthenticationEnabled(): Boolean = isAuthenticationEnabledAndSslRpcDisabled

/**
* Checks whether network encryption should be enabled.
* @return Whether to enable encryption when connecting to services that support it.
*/
def isEncryptionEnabled(): Boolean = {
sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
if (sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)) {
if (rpcSSLOptions.enabled) {
logWarning("Network encryption disabled as RPC SSL encryption is enabled")
false
} else {
true
}
} else {
false
}
}

/**
Expand All @@ -284,7 +319,7 @@ private[spark] class SecurityManager(
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = {
if (isAuthenticationEnabled) {
if (authOn) {
val creds = UserGroupInformation.getCurrentUser().getCredentials()
Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
.map { bytes => new String(bytes, UTF_8) }
Expand Down Expand Up @@ -318,7 +353,7 @@ private[spark] class SecurityManager(
def initializeAuth(): Unit = {
import SparkMasterRegex._

if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
if (!authOn) {
return
}

Expand Down Expand Up @@ -391,6 +426,27 @@ private[spark] class SecurityManager(
// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()

/**
 * If the RPC SSL settings are enabled, returns the password values as environment
 * variables so they can be passed to executors or other subprocesses (passwords are not
 * propagated through the regular configuration).
 *
 * @return environment variable name -> password for each password that is set, or an
 *         empty map when RPC SSL is disabled
 */
def getEnvironmentForSslRpcPasswords: Map[String, String] = {
  if (rpcSSLOptions.enabled) {
    // Pair each env var with its (optional) password and keep only the ones that are set.
    Seq(
      SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> rpcSSLOptions.keyPassword,
      SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> rpcSSLOptions.keyStorePassword,
      SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> rpcSSLOptions.trustStorePassword
    ).collect { case (envVar, Some(password)) => envVar -> password }.toMap
  } else {
    Map()
  }
}
}

private[spark] object SecurityManager {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.rpc") ||
name.startsWith("spark.network") ||
// We need SSL configs to propagate as they may be needed for RPCs.
// Passwords are propagated separately though.
(name.startsWith("spark.ssl") && !name.contains("Password")) ||
isSparkPortConf(name)
}

Expand Down Expand Up @@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging {
key: String,
version: String,
translation: String => String = null)

}
Loading

0 comments on commit 0d64bc4

Please sign in to comment.