Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf #43238

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package org.apache.spark

import java.io.File
import java.security.NoSuchAlgorithmException
import java.util.HashMap
import java.util.Map
import javax.net.ssl.SSLContext

import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ConfigProvider
import org.apache.spark.network.util.MapConfigProvider

/**
* SSLOptions class is a common container for SSL configuration options. It offers methods to
Expand All @@ -33,32 +37,47 @@ import org.apache.spark.internal.Logging
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for.
*
* @param namespace the configuration namespace
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param port the port where to bind the SSL server; if not defined, it will be
* based on the non-SSL port for the same service.
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param privateKey a PKCS#8 private key file in PEM format
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param certChain an X.509 certificate chain file in PEM format
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
* its configuration when the trust-store file on disk changes
* @param trustStoreReloadIntervalMs the interval, in milliseconds,
* when the trust-store will reload its configuration
* @param openSslEnabled enables or disables using an OpenSSL implementation (if available),
* requires certChain and keyFile arguments
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
private[spark] case class SSLOptions(
namespace: Option[String] = None,
enabled: Boolean = false,
port: Option[Int] = None,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
privateKey: Option[File] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
certChain: Option[File] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
trustStoreReloadingEnabled: Boolean = false,
trustStoreReloadIntervalMs: Int = 10000,
openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
Expand Down Expand Up @@ -134,12 +153,37 @@ private[spark] case class SSLOptions(
supported
}

def createConfigProvider(conf: SparkConf): ConfigProvider = {
val nsp = namespace.getOrElse("spark.ssl")
val confMap: Map[String, String] = new HashMap[String, String]
conf.getAll.foreach(tuple => confMap.put(tuple._1, tuple._2))
confMap.put(s"$nsp.enabled", enabled.toString)
confMap.put(s"$nsp.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString)
confMap.put(s"$nsp.openSslEnabled", openSslEnabled.toString)
confMap.put(s"$nsp.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString)
keyStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.keyStore", _))
keyStorePassword.foreach(confMap.put(s"$nsp.keyStorePassword", _))
privateKey.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.privateKey", _))
keyPassword.foreach(confMap.put(s"$nsp.keyPassword", _))
certChain.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.certChain", _))
trustStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.trustStore", _))
trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _))
protocol.foreach(confMap.put(s"$nsp.protocol", _))
confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(","))

new MapConfigProvider(confMap)
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
s"certChain=$certChain, trustStore=$trustStore, " +
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

private[spark] object SSLOptions extends Logging {
Expand All @@ -152,13 +196,21 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].port` - the port where to bind the SSL server
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
* that reloads its configuration when the trust-store file on disk changes
* $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, the
* trust-store will reload its configuration
* $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
* (if available on host system), requires certChain and keyFile arguments
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand All @@ -180,7 +232,15 @@ private[spark] object SSLOptions extends Logging {
hadoopConf: Configuration,
ns: String,
defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

// RPC does not inherit the default enabled setting due to backwards compatibility reasons
val enabledDefault = if (ns == "spark.ssl.rpc") {
false
} else {
defaults.exists(_.enabled)
}

val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault)
if (!enabled) {
return new SSLOptions()
}
Expand All @@ -194,15 +254,23 @@ private[spark] object SSLOptions extends Logging {

val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyStorePassword))

val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
.orElse(defaults.flatMap(_.privateKey))

val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
.orElse(defaults.flatMap(_.keyStoreType))

val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
.orElse(defaults.flatMap(_.certChain))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

Expand All @@ -211,11 +279,21 @@ private[spark] object SSLOptions extends Logging {

val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled",
defaultValue = defaults.exists(_.trustStoreReloadingEnabled))

val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs",
defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000))

val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled",
defaultValue = defaults.exists(_.openSslEnabled))

val protocol = conf.getWithSubstitution(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -225,19 +303,43 @@ private[spark] object SSLOptions extends Logging {
.getOrElse(Set.empty)

new SSLOptions(
Some(ns),
enabled,
port,
keyStore,
keyStorePassword,
privateKey,
keyPassword,
keyStoreType,
needClientAuth,
certChain,
trustStore,
trustStorePassword,
trustStoreType,
trustStoreReloadingEnabled,
trustStoreReloadIntervalMs,
openSslEnabled,
protocol,
enabledAlgorithms)
}

// Config names and environment variables for propagating SSL passwords
val SPARK_RPC_SSL_KEY_PASSWORD_CONF = "spark.ssl.rpc.keyPassword"
val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword"
val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword"
val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq(
SPARK_RPC_SSL_KEY_PASSWORD_CONF,
SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF,
SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF
)

val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD"
val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD"
val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD"
val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq(
ENV_RPC_SSL_KEY_PASSWORD,
ENV_RPC_SSL_KEY_STORE_PASSWORD,
ENV_RPC_SSL_TRUST_STORE_PASSWORD
)
}

47 changes: 45 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ private[spark] class SecurityManager(
setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))

private var secretKey: String = _

private val sslRpcEnabled = sparkConf.getBoolean(
"spark.ssl.rpc.enabled", false)

logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " +
Expand All @@ -94,12 +98,15 @@ private[spark] class SecurityManager(
"; users with modify permissions: " +
(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
"; groups with modify permissions: " +
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
"; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions =
SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)
// the SSL configuration for RPCs
private val rpcSSLOptions = getSSLOptions("rpc")

def getSSLOptions(module: String): SSLOptions = {
val opts =
Expand Down Expand Up @@ -269,9 +276,24 @@ private[spark] class SecurityManager(
* @return Whether to enable encryption when connecting to services that support it.
*/
def isEncryptionEnabled(): Boolean = {
hasnain-db marked this conversation as resolved.
Show resolved Hide resolved
sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
if (sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)) {
if (rpcSSLOptions.enabled) {
logWarning("Network encryption disabled as RPC SSL encryption is enabled")
false
} else {
true
}
} else {
false
}
hasnain-db marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Checks whether RPC SSL is enabled or not
* @return Whether RPC SSL is enabled or not
*/
def isSslRpcEnabled(): Boolean = sslRpcEnabled

/**
* Gets the user used for authenticating SASL connections.
* For now use a single hardcoded user.
Expand Down Expand Up @@ -391,6 +413,27 @@ private[spark] class SecurityManager(
// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()

/**
* If the RPC SSL settings are enabled, returns a map containing the password
* values so they can be passed to executors or other subprocesses.
*
* @return Map containing environment variables to pass
*/
def getEnvironmentForSslRpcPasswords: Map[String, String] = {
if (rpcSSLOptions.enabled) {
val map = scala.collection.mutable.Map[String, String]()
rpcSSLOptions.keyPassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> password))
rpcSSLOptions.keyStorePassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> password))
rpcSSLOptions.trustStorePassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> password))
map.toMap
} else {
Map()
}
}
}

private[spark] object SecurityManager {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.rpc") ||
name.startsWith("spark.network") ||
// We need SSL configs to propagate as they may be needed for RPCs.
// Passwords are propagated separately though.
(name.startsWith("spark.ssl") && !name.contains("Password")) ||
mridulm marked this conversation as resolved.
Show resolved Hide resolved
isSparkPortConf(name)
}

Expand Down Expand Up @@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging {
key: String,
version: String,
translation: String => String = null)

}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ object SparkEnv extends Logging {
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
if (!(securityManager.isEncryptionEnabled() || securityManager.isSslRpcEnabled())) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.{File, FileOutputStream, InputStream, IOException}
import scala.collection.Map
import scala.jdk.CollectionConverters._

import org.apache.spark.SecurityManager
import org.apache.spark.{SecurityManager, SSLOptions}
import org.apache.spark.deploy.Command
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.WorkerCommandBuilder
Expand Down Expand Up @@ -90,15 +90,22 @@ object CommandUtils extends Logging {
if (securityMgr.isAuthenticationEnabled) {
newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
}
// set SSL env variables if needed
newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords

Command(
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
// filter out secrets from java options
command.javaOpts.filterNot(opts =>
opts.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF) ||
SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists(
field => opts.startsWith("-D" + field)
)
))
}

/** Spawn a thread that will redirect a given stream to a file */
Expand Down
Loading