From 0d64bc452fbb57bcbeb2b25439a8f1573d486b96 Mon Sep 17 00:00:00 2001
From: Hasnain Lakhani
Date: Thu, 5 Oct 2023 13:23:01 -0700
Subject: [PATCH 1/4] Add RPC SSL settings to SSLOptions and SparkTransportConf

---
 .../scala/org/apache/spark/SSLOptions.scala   | 132 +++++++++++++++++-
 .../org/apache/spark/SecurityManager.scala    |  66 ++++++++-
 .../scala/org/apache/spark/SparkConf.scala    |   4 +-
 .../spark/deploy/worker/CommandUtils.scala    |  13 +-
 .../network/netty/SparkTransportConf.scala    |  32 ++++-
 .../org/apache/spark/SSLOptionsSuite.scala    | 127 +++++++++++++++++
 .../org/apache/spark/SparkConfSuite.scala     |   6 +-
 .../deploy/worker/CommandUtilsSuite.scala     |  29 +++-
 8 files changed, 386 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index d159f5717b090..1a3f04c0b7ee2 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -19,12 +19,16 @@ package org.apache.spark
 
 import java.io.File
 import java.security.NoSuchAlgorithmException
+import java.util.HashMap
+import java.util.Map
 import javax.net.ssl.SSLContext
 
 import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.util.ssl.SslContextFactory
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ConfigProvider
+import org.apache.spark.network.util.MapConfigProvider
 
 /**
  * SSLOptions class is a common container for SSL configuration options. It offers methods to
@@ -33,34 +37,53 @@ import org.apache.spark.internal.Logging
  * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
  * by the protocol, which it can generate the configuration for.
  *
+ * @param namespace the configuration namespace
  * @param enabled enables or disables SSL; if it is set to false, the rest of the
  *                settings are disregarded
  * @param port the port where to bind the SSL server; if not defined, it will be
  *             based on the non-SSL port for the same service.
  * @param keyStore a path to the key-store file
  * @param keyStorePassword a password to access the key-store file
+ * @param privateKey a PKCS#8 private key file in PEM format
  * @param keyPassword a password to access the private key in the key-store
  * @param keyStoreType the type of the key-store
  * @param needClientAuth set true if SSL needs client authentication
+ * @param certChain an X.509 certificate chain file in PEM format
  * @param trustStore a path to the trust-store file
  * @param trustStorePassword a password to access the trust-store file
  * @param trustStoreType the type of the trust-store
+ * @param trustStoreReloadingEnabled enables or disables using a trust-store that reloads
+ *                                   its configuration when the trust-store file on disk changes
+ * @param trustStoreReloadIntervalMs the interval, in milliseconds, at which
+ *                                   the trust-store will reload its configuration
+ * @param openSslEnabled enables or disables using an OpenSSL implementation (if available),
+ *                       requires certChain and keyFile arguments
 * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
 * @param enabledAlgorithms a set of encryption algorithms that may be used
+ * @param dangerouslyFallbackIfKeysNotPresent If SSL is enabled but key files are not present,
+ *                                            fall back to unencrypted communication. This is an
+ *                                            advanced option and not recommended for normal use.
*/ private[spark] case class SSLOptions( + namespace: Option[String] = None, enabled: Boolean = false, port: Option[Int] = None, keyStore: Option[File] = None, keyStorePassword: Option[String] = None, + privateKey: Option[File] = None, keyPassword: Option[String] = None, keyStoreType: Option[String] = None, needClientAuth: Boolean = false, + certChain: Option[File] = None, trustStore: Option[File] = None, trustStorePassword: Option[String] = None, trustStoreType: Option[String] = None, + trustStoreReloadingEnabled: Boolean = false, + trustStoreReloadIntervalMs: Int = 10000, + openSslEnabled: Boolean = false, protocol: Option[String] = None, - enabledAlgorithms: Set[String] = Set.empty) + enabledAlgorithms: Set[String] = Set.empty, + dangerouslyFallbackIfKeysNotPresent: Boolean = false) extends Logging { /** @@ -134,12 +157,44 @@ private[spark] case class SSLOptions( supported } + /** + * + * @return + */ + def createConfigProvider(conf: SparkConf): ConfigProvider = { + val nsp = namespace.getOrElse("spark.ssl") + val confMap: Map[String, String] = new HashMap[String, String] + conf.getAll.foreach(tuple => confMap.put(tuple._1, tuple._2)) + confMap.put(s"$nsp.enabled", enabled.toString) + confMap.put(s"$nsp.trustStoreReloadingEnabled", trustStoreReloadingEnabled.toString) + confMap.put(s"$nsp.openSslEnabled", openSslEnabled.toString) + confMap.put(s"$nsp.trustStoreReloadIntervalMs", trustStoreReloadIntervalMs.toString) + keyStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.keyStore", _)) + keyStorePassword.foreach(confMap.put(s"$nsp.keyStorePassword", _)) + privateKey.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.privateKey", _)) + keyPassword.foreach(confMap.put(s"$nsp.keyPassword", _)) + certChain.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.certChain", _)) + trustStore.map(_.getAbsolutePath).foreach(confMap.put(s"$nsp.trustStore", _)) + trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _)) + protocol.foreach(confMap.put(s"$nsp.protocol", _)) + confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(",")) + confMap.put( + s"$nsp.dangerouslyFallbackIfKeysNotPresent", dangerouslyFallbackIfKeysNotPresent.toString) + + new MapConfigProvider(confMap) + } + /** Returns a string representation of this SSLOptions with all the passwords masked. 
 */
  override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
      s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-     s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
-     s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
-
+     s"privateKey=$privateKey, keyPassword=${keyPassword.map(_ => "xxx")}, " +
+     s"keyStoreType=$keyStoreType, needClientAuth=$needClientAuth, " +
+     s"certChain=$certChain, trustStore=$trustStore, " +
+     s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+     s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " +
+     s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " +
+     s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}, " +
+     s"dangerouslyFallbackIfKeysNotPresent=$dangerouslyFallbackIfKeysNotPresent"
 }
 
 private[spark] object SSLOptions extends Logging {
@@ -152,15 +207,25 @@
  * $ - `[ns].port` - the port where to bind the SSL server
  * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
  * $ - `[ns].keyStorePassword` - a password to the key-store file
+ * $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
  * $ - `[ns].keyPassword` - a password to the private key
  * $ - `[ns].keyStoreType` - the type of the key-store
  * $ - `[ns].needClientAuth` - whether SSL needs client authentication
+ * $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
  * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
  *       directory
  * $ - `[ns].trustStorePassword` - a password to the trust-store file
  * $ - `[ns].trustStoreType` - the type of trust-store
+ * $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
+ *       that reloads its configuration when the trust-store file on disk changes
+ * $ - `[ns].trustStoreReloadIntervalMs` - the interval, in milliseconds, at which
+ *       the trust-store will reload its configuration
+ * $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
+ *       (if available on host system), requires certChain and keyFile arguments
 * $ - `[ns].protocol` - a protocol name supported by a particular Java version
 * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+ * $ - `[ns].dangerouslyFallbackIfKeysNotPresent` - whether to fallback to unencrypted
+ *       communication if keys are not present.
* * For a list of protocols and ciphers supported by particular Java versions, you may go to * @@ -180,7 +245,15 @@ private[spark] object SSLOptions extends Logging { hadoopConf: Configuration, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { - val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) + + // RPC does not inherit the default enabled setting due to backwards compatibility reasons + val enabledDefault = if (ns == "spark.ssl.rpc") { + false + } else { + defaults.exists(_.enabled) + } + + val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = enabledDefault) if (!enabled) { return new SSLOptions() } @@ -194,15 +267,23 @@ private[spark] object SSLOptions extends Logging { val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword") .orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_))) + .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_STORE_PASSWORD)).filter(_.trim.nonEmpty)) .orElse(defaults.flatMap(_.keyStorePassword)) + val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_)) + .orElse(defaults.flatMap(_.privateKey)) + val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword") .orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_))) + .orElse(Option(conf.getenv(ENV_RPC_SSL_KEY_PASSWORD)).filter(_.trim.nonEmpty)) .orElse(defaults.flatMap(_.keyPassword)) val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType") .orElse(defaults.flatMap(_.keyStoreType)) + val certChain = conf.getOption(s"$ns.certChain").map(new File(_)) + .orElse(defaults.flatMap(_.certChain)) + val needClientAuth = conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth)) @@ -211,11 +292,21 @@ private[spark] object SSLOptions extends Logging { val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword") .orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_))) + .orElse(Option(conf.getenv(ENV_RPC_SSL_TRUST_STORE_PASSWORD)).filter(_.trim.nonEmpty)) .orElse(defaults.flatMap(_.trustStorePassword)) val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType") .orElse(defaults.flatMap(_.trustStoreType)) + val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled", + defaultValue = defaults.exists(_.trustStoreReloadingEnabled)) + + val trustStoreReloadIntervalMs = conf.getInt(s"$ns.trustStoreReloadIntervalMs", + defaultValue = defaults.map(_.trustStoreReloadIntervalMs).getOrElse(10000)) + + val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled", + defaultValue = defaults.exists(_.openSslEnabled)) + val protocol = conf.getWithSubstitution(s"$ns.protocol") .orElse(defaults.flatMap(_.protocol)) @@ -224,20 +315,49 @@ private[spark] object SSLOptions extends Logging { .orElse(defaults.map(_.enabledAlgorithms)) .getOrElse(Set.empty) + val dangerouslyFallbackIfKeysNotPresent = + conf.getBoolean(s"$ns.dangerouslyFallbackIfKeysNotPresent", + defaultValue = defaults.exists(_.dangerouslyFallbackIfKeysNotPresent)) + new SSLOptions( + Some(ns), enabled, port, keyStore, keyStorePassword, + privateKey, keyPassword, keyStoreType, needClientAuth, + certChain, trustStore, trustStorePassword, trustStoreType, + trustStoreReloadingEnabled, + trustStoreReloadIntervalMs, + openSslEnabled, protocol, - enabledAlgorithms) + enabledAlgorithms, + dangerouslyFallbackIfKeysNotPresent) } + // Config names and environment variables for propagating SSL passwords + val SPARK_RPC_SSL_KEY_PASSWORD_CONF = 
"spark.ssl.rpc.keyPassword" + val SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF = "spark.ssl.rpc.keyStorePassword" + val SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF = "spark.ssl.rpc.trustStorePassword" + val SPARK_RPC_SSL_PASSWORD_FIELDS: Seq[String] = Seq( + SPARK_RPC_SSL_KEY_PASSWORD_CONF, + SPARK_RPC_SSL_KEY_STORE_PASSWORD_CONF, + SPARK_RPC_SSL_TRUST_STORE_PASSWORD_CONF + ) + + val ENV_RPC_SSL_KEY_PASSWORD = "_SPARK_SSL_RPC_KEY_PASSWORD" + val ENV_RPC_SSL_KEY_STORE_PASSWORD = "_SPARK_SSL_RPC_KEY_STORE_PASSWORD" + val ENV_RPC_SSL_TRUST_STORE_PASSWORD = "_SPARK_SSL_RPC_TRUST_STORE_PASSWORD" + val SPARK_RPC_SSL_PASSWORD_ENVS: Seq[String] = Seq( + ENV_RPC_SSL_KEY_PASSWORD, + ENV_RPC_SSL_KEY_STORE_PASSWORD, + ENV_RPC_SSL_TRUST_STORE_PASSWORD + ) } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 7e72ae8d89e37..d63773d413949 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -85,6 +85,10 @@ private[spark] class SecurityManager( setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS)) private var secretKey: String = _ + + private val isSslRPCEnabled = sparkConf.getBoolean( + "spark.ssl.rpc.enabled", false) + logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") + "; ui acls " + (if (aclsOn) "enabled" else "disabled") + "; users with view permissions: " + @@ -94,12 +98,15 @@ private[spark] class SecurityManager( "; users with modify permissions: " + (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") + "; groups with modify permissions: " + - (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY")) + (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") + + "; RPC SSL " + (if (isSslRPCEnabled) "enabled" else "disabled")) private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) // the default SSL configuration - it will be used by all communication layers unless overwritten private val defaultSSLOptions = SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None) + // the SSL configuration for RPCs + private val rpcSSLOptions = getSSLOptions("rpc") def getSSLOptions(module: String): SSLOptions = { val opts = @@ -258,18 +265,46 @@ private[spark] class SecurityManager( isUserInACL(user, modifyAcls, modifyAclsGroups) } + /** + * Check to see if authentication is enabled and SSL for RPC is disabled (as they are mutually + * exclusive) + * @return Whether authentication is enabled and SSL for RPC is disabled + */ + private def isAuthenticationEnabledAndSslRpcDisabled(): Boolean = { + if (sparkConf.get(NETWORK_AUTH_ENABLED)) { + if (rpcSSLOptions.enabled) { + logWarning("Network auth disabled as RPC SSL encryption is enabled") + false + } else { + true + } + } else { + false + } + } + + /** * Check to see if authentication for the Spark communication protocols is enabled * @return true if authentication is enabled, otherwise false */ - def isAuthenticationEnabled(): Boolean = authOn + def isAuthenticationEnabled(): Boolean = isAuthenticationEnabledAndSslRpcDisabled /** * Checks whether network encryption should be enabled. * @return Whether to enable encryption when connecting to services that support it. 
*/ def isEncryptionEnabled(): Boolean = { - sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED) + if (sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)) { + if (rpcSSLOptions.enabled) { + logWarning("Network encryption disabled as RPC SSL encryption is enabled") + false + } else { + true + } + } else { + false + } } /** @@ -284,7 +319,7 @@ private[spark] class SecurityManager( * @return the secret key as a String if authentication is enabled, otherwise returns null */ def getSecretKey(): String = { - if (isAuthenticationEnabled) { + if (authOn) { val creds = UserGroupInformation.getCurrentUser().getCredentials() Option(creds.getSecretKey(SECRET_LOOKUP_KEY)) .map { bytes => new String(bytes, UTF_8) } @@ -318,7 +353,7 @@ private[spark] class SecurityManager( def initializeAuth(): Unit = { import SparkMasterRegex._ - if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { + if (!authOn) { return } @@ -391,6 +426,27 @@ private[spark] class SecurityManager( // Default SecurityManager only has a single secret key, so ignore appId. override def getSaslUser(appId: String): String = getSaslUser() override def getSecretKey(appId: String): String = getSecretKey() + + /** + * If the RPC SSL settings are enabled, returns a map containing the password + * values so they can be passed to executors or other subprocesses. + * + * @return Map containing environment variables to pass + */ + def getEnvironmentForSslRpcPasswords: Map[String, String] = { + if (rpcSSLOptions.enabled) { + val map = scala.collection.mutable.Map[String, String]() + rpcSSLOptions.keyPassword.foreach(password => + map += (SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> password)) + rpcSSLOptions.keyStorePassword.foreach(password => + map += (SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> password)) + rpcSSLOptions.trustStorePassword.foreach(password => + map += (SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> password)) + map.toMap + } else { + Map() + } + } } private[spark] object SecurityManager { diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 091413bb9cc92..b688604beead6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -739,6 +739,9 @@ private[spark] object SparkConf extends Logging { (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) || name.startsWith("spark.rpc") || name.startsWith("spark.network") || + // We need SSL configs to propagate as they may be needed for RPCs. + // Passwords are propagated separately though. 
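+    // For example, "spark.ssl.rpc.enabled" reaches executor startup this way, while
+    // "spark.ssl.rpc.keyStorePassword" is delivered through the environment variables
+    // listed in SSLOptions.SPARK_RPC_SSL_PASSWORD_ENVS.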
+ (name.startsWith("spark.ssl") && !name.contains("Password")) || isSparkPortConf(name) } @@ -804,5 +807,4 @@ private[spark] object SparkConf extends Logging { key: String, version: String, translation: String => String = null) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 9a4a037e35c66..d689eb02cc53f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -22,7 +22,7 @@ import java.io.{File, FileOutputStream, InputStream, IOException} import scala.collection.Map import scala.jdk.CollectionConverters._ -import org.apache.spark.SecurityManager +import org.apache.spark.{SecurityManager, SSLOptions} import org.apache.spark.deploy.Command import org.apache.spark.internal.Logging import org.apache.spark.launcher.WorkerCommandBuilder @@ -90,6 +90,8 @@ object CommandUtils extends Logging { if (securityMgr.isAuthenticationEnabled) { newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey) } + // set SSL env variables if needed + newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords Command( command.mainClass, @@ -97,8 +99,13 @@ object CommandUtils extends Logging { newEnvironment, command.classPathEntries ++ classPath, Seq.empty, // library path already captured in environment variable - // filter out auth secret from java options - command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) + // filter out secrets from java options + command.javaOpts.filterNot(opts => + opts.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF) || + SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.exists( + field => opts.startsWith("-D" + field) + ) + )) } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala index 812d57ac67cb5..54823b40552db 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala @@ -19,7 +19,7 @@ package org.apache.spark.network.netty import scala.jdk.CollectionConverters._ -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SSLOptions} import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf} /** @@ -29,6 +29,24 @@ import org.apache.spark.network.util.{ConfigProvider, NettyUtils, TransportConf} */ object SparkTransportConf { + /** + * Utility for creating a [[TransportConf]] from a [[SparkConf]]. + * @param _conf the [[SparkConf]] + * @param module the module name + * @param numUsableCores if nonzero, this will restrict the server and client threads to only + * use the given number of cores, rather than all of the machine's cores. + * This restriction will only occur if these properties are not already set. + * @param role optional role, could be driver, executor, worker and master. Default is + * [[None]], means no role specific configurations. + */ + def fromSparkConf( + _conf: SparkConf, + module: String, + numUsableCores: Int = 0, + role: Option[String] = None): TransportConf = { + SparkTransportConf.fromSparkConfWithSslOptions( + _conf, module, numUsableCores, role, sslOptions = None) + } /** * Utility for creating a [[TransportConf]] from a [[SparkConf]]. 
* @param _conf the [[SparkConf]] @@ -38,12 +56,14 @@ object SparkTransportConf { * This restriction will only occur if these properties are not already set. * @param role optional role, could be driver, executor, worker and master. Default is * [[None]], means no role specific configurations. + * @param sslOptions SSL config options */ - def fromSparkConf( + def fromSparkConfWithSslOptions( _conf: SparkConf, module: String, numUsableCores: Int = 0, - role: Option[String] = None): TransportConf = { + role: Option[String] = None, + sslOptions: Option[SSLOptions]): TransportConf = { val conf = _conf.clone // specify default thread configuration based on our JVM's allocation of cores (rather than // necessarily assuming we have all the machine's cores). @@ -57,12 +77,14 @@ object SparkTransportConf { conf.set(s"spark.$module.io.$suffix", value) } - new TransportConf(module, new ConfigProvider { + val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse({ + new ConfigProvider { override def get(name: String): String = conf.get(name) override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue) override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = { conf.getAll.toMap.asJava.entrySet() } - }) + }}) + new TransportConf(module, configProvider) } } diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala index 81bc4ae9da02f..ee6bf071ef695 100644 --- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala @@ -31,6 +31,8 @@ class SSLOptionsSuite extends SparkFunSuite { test("test resolving property file as spark conf ") { val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath + val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath // Pick two cipher suites that the provider knows about val sslContext = SSLContext.getInstance("TLSv1.2") @@ -47,8 +49,13 @@ class SSLOptionsSuite extends SparkFunSuite { conf.set("spark.ssl.keyStore", keyStorePath) conf.set("spark.ssl.keyStorePassword", "password") conf.set("spark.ssl.keyPassword", "password") + conf.set("spark.ssl.privateKey", privateKeyPath) + conf.set("spark.ssl.certChain", certChainPath) conf.set("spark.ssl.trustStore", trustStorePath) conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.trustStoreReloadingEnabled", "false") + conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000") + conf.set("spark.ssl.openSslEnabled", "false") conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(",")) conf.set("spark.ssl.protocol", "TLSv1.2") @@ -62,7 +69,16 @@ class SSLOptionsSuite extends SparkFunSuite { assert(opts.keyStore.get.getName === "keystore") assert(opts.keyStore.get.getAbsolutePath === keyStorePath) assert(opts.trustStorePassword === Some("password")) + assert(opts.trustStoreReloadingEnabled === false) + assert(opts.trustStoreReloadIntervalMs === 10000) + assert(opts.privateKey.isDefined === true) + assert(opts.privateKey.get.getName === "key.pem") + assert(opts.privateKey.get.getAbsolutePath === privateKeyPath) + assert(opts.certChain.isDefined === true) + assert(opts.certChain.get.getName === "certchain.pem") + 
assert(opts.certChain.get.getAbsolutePath === certChainPath) assert(opts.keyStorePassword === Some("password")) + assert(opts.openSslEnabled === false) assert(opts.keyPassword === Some("password")) assert(opts.protocol === Some("TLSv1.2")) assert(opts.enabledAlgorithms === algorithms) @@ -71,6 +87,8 @@ class SSLOptionsSuite extends SparkFunSuite { test("test resolving property with defaults specified ") { val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath + val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath val conf = new SparkConf val hadoopConf = new Configuration() @@ -78,8 +96,13 @@ class SSLOptionsSuite extends SparkFunSuite { conf.set("spark.ssl.keyStore", keyStorePath) conf.set("spark.ssl.keyStorePassword", "password") conf.set("spark.ssl.keyPassword", "password") + conf.set("spark.ssl.privateKey", privateKeyPath) + conf.set("spark.ssl.certChain", certChainPath) conf.set("spark.ssl.trustStore", trustStorePath) conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.trustStoreReloadingEnabled", "false") + conf.set("spark.ssl.trustStoreReloadIntervalMs", "10000") + conf.set("spark.ssl.openSslEnabled", "false") conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") conf.set("spark.ssl.protocol", "SSLv3") @@ -91,12 +114,22 @@ class SSLOptionsSuite extends SparkFunSuite { assert(opts.trustStore.isDefined) assert(opts.trustStore.get.getName === "truststore") assert(opts.trustStore.get.getAbsolutePath === trustStorePath) + assert(opts.privateKey.isDefined === true) + assert(opts.privateKey.get.getName === "key.pem") + assert(opts.privateKey.get.getAbsolutePath === privateKeyPath) + assert(opts.certChain.isDefined === true) + assert(opts.certChain.get.getName === "certchain.pem") + assert(opts.certChain.get.getAbsolutePath === certChainPath) assert(opts.keyStore.isDefined) assert(opts.keyStore.get.getName === "keystore") assert(opts.keyStore.get.getAbsolutePath === keyStorePath) assert(opts.trustStorePassword === Some("password")) assert(opts.keyStorePassword === Some("password")) assert(opts.keyPassword === Some("password")) + assert(opts.trustStorePassword === Some("password")) + assert(opts.trustStoreReloadingEnabled === false) + assert(opts.trustStoreReloadIntervalMs === 10000) + assert(opts.openSslEnabled === false) assert(opts.protocol === Some("SSLv3")) assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) @@ -105,6 +138,8 @@ class SSLOptionsSuite extends SparkFunSuite { test("test whether defaults can be overridden ") { val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath + val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath val conf = new SparkConf val hadoopConf = new Configuration() @@ -115,8 +150,13 @@ class SSLOptionsSuite extends SparkFunSuite { conf.set("spark.ssl.keyStorePassword", "password") conf.set("spark.ssl.ui.keyStorePassword", "12345") conf.set("spark.ssl.keyPassword", "password") + conf.set("spark.ssl.privateKey", 
privateKeyPath) + conf.set("spark.ssl.certChain", certChainPath) conf.set("spark.ssl.trustStore", trustStorePath) conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.ui.trustStoreReloadingEnabled", "true") + conf.set("spark.ssl.ui.trustStoreReloadIntervalMs", "20000") + conf.set("spark.ssl.ui.openSslEnabled", "true") conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") conf.set("spark.ssl.ui.enabledAlgorithms", "ABC, DEF") @@ -130,16 +170,88 @@ class SSLOptionsSuite extends SparkFunSuite { assert(opts.trustStore.isDefined) assert(opts.trustStore.get.getName === "truststore") assert(opts.trustStore.get.getAbsolutePath === trustStorePath) + assert(opts.privateKey.isDefined === true) + assert(opts.privateKey.get.getName === "key.pem") + assert(opts.privateKey.get.getAbsolutePath === privateKeyPath) + assert(opts.certChain.isDefined === true) + assert(opts.certChain.get.getName === "certchain.pem") + assert(opts.certChain.get.getAbsolutePath === certChainPath) + assert(opts.keyStore.isDefined) + assert(opts.keyStore.get.getName === "keystore") + assert(opts.keyStore.get.getAbsolutePath === keyStorePath) + assert(opts.trustStorePassword === Some("password")) + assert(opts.keyStorePassword === Some("12345")) + assert(opts.keyPassword === Some("password")) + assert(opts.trustStoreReloadingEnabled === true) + assert(opts.trustStoreReloadIntervalMs === 20000) + assert(opts.openSslEnabled === true) + assert(opts.protocol === Some("SSLv3")) + assert(opts.enabledAlgorithms === Set("ABC", "DEF")) + } + + test("ensure RPC settings don't get enabled via inheritance ") { + val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath + val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath + val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath + + val conf = new SparkConf + val hadoopConf = new Configuration() + conf.set("spark.ssl.enabled", "true") + conf.set("spark.ssl.rpc.port", "4242") + conf.set("spark.ssl.keyStore", keyStorePath) + conf.set("spark.ssl.keyStorePassword", "password") + conf.set("spark.ssl.rpc.keyStorePassword", "12345") + conf.set("spark.ssl.keyPassword", "password") + conf.set("spark.ssl.privateKey", privateKeyPath) + conf.set("spark.ssl.certChain", certChainPath) + conf.set("spark.ssl.trustStore", trustStorePath) + conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.rpc.trustStoreReloadingEnabled", "true") + conf.set("spark.ssl.rpc.trustStoreReloadIntervalMs", "20000") + conf.set("spark.ssl.rpc.openSslEnabled", "true") + conf.set("spark.ssl.enabledAlgorithms", + "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") + conf.set("spark.ssl.rpc.enabledAlgorithms", "ABC, DEF") + conf.set("spark.ssl.protocol", "SSLv3") + + val disabledDefaults = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None) + val disabledOpts = SSLOptions.parse( + conf, hadoopConf, "spark.ssl.rpc", defaults = Some(disabledDefaults)) + + assert(disabledOpts.enabled === false) + assert(disabledOpts.port.isEmpty) + + // Now enable it and test again + conf.set("spark.ssl.rpc.enabled", "true") + val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None) + val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaultOpts)) + + assert(opts.enabled === true) + 
assert(opts.port === Some(4242)) + assert(opts.trustStore.isDefined) + assert(opts.trustStore.get.getName === "truststore") + assert(opts.trustStore.get.getAbsolutePath === trustStorePath) + assert(opts.privateKey.isDefined === true) + assert(opts.privateKey.get.getName === "key.pem") + assert(opts.privateKey.get.getAbsolutePath === privateKeyPath) + assert(opts.certChain.isDefined === true) + assert(opts.certChain.get.getName === "certchain.pem") + assert(opts.certChain.get.getAbsolutePath === certChainPath) assert(opts.keyStore.isDefined) assert(opts.keyStore.get.getName === "keystore") assert(opts.keyStore.get.getAbsolutePath === keyStorePath) assert(opts.trustStorePassword === Some("password")) assert(opts.keyStorePassword === Some("12345")) assert(opts.keyPassword === Some("password")) + assert(opts.trustStoreReloadingEnabled === true) + assert(opts.trustStoreReloadIntervalMs === 20000) + assert(opts.openSslEnabled === true) assert(opts.protocol === Some("SSLv3")) assert(opts.enabledAlgorithms === Set("ABC", "DEF")) } + test("SPARK-41719: Skip ssl sub-settings if ssl is disabled") { val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath val conf = new SparkConf @@ -169,6 +281,21 @@ class SSLOptionsSuite extends SparkFunSuite { assert(opts.trustStore === Some(new File("val2"))) } + test("get passwords from environment") { + val conf = new SparkConfWithEnv(Map( + SSLOptions.ENV_RPC_SSL_KEY_PASSWORD -> "val1", + SSLOptions.ENV_RPC_SSL_KEY_STORE_PASSWORD -> "val2", + SSLOptions.ENV_RPC_SSL_TRUST_STORE_PASSWORD -> "val3")) + val hadoopConf = new Configuration() + + conf.set("spark.ssl.enabled", "true") + + val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None) + assert(opts.keyPassword === Some("val1")) + assert(opts.keyStorePassword === Some("val2")) + assert(opts.trustStorePassword === Some("val3")) + } + test("get password from Hadoop credential provider") { val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index a2d41b92e0849..5f7958167507d 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -341,7 +341,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } } - test("SPARK-26998: SSL configuration not needed on executors") { + test("SPARK-26998: SSL passwords not needed on executors") { val conf = new SparkConf(false) conf.set("spark.ssl.enabled", "true") conf.set("spark.ssl.keyPassword", "password") @@ -349,7 +349,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set("spark.ssl.trustStorePassword", "password") val filtered = conf.getAll.filter { case (k, _) => SparkConf.isExecutorStartupConf(k) } - assert(filtered.isEmpty) + // Only the enabled flag should propagate + assert(filtered.length == 1) + assert(filtered(0)._1 == "spark.ssl.enabled") } test("SPARK-27244 toDebugString redacts sensitive information") { diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala index 28e35bc8183ba..fc67de0f32209 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala @@ -21,8 +21,9 @@ import org.scalatest.PrivateMethodTester import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite, SSLOptions} import org.apache.spark.deploy.Command +import org.apache.spark.network.ssl.SslSampleConfigs import org.apache.spark.util.Utils class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTester { @@ -68,4 +69,30 @@ class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTe assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) assert(cmd.environment(SecurityManager.ENV_AUTH_SECRET) === secret) } + + test("SSL RPC passwords shouldn't appear in java opts") { + val buildLocalCommand = PrivateMethod[Command](Symbol("buildLocalCommand")) + val conf = new SparkConf + conf.set("spark.ssl.rpc.enabled", "true") + + // This sets passwords + val updatedConfigs = SslSampleConfigs.createDefaultConfigMapForRpcNamespace() + updatedConfigs.entrySet().forEach(entry => conf.set(entry.getKey, entry.getValue)) + + val secret = "This is the secret sauce" + val command = Command("mainClass", Seq(), Map(), Seq(), Seq("lib"), + SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.map( + field => "-D" + field + "=" + secret + )) + + conf.set(SecurityManager.SPARK_AUTH_CONF, "true") + val cmd = CommandUtils invokePrivate buildLocalCommand( + command, new SecurityManager(conf), (t: String) => t, Seq(), Map()) + SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.foreach( + field => assert(!cmd.javaOpts.exists(_.startsWith("-D" + field))) + ) + SSLOptions.SPARK_RPC_SSL_PASSWORD_ENVS.foreach( + env => assert(cmd.environment(env) === "password") + ) + } } From 91ba3a8b01554a217160817bd34e97ed8ce14491 Mon Sep 17 00:00:00 2001 From: Hasnain Lakhani Date: Tue, 10 Oct 2023 22:28:49 -0700 Subject: [PATCH 2/4] address feedback --- .../scala/org/apache/spark/SSLOptions.scala | 24 ++----------- .../org/apache/spark/SecurityManager.scala | 35 ++++++------------ .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../network/netty/SparkTransportConf.scala | 36 +++++-------------- .../deploy/worker/CommandUtilsSuite.scala | 1 - 5 files changed, 24 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 1a3f04c0b7ee2..51b6b4445ea0a 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -60,9 +60,6 @@ import org.apache.spark.network.util.MapConfigProvider * requires certChain and keyFile arguments * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java * @param enabledAlgorithms a set of encryption algorithms that may be used - * @param dangerouslyFallbackIfKeysNotPresent If SSL is enabled but key files are not present, - * fall back to unencrypted communication. This is an - * advanced option and not recommended for normal use. 
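+ *
+ * A minimal construction sketch (values here are illustrative, not defaults; in
+ * practice instances are usually built via [[SSLOptions.parse]]):
+ * {{{
+ *   val rpcSslOptions = SSLOptions(
+ *     namespace = Some("spark.ssl.rpc"),
+ *     enabled = true,
+ *     keyStore = Some(new java.io.File("/path/to/keystore")),
+ *     keyStorePassword = Some("password"))
+ * }}}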
*/ private[spark] case class SSLOptions( namespace: Option[String] = None, @@ -82,8 +79,7 @@ private[spark] case class SSLOptions( trustStoreReloadIntervalMs: Int = 10000, openSslEnabled: Boolean = false, protocol: Option[String] = None, - enabledAlgorithms: Set[String] = Set.empty, - dangerouslyFallbackIfKeysNotPresent: Boolean = false) + enabledAlgorithms: Set[String] = Set.empty) extends Logging { /** @@ -157,10 +153,6 @@ private[spark] case class SSLOptions( supported } - /** - * - * @return - */ def createConfigProvider(conf: SparkConf): ConfigProvider = { val nsp = namespace.getOrElse("spark.ssl") val confMap: Map[String, String] = new HashMap[String, String] @@ -178,8 +170,6 @@ private[spark] case class SSLOptions( trustStorePassword.foreach(confMap.put(s"$nsp.trustStorePassword", _)) protocol.foreach(confMap.put(s"$nsp.protocol", _)) confMap.put(s"$nsp.enabledAlgorithms", enabledAlgorithms.mkString(",")) - confMap.put( - s"$nsp.dangerouslyFallbackIfKeysNotPresent", dangerouslyFallbackIfKeysNotPresent.toString) new MapConfigProvider(confMap) } @@ -193,8 +183,7 @@ private[spark] case class SSLOptions( s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " + s"trustStoreReloadIntervalMs=$trustStoreReloadIntervalMs, " + s"trustStoreReloadingEnabled=$trustStoreReloadingEnabled, openSSLEnabled=$openSslEnabled, " + - s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}, " + - s"dangerouslyFallbackIfKeysNotPresent=$dangerouslyFallbackIfKeysNotPresent" + s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}" } private[spark] object SSLOptions extends Logging { @@ -224,8 +213,6 @@ private[spark] object SSLOptions extends Logging { * (if available on host system), requires certChain and keyFile arguments * $ - `[ns].protocol` - a protocol name supported by a particular Java version * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers - * $ - `[ns].dangerouslyFallbackIfKeysNotPresent` - whether to fallback to unencrypted - * communication if keys are not present. 
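+ *
+ * For example, to enable SSL only for the RPC module (paths and passwords are
+ * illustrative, not defaults):
+ * {{{
+ *   spark.ssl.rpc.enabled           true
+ *   spark.ssl.rpc.keyStore          /path/to/keystore
+ *   spark.ssl.rpc.keyStorePassword  password
+ *   spark.ssl.rpc.privateKey        /path/to/key.pem
+ *   spark.ssl.rpc.certChain         /path/to/certchain.pem
+ * }}}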
* * For a list of protocols and ciphers supported by particular Java versions, you may go to * @@ -315,10 +302,6 @@ private[spark] object SSLOptions extends Logging { .orElse(defaults.map(_.enabledAlgorithms)) .getOrElse(Set.empty) - val dangerouslyFallbackIfKeysNotPresent = - conf.getBoolean(s"$ns.dangerouslyFallbackIfKeysNotPresent", - defaultValue = defaults.exists(_.dangerouslyFallbackIfKeysNotPresent)) - new SSLOptions( Some(ns), enabled, @@ -337,8 +320,7 @@ private[spark] object SSLOptions extends Logging { trustStoreReloadIntervalMs, openSslEnabled, protocol, - enabledAlgorithms, - dangerouslyFallbackIfKeysNotPresent) + enabledAlgorithms) } // Config names and environment variables for propagating SSL passwords diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index d63773d413949..ba5396b4de0dc 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -86,7 +86,7 @@ private[spark] class SecurityManager( private var secretKey: String = _ - private val isSslRPCEnabled = sparkConf.getBoolean( + private val sslRpcEnabled = sparkConf.getBoolean( "spark.ssl.rpc.enabled", false) logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") + @@ -99,7 +99,7 @@ private[spark] class SecurityManager( (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") + "; groups with modify permissions: " + (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") + - "; RPC SSL " + (if (isSslRPCEnabled) "enabled" else "disabled")) + "; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled")) private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) // the default SSL configuration - it will be used by all communication layers unless overwritten @@ -265,30 +265,11 @@ private[spark] class SecurityManager( isUserInACL(user, modifyAcls, modifyAclsGroups) } - /** - * Check to see if authentication is enabled and SSL for RPC is disabled (as they are mutually - * exclusive) - * @return Whether authentication is enabled and SSL for RPC is disabled - */ - private def isAuthenticationEnabledAndSslRpcDisabled(): Boolean = { - if (sparkConf.get(NETWORK_AUTH_ENABLED)) { - if (rpcSSLOptions.enabled) { - logWarning("Network auth disabled as RPC SSL encryption is enabled") - false - } else { - true - } - } else { - false - } - } - - /** * Check to see if authentication for the Spark communication protocols is enabled * @return true if authentication is enabled, otherwise false */ - def isAuthenticationEnabled(): Boolean = isAuthenticationEnabledAndSslRpcDisabled + def isAuthenticationEnabled(): Boolean = authOn /** * Checks whether network encryption should be enabled. @@ -307,6 +288,12 @@ private[spark] class SecurityManager( } } + /** + * Checks whether RPC SSL is enabled or not + * @return Whether RPC SSL is enabled or not + */ + def isSslRpcEnabled(): Boolean = sslRpcEnabled + /** * Gets the user used for authenticating SASL connections. * For now use a single hardcoded user. 
@@ -319,7 +306,7 @@ private[spark] class SecurityManager( * @return the secret key as a String if authentication is enabled, otherwise returns null */ def getSecretKey(): String = { - if (authOn) { + if (isAuthenticationEnabled) { val creds = UserGroupInformation.getCurrentUser().getCredentials() Option(creds.getSecretKey(SECRET_LOOKUP_KEY)) .map { bytes => new String(bytes, UTF_8) } @@ -353,7 +340,7 @@ private[spark] class SecurityManager( def initializeAuth(): Unit = { import SparkMasterRegex._ - if (!authOn) { + if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { return } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a4f05c7618919..310dc8284401e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -308,7 +308,7 @@ object SparkEnv extends Logging { } ioEncryptionKey.foreach { _ => - if (!securityManager.isEncryptionEnabled()) { + if (!(securityManager.isEncryptionEnabled() || securityManager.isSslRpcEnabled())) { logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " + "wire.") } diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala index 54823b40552db..f97b6668aae5d 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala @@ -38,32 +38,14 @@ object SparkTransportConf { * This restriction will only occur if these properties are not already set. * @param role optional role, could be driver, executor, worker and master. Default is * [[None]], means no role specific configurations. - */ - def fromSparkConf( - _conf: SparkConf, - module: String, - numUsableCores: Int = 0, - role: Option[String] = None): TransportConf = { - SparkTransportConf.fromSparkConfWithSslOptions( - _conf, module, numUsableCores, role, sslOptions = None) - } - /** - * Utility for creating a [[TransportConf]] from a [[SparkConf]]. - * @param _conf the [[SparkConf]] - * @param module the module name - * @param numUsableCores if nonzero, this will restrict the server and client threads to only - * use the given number of cores, rather than all of the machine's cores. - * This restriction will only occur if these properties are not already set. - * @param role optional role, could be driver, executor, worker and master. Default is - * [[None]], means no role specific configurations. * @param sslOptions SSL config options */ - def fromSparkConfWithSslOptions( + def fromSparkConf( _conf: SparkConf, module: String, numUsableCores: Int = 0, role: Option[String] = None, - sslOptions: Option[SSLOptions]): TransportConf = { + sslOptions: Option[SSLOptions] = None): TransportConf = { val conf = _conf.clone // specify default thread configuration based on our JVM's allocation of cores (rather than // necessarily assuming we have all the machine's cores). 
@@ -77,14 +59,14 @@ object SparkTransportConf { conf.set(s"spark.$module.io.$suffix", value) } - val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse({ + val configProvider = sslOptions.map(_.createConfigProvider(conf)).getOrElse( new ConfigProvider { - override def get(name: String): String = conf.get(name) - override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue) - override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = { - conf.getAll.toMap.asJava.entrySet() - } - }}) + override def get(name: String): String = conf.get(name) + override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue) + override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = { + conf.getAll.toMap.asJava.entrySet() + } + }) new TransportConf(module, configProvider) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala index fc67de0f32209..e864b609d0e48 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala @@ -85,7 +85,6 @@ class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTe field => "-D" + field + "=" + secret )) - conf.set(SecurityManager.SPARK_AUTH_CONF, "true") val cmd = CommandUtils invokePrivate buildLocalCommand( command, new SecurityManager(conf), (t: String) => t, Seq(), Map()) SSLOptions.SPARK_RPC_SSL_PASSWORD_FIELDS.foreach( From 1b0074827d29a1ffc6cb5778821102790969653b Mon Sep 17 00:00:00 2001 From: Hasnain Lakhani Date: Wed, 11 Oct 2023 09:09:37 -0700 Subject: [PATCH 3/4] fix mima --- project/MimaExcludes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 691425cfc5d6a..47fd7881d2f7a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,7 +41,9 @@ object MimaExcludes { // [SPARK-44705][PYTHON] Make PythonRunner single-threaded ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.python.BasePythonRunner#ReaderIterator.this"), // [SPARK-44198][CORE] Support propagation of the log level to the executors - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$") + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$"), + // [SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTransportConf + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf") ) // Default exclude rules From fd0801218a8c99c46f9c29ca5292a379483022ea Mon Sep 17 00:00:00 2001 From: Hasnain Lakhani Date: Thu, 12 Oct 2023 15:15:31 -0700 Subject: [PATCH 4/4] comments --- .../scala/org/apache/spark/SecurityManager.scala | 14 ++++++-------- .../spark/network/netty/SparkTransportConf.scala | 6 +++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index ba5396b4de0dc..821577f089aa3 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -276,15 +276,13 @@ private[spark] class SecurityManager( * @return 
Whether to enable encryption when connecting to services that support it. */ def isEncryptionEnabled(): Boolean = { - if (sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)) { - if (rpcSSLOptions.enabled) { - logWarning("Network encryption disabled as RPC SSL encryption is enabled") - false - } else { - true - } - } else { + val encryptionEnabled = sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || + sparkConf.get(SASL_ENCRYPTION_ENABLED) + if (encryptionEnabled && sslRpcEnabled) { + logWarning("Network encryption disabled as RPC SSL encryption is enabled") false + } else { + encryptionEnabled } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala index f97b6668aae5d..fc9e5201cfb3d 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala @@ -31,13 +31,13 @@ object SparkTransportConf { /** * Utility for creating a [[TransportConf]] from a [[SparkConf]]. - * @param _conf the [[SparkConf]] - * @param module the module name + * @param _conf the [[SparkConf]] + * @param module the module name * @param numUsableCores if nonzero, this will restrict the server and client threads to only * use the given number of cores, rather than all of the machine's cores. * This restriction will only occur if these properties are not already set. * @param role optional role, could be driver, executor, worker and master. Default is - * [[None]], means no role specific configurations. + * [[None]], means no role specific configurations. * @param sslOptions SSL config options */ def fromSparkConf(