Skip to content

Commit

Permalink
[SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTrans…
Browse files Browse the repository at this point in the history
…portConf

### What changes were proposed in this pull request?

This PR adds the options added in #43220  to `SSLOptions` and `SparkTransportConf`.

By adding it to the `SSLOptions` we can support inheritance of options, so settings for the UI and RPC SSL settings can be shared as much as possible. The `SparkTransportConf` changes are needed to support propagating these settings.

I also make some changes to `SecurityManager` to log when this feature is enabled, and make the existing `spark.network.crypto` options mutually exclusive with this new settings (it would just involve double encryption then).

Lastly, make these flags propagate down to when executors are launched, and allow the passwords to be sent via environment variables (similar to how it's done for an existing secret). This ensures they are not visible in plaintext, but also ensures they are available at executor startup (otherwise it can never talk to the driver/worker)

### Why are the changes needed?

The propagation of these options are needed for the RPC functionality to work

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI

Added some unit tests which I verified passed:

```
build/sbt
> project core
> testOnly org.apache.spark.SparkConfSuite org.apache.spark.SSLOptionsSuite org.apache.spark.SecurityManagerSuite org.apache.spark.deploy.worker.CommandUtilsSuite
```

The rest of the changes and integration were tested as part of #42685

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43238 from hasnain-db/spark-tls-ssloptions.

Authored-by: Hasnain Lakhani <hasnain.lakhani@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
hasnain-db authored and Mridul Muralidharan committed Oct 14, 2023
1 parent 2f6cca5 commit 26aaf1c
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 23 deletions.
108 changes: 105 additions & 3 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package org.apache.spark

import java.io.File
import java.security.NoSuchAlgorithmException
import java.util.HashMap
import java.util.Map
import javax.net.ssl.SSLContext

import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ConfigProvider
import org.apache.spark.network.util.MapConfigProvider

/**
* SSLOptions class is a common container for SSL configuration options. It offers methods to
Expand All @@ -33,32 +37,47 @@ import org.apache.spark.internal.Logging
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for.
*
* @param namespace the configuration namespace
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param port the port where to bind the SSL server; if not defined, it will be
* based on the non-SSL port for the same service.
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param privateKey a PKCS#8 private key file in PEM format
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param certChain an X.509 certificate chain file in PEM format
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
* its configuration when the trust-store file on disk changes
* @param trustStoreReloadIntervalMs the interval, in milliseconds,
* when the trust-store will reload its configuration
* @param openSslEnabled enables or disables using an OpenSSL implementation (if available),
* requires certChain and keyFile arguments
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
private[spark] case class SSLOptions(
namespace: Option[String] = None,
enabled: Boolean = false,
port: Option[Int] = None,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
privateKey: Option[File] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
certChain: Option[File] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
trustStoreReloadingEnabled: Boolean = false,
trustStoreReloadIntervalMs: Int = 10000,
openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
Expand Down Expand Up @@ -134,12 +153,37 @@ private[spark] case class SSLOptions(
supported
}

def createConfigProvider(conf: SparkConf): ConfigProvider = {
val nsp = namespace.getOrElse("spark.ssl")
val confMap: Map[String, String] = new HashMap[String, String]
conf.getAll.foreach(tuple => confMap.put(tuple._1, tuple._2))
confMap.put(s"$nsp.enabled", enabled.toString)
confMap.put(s"$nsp.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString)
confMap.put(s"$nsp.openSslEnabled", openSslEnabled.toString)
confMap.put(s"$nsp.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString)
keyStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.keyStore", _))
keyStorePassword.foreach(confMap.put(s"$nsp.keyStorePassword", _))
privateKey.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.privateKey", _))
keyPassword.foreach(confMap.put(s"$nsp.keyPassword", _))
certChain.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.certChain", _))
trustStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.trustStore", _))
trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _))
protocol.foreach(confMap.put(s"$nsp.protocol", _))
confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(","))

new MapConfigProvider(confMap)
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
s"certChain=$certChain, trustStore=$trustStore, " +
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

private[spark] object SSLOptions extends Logging {
Expand All @@ -152,13 +196,21 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].port` - the port where to bind the SSL server
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
* that reloads its configuration when the trust-store file on disk changes
* $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, the
* trust-store will reload its configuration
* $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
* (if available on host system), requires certChain and keyFile arguments
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand All @@ -180,7 +232,15 @@ private[spark] object SSLOptions extends Logging {
hadoopConf: Configuration,
ns: String,
defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

// RPC does not inherit the default enabled setting due to backwards compatibility reasons
val enabledDefault = if (ns == "spark.ssl.rpc") {
false
} else {
defaults.exists(_.enabled)
}

val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault)
if (!enabled) {
return new SSLOptions()
}
Expand All @@ -194,15 +254,23 @@ private[spark] object SSLOptions extends Logging {

val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyStorePassword))

val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
.orElse(defaults.flatMap(_.privateKey))

val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
.orElse(defaults.flatMap(_.keyStoreType))

val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
.orElse(defaults.flatMap(_.certChain))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

Expand All @@ -211,11 +279,21 @@ private[spark] object SSLOptions extends Logging {

val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
.orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty))
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled",
defaultValue = defaults.exists(_.trustStoreReloadingEnabled))

val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs",
defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000))

val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled",
defaultValue = defaults.exists(_.openSslEnabled))

val protocol = conf.getWithSubstitution(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -225,19 +303,43 @@ private[spark] object SSLOptions extends Logging {
.getOrElse(Set.empty)

new SSLOptions(
Some(ns),
enabled,
port,
keyStore,
keyStorePassword,
privateKey,
keyPassword,
keyStoreType,
needClientAuth,
certChain,
trustStore,
trustStorePassword,
trustStoreType,
trustStoreReloadingEnabled,
trustStoreReloadIntervalMs,
openSslEnabled,
protocol,
enabledAlgorithms)
}

// Config names and environment variables for propagating SSL passwords
val SPARK_RPC_SSL_KEY_PASSWORD_CONF = "spark.ssl.rpc.keyPassword"
val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword"
val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword"
val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq(
SPARK_RPC_SSL_KEY_PASSWORD_CONF,
SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF,
SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF
)

val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD"
val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD"
val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD"
val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq(
ENV_RPC_SSL_KEY_PASSWORD,
ENV_RPC_SSL_KEY_STORE_PASSWORD,
ENV_RPC_SSL_TRUST_STORE_PASSWORD
)
}

45 changes: 43 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ private[spark] class SecurityManager(
setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))

private var secretKey: String = _

private val sslRpcEnabled = sparkConf.getBoolean(
"spark.ssl.rpc.enabled", false)

logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " +
Expand All @@ -94,12 +98,15 @@ private[spark] class SecurityManager(
"; users with modify permissions: " +
(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
"; groups with modify permissions: " +
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
"; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions =
SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)
// the SSL configuration for RPCs
private val rpcSSLOptions = getSSLOptions("rpc")

def getSSLOptions(module: String): SSLOptions = {
val opts =
Expand Down Expand Up @@ -269,9 +276,22 @@ private[spark] class SecurityManager(
* @return Whether to enable encryption when connecting to services that support it.
*/
def isEncryptionEnabled(): Boolean = {
sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
val encryptionEnabled = sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) ||
sparkConf.get(SASL_ENCRYPTION_ENABLED)
if (encryptionEnabled && sslRpcEnabled) {
logWarning("Network encryption disabled as RPC SSL encryption is enabled")
false
} else {
encryptionEnabled
}
}

/**
* Checks whether RPC SSL is enabled or not
* @return Whether RPC SSL is enabled or not
*/
def isSslRpcEnabled(): Boolean = sslRpcEnabled

/**
* Gets the user used for authenticating SASL connections.
* For now use a single hardcoded user.
Expand Down Expand Up @@ -391,6 +411,27 @@ private[spark] class SecurityManager(
// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()

/**
* If the RPC SSL settings are enabled, returns a map containing the password
* values so they can be passed to executors or other subprocesses.
*
* @return Map containing environment variables to pass
*/
def getEnvironmentForSslRpcPasswords: Map[String, String] = {
if (rpcSSLOptions.enabled) {
val map = scala.collection.mutable.Map[String, String]()
rpcSSLOptions.keyPassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> password))
rpcSSLOptions.keyStorePassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> password))
rpcSSLOptions.trustStorePassword.foreach(password =>
map += (SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> password))
map.toMap
} else {
Map()
}
}
}

private[spark] object SecurityManager {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.rpc") ||
name.startsWith("spark.network") ||
// We need SSL configs to propagate as they may be needed for RPCs.
// Passwords are propagated separately though.
(name.startsWith("spark.ssl") && !name.contains("Password")) ||
isSparkPortConf(name)
}

Expand Down Expand Up @@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging {
key: String,
version: String,
translation: String => String = null)

}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ object SparkEnv extends Logging {
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
if (!(securityManager.isEncryptionEnabled() || securityManager.isSslRpcEnabled())) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.{File, FileOutputStream, InputStream, IOException}
import scala.collection.Map
import scala.jdk.CollectionConverters._

import org.apache.spark.SecurityManager
import org.apache.spark.{SecurityManager, SSLOptions}
import org.apache.spark.deploy.Command
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.WorkerCommandBuilder
Expand Down Expand Up @@ -90,15 +90,22 @@ object CommandUtils extends Logging {
if (securityMgr.isAuthenticationEnabled) {
newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
}
// set SSL env variables if needed
newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords

Command(
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
// filter out secrets from java options
command.javaOpts.filterNot(opts =>
opts.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF) ||
SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists(
field => opts.startsWith("-D" + field)
)
))
}

/** Spawn a thread that will redirect a given stream to a file */
Expand Down
Loading

0 comments on commit 26aaf1c

Please sign in to comment.