diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index d159f5717b090..1a3f04c0b7ee2 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -19,12 +19,16 @@ package org.apache.spark

 import java.io.File
 import java.security.NoSuchAlgorithmException
+import java.util.HashMap
+import java.util.Map
 import javax.net.ssl.SSLContext

 import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.util.ssl.SslContextFactory

 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ConfigProvider
+import org.apache.spark.network.util.MapConfigProvider

 /**
  * SSLOptions class is a common container for SSL configuration options. It offers methods to
@@ -33,34 +37,53 @@ import org.apache.spark.internal.Logging
  * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
  * by the protocol, which it can generate the configuration for.
  *
+ * @param namespace the configuration namespace
  * @param enabled enables or disables SSL; if it is set to false, the rest of the
  *                settings are disregarded
  * @param port the port where to bind the SSL server; if not defined, it will be
  *             based on the non-SSL port for the same service.
  * @param keyStore a path to the key-store file
  * @param keyStorePassword a password to access the key-store file
+ * @param privateKey a PKCS#8 private key file in PEM format
  * @param keyPassword a password to access the private key in the key-store
  * @param keyStoreType the type of the key-store
  * @param needClientAuth set true if SSL needs client authentication
+ * @param certChain an X.509 certificate chain file in PEM format
  * @param trustStore a path to the trust-store file
  * @param trustStorePassword a password to access the trust-store file
  * @param trustStoreType the type of the trust-store
+ * @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
+ *                                   its configuration when the trust-store file on disk changes
+ * @param trustStoreReloadIntervalMs the interval, in milliseconds, at which the trust-store
+ *                                   reloads its configuration
+ * @param openSslEnabled enables or disables using an OpenSSL implementation (if available),
+ *                       requires the certChain and privateKey arguments
  * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
  * @param enabledAlgorithms a set of encryption algorithms that may be used
+ * @param dangerouslyFallbackIfKeysNotPresent if SSL is enabled but key files are not present,
+ *                                            fall back to unencrypted communication. This is an
+ *                                            advanced option and not recommended for normal use.
  */
 private[spark] case class SSLOptions(
+    namespace: Option[String] = None,
     enabled: Boolean = false,
     port: Option[Int] = None,
     keyStore: Option[File] = None,
     keyStorePassword: Option[String] = None,
+    privateKey: Option[File] = None,
     keyPassword: Option[String] = None,
     keyStoreType: Option[String] = None,
     needClientAuth: Boolean = false,
+    certChain: Option[File] = None,
     trustStore: Option[File] = None,
     trustStorePassword: Option[String] = None,
     trustStoreType: Option[String] = None,
+    trustStoreReloadingEnabled: Boolean = false,
+    trustStoreReloadIntervalMs: Int = 10000,
+    openSslEnabled: Boolean = false,
     protocol: Option[String] = None,
-    enabledAlgorithms: Set[String] = Set.empty)
+    enabledAlgorithms: Set[String] = Set.empty,
+    dangerouslyFallbackIfKeysNotPresent: Boolean = false)
   extends Logging {

   /**
@@ -134,12 +157,44 @@ private[spark] case class SSLOptions(
     supported
   }

+  /**
+   * Maps this SSLOptions into a ConfigProvider for consumption by the network library.
+   * All entries from the given SparkConf are carried over, and the SSL settings are
+   * written under `[namespace].*` (defaulting to `spark.ssl` if no namespace is set).
+   *
+   * @return a ConfigProvider backed by this SSLOptions and the given SparkConf
+   */
+  def createConfigProvider(conf: SparkConf): ConfigProvider = {
+    val nsp = namespace.getOrElse("spark.ssl")
+    val confMap: Map[String, String] = new HashMap[String, String]
+    conf.getAll.foreach(tuple => confMap.put(tuple._1, tuple._2))
+    confMap.put(s"$nsp.enabled", enabled.toString)
+    confMap.put(s"$nsp.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString)
+    confMap.put(s"$nsp.openSslEnabled", openSslEnabled.toString)
+    confMap.put(s"$nsp.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString)
+    keyStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.keyStore", _))
+    keyStorePassword.foreach(confMap.put(s"$nsp.keyStorePassword", _))
+    privateKey.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.privateKey", _))
+    keyPassword.foreach(confMap.put(s"$nsp.keyPassword", _))
+    certChain.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.certChain", _))
+    trustStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.trustStore", _))
+    trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _))
+    protocol.foreach(confMap.put(s"$nsp.protocol", _))
+    confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(","))
+    confMap.put(
+      s"$nsp.dangerouslyFallbackIfKeysNotPresent", dangerouslyFallbackIfKeysNotPresent.toString)
+
+    new MapConfigProvider(confMap)
+  }
+
   /** Returns a string representation of this SSLOptions with all the passwords masked.
   */
   override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
     s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-    s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
-    s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
-
+    s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
+    s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
+    s"certChain=$certChain, trustStore=$trustStore, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
+    s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSslEnabled=$openSslEnabled, " +
+    s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms, " +
+    s"dangerouslyFallbackIfKeysNotPresent=$dangerouslyFallbackIfKeysNotPresent}"
 }

 private[spark] object SSLOptions extends Logging {
@@ -152,15 +207,25 @@ private[spark] object SSLOptions extends Logging {
    * $ - `[ns].port` - the port where to bind the SSL server
    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
    * $ - `[ns].keyStorePassword` - a password to the key-store file
+   * $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
    * $ - `[ns].keyPassword` - a password to the private key
    * $ - `[ns].keyStoreType` - the type of the key-store
    * $ - `[ns].needClientAuth` - whether SSL needs client authentication
+   * $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
    *       directory
    * $ - `[ns].trustStorePassword` - a password to the trust-store file
    * $ - `[ns].trustStoreType` - the type of trust-store
+   * $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
+   *       that reloads its configuration when the trust-store file on disk changes
+   * $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, at which the
+   *       trust-store reloads its configuration
+   * $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
+   *       (if available on the host system), requires certChain and privateKey arguments
    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+   * $ - `[ns].dangerouslyFallbackIfKeysNotPresent` - whether to fall back to unencrypted
+   *       communication if keys are not present.
    *
    * For a list of protocols and ciphers supported by particular Java versions, you may go to
    *
@@ -180,7 +245,15 @@ private[spark] object SSLOptions extends Logging {
       hadoopConf: Configuration,
       ns: String,
       defaults: Option[SSLOptions] = None): SSLOptions = {
-    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
+
+    // RPC does not inherit the default enabled setting due to backwards compatibility reasons
+    val enabledDefault = if (ns == "spark.ssl.rpc") {
+      false
+    } else {
+      defaults.exists(_.enabled)
+    }
+
+    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault)
     if (!enabled) {
       return new SSLOptions()
     }
@@ -194,15 +267,23 @@ private[spark] object SSLOptions extends Logging {

     val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
       .orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
+      .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty))
       .orElse(defaults.flatMap(_.keyStorePassword))

+    val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
+      .orElse(defaults.flatMap(_.privateKey))
+
     val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
       .orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
+      .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty))
       .orElse(defaults.flatMap(_.keyPassword))

     val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
       .orElse(defaults.flatMap(_.keyStoreType))

+    val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
+      .orElse(defaults.flatMap(_.certChain))
+
     val needClientAuth =
       conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

@@ -211,11 +292,21 @@ private[spark] object SSLOptions extends Logging {

     val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
       .orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
+      .orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty))
       .orElse(defaults.flatMap(_.trustStorePassword))

     val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
       .orElse(defaults.flatMap(_.trustStoreType))

+    val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled",
+      defaultValue = defaults.exists(_.trustStoreReloadingEnabled))
+
+    val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs",
+      defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000))
+
+    val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled",
+      defaultValue = defaults.exists(_.openSslEnabled))
+
     val protocol = conf.getWithSubstitution(s"$ns.protocol")
       .orElse(defaults.flatMap(_.protocol))

@@ -224,20 +315,49 @@ private[spark] object SSLOptions extends Logging {
       .orElse(defaults.map(_.enabledAlgorithms))
       .getOrElse(Set.empty)

+    val dangerouslyFallbackIfKeysNotPresent =
+      conf.getBoolean(s"$ns.dangerouslyFallbackIfKeysNotPresent",
+        defaultValue = defaults.exists(_.dangerouslyFallbackIfKeysNotPresent))
+
     new SSLOptions(
+      Some(ns),
       enabled,
       port,
       keyStore,
       keyStorePassword,
+      privateKey,
       keyPassword,
       keyStoreType,
       needClientAuth,
+      certChain,
       trustStore,
       trustStorePassword,
       trustStoreType,
+      trustStoreReloadingEnabled,
+      trustStoreReloadIntervalMs,
+      openSslEnabled,
       protocol,
-      enabledAlgorithms)
+      enabledAlgorithms,
+      dangerouslyFallbackIfKeysNotPresent)
   }

+  // Config names and environment variables for propagating SSL passwords
+  val SPARK_RPC_SSL_KEY_PASSWORD_CONF = "spark.ssl.rpc.keyPassword"
+  val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword"
+  val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword"
+  val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq(
+    SPARK_RPC_SSL_KEY_PASSWORD_CONF,
+    SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF,
+    SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF
+  )
+
+  val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD"
+  val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD"
+  val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD"
+  val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq(
+    ENV_RPC_SSL_KEY_PASSWORD,
+    ENV_RPC_SSL_KEY_STORE_PASSWORD,
+    ENV_RPC_SSL_TRUST_STORE_PASSWORD
+  )
 }
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 7e72ae8d89e37..d63773d413949 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -85,6 +85,10 @@ private[spark] class SecurityManager(
   setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))

   private var secretKey: String = _
+
+  private val isSslRPCEnabled = sparkConf.getBoolean(
+    "spark.ssl.rpc.enabled", false)
+
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
     "; users with view permissions: " +
@@ -94,12 +98,15 @@ private[spark] class SecurityManager(
     "; users with modify permissions: " +
     (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
     "; groups with modify permissions: " +
-    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))
+    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
+    "; RPC SSL " + (if (isSslRPCEnabled) "enabled" else "disabled"))

   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
   // the default SSL configuration - it will be used by all communication layers unless overwritten
   private val defaultSSLOptions =
     SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)
+  // the SSL configuration for RPCs
+  private val rpcSSLOptions = getSSLOptions("rpc")

   def getSSLOptions(module: String): SSLOptions = {
     val opts =
@@ -258,18 +265,46 @@ private[spark] class SecurityManager(
     isUserInACL(user, modifyAcls, modifyAclsGroups)
   }

+  /**
+   * Check to see if authentication is enabled and SSL for RPC is disabled (as they are mutually
+   * exclusive)
+   * @return Whether authentication is enabled and SSL for RPC is disabled
+   */
+  private def isAuthenticationEnabledAndSslRpcDisabled(): Boolean = {
+    if (sparkConf.get(NETWORK_AUTH_ENABLED)) {
+      if (rpcSSLOptions.enabled) {
+        logWarning("Network auth disabled as RPC SSL encryption is enabled")
+        false
+      } else {
+        true
+      }
+    } else {
+      false
+    }
+  }
+
   /**
    * Check to see if authentication for the Spark communication protocols is enabled
    * @return true if authentication is enabled, otherwise false
    */
-  def isAuthenticationEnabled(): Boolean = authOn
+  def isAuthenticationEnabled(): Boolean = isAuthenticationEnabledAndSslRpcDisabled

   /**
    * Checks whether network encryption should be enabled.
    * @return Whether to enable encryption when connecting to services that support it.
    */
   def isEncryptionEnabled(): Boolean = {
-    sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
+    if (sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)) {
+      if (rpcSSLOptions.enabled) {
+        logWarning("Network encryption disabled as RPC SSL encryption is enabled")
+        false
+      } else {
+        true
+      }
+    } else {
+      false
+    }
   }

   /**
@@ -284,7 +319,7 @@ private[spark] class SecurityManager(
    * @return the secret key as a String if authentication is enabled, otherwise returns null
    */
   def getSecretKey(): String = {
-    if (isAuthenticationEnabled) {
+    if (authOn) {
       val creds = UserGroupInformation.getCurrentUser().getCredentials()
       Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
         .map { bytes => new String(bytes, UTF_8) }
@@ -318,7 +353,7 @@ private[spark] class SecurityManager(
   def initializeAuth(): Unit = {
     import SparkMasterRegex._

-    if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+    if (!authOn) {
       return
     }

@@ -391,6 +426,27 @@ private[spark] class SecurityManager(
   // Default SecurityManager only has a single secret key, so ignore appId.
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
+
+  /**
+   * If the RPC SSL settings are enabled, returns a map containing the password
+   * values so they can be passed to executors or other subprocesses.
+   *
+   * @return Map containing environment variables to pass
+   */
+  def getEnvironmentForSslRpcPasswords: Map[String, String] = {
+    if (rpcSSLOptions.enabled) {
+      val map = scala.collection.mutable.Map[String, String]()
+      rpcSSLOptions.keyPassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> password))
+      rpcSSLOptions.keyStorePassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> password))
+      rpcSSLOptions.trustStorePassword.foreach(password =>
+        map += (SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> password))
+      map.toMap
+    } else {
+      Map()
+    }
+  }
 }

 private[spark] object SecurityManager {
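The net effect of these SecurityManager changes is that RPC SSL takes precedence over both legacy protection mechanisms. A sketch of the observable behavior (SecurityManager is private[spark], so this only runs inside Spark's own code or tests):

import org.apache.spark.{SecurityManager, SparkConf}

val conf = new SparkConf()
  .set("spark.authenticate", "true")            // NETWORK_AUTH_ENABLED
  .set("spark.network.crypto.enabled", "true")  // NETWORK_CRYPTO_ENABLED
  .set("spark.ssl.rpc.enabled", "true")

val sm = new SecurityManager(conf)
// Both return false and log a warning, because RPC SSL wins:
assert(!sm.isAuthenticationEnabled())
assert(!sm.isEncryptionEnabled())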
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 091413bb9cc92..b688604beead6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging {
       (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
       name.startsWith("spark.rpc") ||
       name.startsWith("spark.network") ||
+      // We need SSL configs to propagate as they may be needed for RPCs.
+      // Passwords are propagated separately though.
+      (name.startsWith("spark.ssl") && !name.contains("Password")) ||
       isSparkPortConf(name)
   }

@@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging {
       key: String,
       version: String,
       translation: String => String = null)
-
 }
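This is the filter that the SparkConfSuite change further down exercises: password keys never reach executor startup configs, while other spark.ssl.* keys do. A quick sketch (isExecutorStartupConf is private[spark]):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.ssl.rpc.enabled", "true")
  .set("spark.ssl.rpc.keyStorePassword", "secret")

val propagated = conf.getAll.collect {
  case (k, _) if SparkConf.isExecutorStartupConf(k) => k
}
// Only the non-password key survives the filter.
assert(propagated.sameElements(Array("spark.ssl.rpc.enabled")))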
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 9a4a037e35c66..d689eb02cc53f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileOutputStream, InputStream, IOException}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SSLOptions}
 import org.apache.spark.deploy.Command
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.WorkerCommandBuilder
@@ -90,6 +90,8 @@ object CommandUtils extends Logging {
     if (securityMgr.isAuthenticationEnabled) {
       newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
     }
+    // set SSL env variables if needed
+    newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords

     Command(
       command.mainClass,
@@ -97,8 +99,13 @@ object CommandUtils extends Logging {
       newEnvironment,
       command.classPathEntries ++ classPath,
       Seq.empty, // library path already captured in environment variable
-      // filter out auth secret from java options
-      command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
+      // filter out secrets from java options
+      command.javaOpts.filterNot(opts =>
+        opts.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF) ||
+          SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists(
+            field => opts.startsWith("-D" + field)
+          )
+      ))
   }

   /** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 812d57ac67cb5..54823b40552db 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network.netty

 import scala.jdk.CollectionConverters._

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SSLOptions}
 import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf}

 /**
@@ -29,6 +29,24 @@ import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf}
  */
 object SparkTransportConf {

+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param _conf the [[SparkConf]]
+   * @param module the module name
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   * @param role optional role, could be driver, executor, worker and master. Default is
+   *             [[None]], which means no role-specific configurations.
+   */
+  def fromSparkConf(
+      _conf: SparkConf,
+      module: String,
+      numUsableCores: Int = 0,
+      role: Option[String] = None): TransportConf = {
+    SparkTransportConf.fromSparkConfWithSslOptions(
+      _conf, module, numUsableCores, role, sslOptions = None)
+  }

   /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
    * @param _conf the [[SparkConf]]
@@ -38,12 +56,14 @@ object SparkTransportConf {
    *                       This restriction will only occur if these properties are not already set.
    * @param role optional role, could be driver, executor, worker and master. Default is
    *             [[None]], means no role specific configurations.
+   * @param sslOptions SSL config options
    */
-  def fromSparkConf(
+  def fromSparkConfWithSslOptions(
       _conf: SparkConf,
       module: String,
       numUsableCores: Int = 0,
-      role: Option[String] = None): TransportConf = {
+      role: Option[String] = None,
+      sslOptions: Option[SSLOptions]): TransportConf = {
     val conf = _conf.clone

     // specify default thread configuration based on our JVM's allocation of cores (rather than
     // necessarily assuming we have all the machine's cores).
@@ -57,12 +77,14 @@ object SparkTransportConf {
       conf.set(s"spark.$module.io.$suffix", value)
     }

-    new TransportConf(module, new ConfigProvider {
+    val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse({
+      new ConfigProvider {
       override def get(name: String): String = conf.get(name)
       override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
       override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
         conf.getAll.toMap.asJava.entrySet()
       }
-    })
+    }})
+    new TransportConf(module, configProvider)
   }
 }
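A sketch of the intended call pattern for the new overload: an RPC endpoint passes its parsed SSL options through so the TransportConf is backed by SSLOptions.createConfigProvider, while existing callers of fromSparkConf are unchanged (it simply forwards sslOptions = None):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.netty.SparkTransportConf

val conf = new SparkConf().set("spark.ssl.rpc.enabled", "true")
val securityManager = new SecurityManager(conf)

val transportConf = SparkTransportConf.fromSparkConfWithSslOptions(
  conf, "rpc", numUsableCores = 0, role = None,
  sslOptions = Some(securityManager.getSSLOptions("rpc")))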
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 81bc4ae9da02f..ee6bf071ef695 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -31,6 +31,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test resolving property file as spark conf ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath

     // Pick two cipher suites that the provider knows about
     val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -47,8 +49,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStore", keyStorePath)
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+    conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000")
+    conf.set("spark.ssl.openSslEnabled", "false")
     conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
     conf.set("spark.ssl.protocol", "TLSv1.2")

@@ -62,7 +69,16 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === false)
+    assert(opts.trustStoreReloadIntervalMs === 10000)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStorePassword === Some("password"))
+    assert(opts.openSslEnabled === false)
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("TLSv1.2"))
     assert(opts.enabledAlgorithms === algorithms)
@@ -71,6 +87,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test resolving property with defaults specified ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath

     val conf = new SparkConf
     val hadoopConf = new Configuration()
@@ -78,8 +96,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStore", keyStorePath)
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+    conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000")
+    conf.set("spark.ssl.openSslEnabled", "false")
     conf.set("spark.ssl.enabledAlgorithms",
       "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
@@ -91,12 +114,22 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore.isDefined)
     assert(opts.trustStore.get.getName === "truststore")
     assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStore.isDefined)
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === false)
+    assert(opts.trustStoreReloadIntervalMs === 10000)
+    assert(opts.openSslEnabled === false)
     assert(opts.protocol === Some("SSLv3"))
     assert(opts.enabledAlgorithms ===
       Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
@@ -105,6 +138,8 @@ class SSLOptionsSuite extends SparkFunSuite {
   test("test whether defaults can be overridden ") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath

     val conf = new SparkConf
     val hadoopConf = new Configuration()
@@ -115,8 +150,13 @@ class SSLOptionsSuite extends SparkFunSuite {
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.ui.keyStorePassword", "12345")
     conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.ui.trustStoreReloadingEnabled", "true")
+    conf.set("spark.ssl.ui.trustStoreReloadIntervalMs", "20000")
+    conf.set("spark.ssl.ui.openSslEnabled", "true")
     conf.set("spark.ssl.enabledAlgorithms",
       "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.ui.enabledAlgorithms", "ABC, DEF")
@@ -130,16 +170,88 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore.isDefined)
     assert(opts.trustStore.get.getName === "truststore")
     assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
+    assert(opts.keyStore.isDefined)
+    assert(opts.keyStore.get.getName === "keystore")
+    assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
+    assert(opts.trustStorePassword === Some("password"))
+    assert(opts.keyStorePassword === Some("12345"))
+    assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === true)
+    assert(opts.trustStoreReloadIntervalMs === 20000)
+    assert(opts.openSslEnabled === true)
+    assert(opts.protocol === Some("SSLv3"))
+    assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
+  }
+
+  test("ensure RPC settings don't get enabled via inheritance ") {
+    val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+    val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+    val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+    val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
+
+    val conf = new SparkConf
+    val hadoopConf = new Configuration()
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.rpc.port", "4242")
+    conf.set("spark.ssl.keyStore", keyStorePath)
+    conf.set("spark.ssl.keyStorePassword", "password")
+    conf.set("spark.ssl.rpc.keyStorePassword", "12345")
+    conf.set("spark.ssl.keyPassword", "password")
+    conf.set("spark.ssl.privateKey", privateKeyPath)
+    conf.set("spark.ssl.certChain", certChainPath)
+    conf.set("spark.ssl.trustStore", trustStorePath)
+    conf.set("spark.ssl.trustStorePassword", "password")
+    conf.set("spark.ssl.rpc.trustStoreReloadingEnabled", "true")
+    conf.set("spark.ssl.rpc.trustStoreReloadIntervalMs", "20000")
+    conf.set("spark.ssl.rpc.openSslEnabled", "true")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.rpc.enabledAlgorithms", "ABC, DEF")
+    conf.set("spark.ssl.protocol", "SSLv3")
+
+    val disabledDefaults = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    val disabledOpts = SSLOptions.parse(
+      conf, hadoopConf, "spark.ssl.rpc", defaults = Some(disabledDefaults))
+
+    assert(disabledOpts.enabled === false)
+    assert(disabledOpts.port.isEmpty)
+
+    // Now enable it and test again
+    conf.set("spark.ssl.rpc.enabled", "true")
+    val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaultOpts))
+
+    assert(opts.enabled === true)
+    assert(opts.port === Some(4242))
+    assert(opts.trustStore.isDefined)
+    assert(opts.trustStore.get.getName === "truststore")
+    assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+    assert(opts.privateKey.isDefined === true)
+    assert(opts.privateKey.get.getName === "key.pem")
+    assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+    assert(opts.certChain.isDefined === true)
+    assert(opts.certChain.get.getName === "certchain.pem")
+    assert(opts.certChain.get.getAbsolutePath === certChainPath)
     assert(opts.keyStore.isDefined)
     assert(opts.keyStore.get.getName === "keystore")
     assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
     assert(opts.trustStorePassword === Some("password"))
     assert(opts.keyStorePassword === Some("12345"))
     assert(opts.keyPassword === Some("password"))
+    assert(opts.trustStoreReloadingEnabled === true)
+    assert(opts.trustStoreReloadIntervalMs === 20000)
+    assert(opts.openSslEnabled === true)
     assert(opts.protocol === Some("SSLv3"))
     assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
   }

+
   test("SPARK-41719: Skip ssl sub-settings if ssl is disabled") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val conf = new SparkConf
@@ -169,6 +281,21 @@ class SSLOptionsSuite extends SparkFunSuite {
     assert(opts.trustStore === Some(new File("val2")))
   }

+  test("get passwords from environment") {
+    val conf = new SparkConfWithEnv(Map(
+      SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> "val1",
+      SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> "val2",
+      SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> "val3"))
+    val hadoopConf = new Configuration()
+
+    conf.set("spark.ssl.enabled", "true")
+
+    val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
+    assert(opts.keyPassword === Some("val1"))
+    assert(opts.keyStorePassword === Some("val2"))
+    assert(opts.trustStorePassword === Some("val3"))
+  }
+
   test("get password from Hadoop credential provider") {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a2d41b92e0849..5f7958167507d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -341,7 +341,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     }
   }

-  test("SPARK-26998: SSL configuration not needed on executors") {
+  test("SPARK-26998: SSL passwords not needed on executors") {
     val conf = new SparkConf(false)
     conf.set("spark.ssl.enabled", "true")
     conf.set("spark.ssl.keyPassword", "password")
@@ -349,7 +349,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     conf.set("spark.ssl.trustStorePassword", "password")

     val filtered = conf.getAll.filter { case (k, _) => SparkConf.isExecutorStartupConf(k) }
-    assert(filtered.isEmpty)
+    // Only the enabled flag should propagate
+    assert(filtered.length == 1)
+    assert(filtered(0)._1 == "spark.ssl.enabled")
   }

   test("SPARK-27244 toDebugString redacts sensitive information") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
index 28e35bc8183ba..fc67de0f32209 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
@@ -21,8 +21,9 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._

-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite, SSLOptions}
 import org.apache.spark.deploy.Command
+import org.apache.spark.network.ssl.SslSampleConfigs
 import org.apache.spark.util.Utils

 class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTester {
@@ -68,4 +69,30 @@ class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
     assert(cmd.environment(SecurityManager.ENV_AUTH_SECRET) === secret)
   }
+
+  test("SSL RPC passwords shouldn't appear in java opts") {
+    val buildLocalCommand = PrivateMethod[Command](Symbol("buildLocalCommand"))
+    val conf = new SparkConf
+    conf.set("spark.ssl.rpc.enabled", "true")
+
+    // This sets passwords
+    val updatedConfigs = SslSampleConfigs.createDefaultConfigMapForRpcNamespace()
+    updatedConfigs.entrySet().forEach(entry => conf.set(entry.getKey, entry.getValue))
+
+    val secret = "This is the secret sauce"
+    val command = Command("mainClass", Seq(), Map(), Seq(), Seq("lib"),
+      SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.map(
+        field => "-D" + field + "=" + secret
+      ))
+
+    conf.set(SecurityManager.SPARK_AUTH_CONF, "true")
+    val cmd = CommandUtils invokePrivate buildLocalCommand(
+      command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
+    SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.foreach(
+      field => assert(!cmd.javaOpts.exists(_.startsWith("-D" + field)))
+    )
+    SSLOptions.SPARK_RPC_SSL_PASSWORD_ENVS.foreach(
+      env => assert(cmd.environment(env) === "password")
+    )
+  }
 }
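Taken together, the worker-side flow these tests cover looks roughly like this. This is a sketch only, not the actual launch path; the "password" value matches what the SslSampleConfigs test helper uses above:

import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}

val conf = new SparkConf()
  .set("spark.ssl.rpc.enabled", "true")
  .set(SSLOptions.SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF, "password")

val sm = new SecurityManager(conf)

// Passwords travel to child processes as environment variables...
val env = sm.getEnvironmentForSslRpcPasswords
assert(env(SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD) == "password")

// ...while -D options carrying the same secrets are scrubbed, mirroring
// the filter in CommandUtils.buildLocalCommand.
val javaOpts = Seq("-Dspark.ssl.rpc.keyStorePassword=password", "-Xmx1g")
val scrubbed = javaOpts.filterNot(opt =>
  SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists(f => opt.startsWith("-D" + f)))
assert(scrubbed == Seq("-Xmx1g"))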