From f1e9dc2a4b31f597f7b72e6eda137e990c7b3980 Mon Sep 17 00:00:00 2001 From: Hasnain Lakhani Date: Thu, 5 Oct 2023 11:40:38 -0500 Subject: [PATCH] [SPARK-45408][CORE] Add RPC SSL settings to TransportConf ### What changes were proposed in this pull request? This change adds new settings to `TransportConf` which are needed for the RPC SSL functionality to work. Additionally, add some sample configurations which are used by tests in follow up PRs (see https://github.com/apache/spark/pull/42685 for the full context) ### Why are the changes needed? These changes are needed so that other modules can easily access configurations, and that the sample configurations are easily accessible for tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test, then ran: ``` ./build/sbt > project network-common > testOnly org.apache.spark.network.TransportConfSuite ``` There are more follow up tests coming (see https://github.com/apache/spark/pull/42685) ### Was this patch authored or co-authored using generative AI tooling? No Closes #43220 from hasnain-db/spark-tls-configs-low. Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> --- .../spark/network/util/TransportConf.java | 152 +++++++++++ .../src/test/java/TransportConfSuite.java | 88 +++++++ .../spark/network/ssl/SslSampleConfigs.java | 235 ++++++++++++++++++ 3 files changed, 475 insertions(+) create mode 100644 common/network-common/src/test/java/TransportConfSuite.java create mode 100644 common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index b8d8f6b85a468..3ebb38e310fba 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -17,6 +17,7 @@ package org.apache.spark.network.util; +import java.io.File; import java.util.Locale; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -257,6 +258,157 @@ public int sslShuffleChunkSize() { conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k"))); } + /** + * Whether Secure (SSL/TLS) RPC (including Block Transfer Service) is enabled + */ + public boolean sslRpcEnabled() { + return conf.getBoolean("spark.ssl.rpc.enabled", false); + } + + /** + * SSL protocol (remember that SSLv3 was compromised) supported by Java + */ + public String sslRpcProtocol() { + return conf.get("spark.ssl.rpc.protocol", null); + } + + /** + * A comma separated list of ciphers + */ + public String[] sslRpcRequestedCiphers() { + String ciphers = conf.get("spark.ssl.rpc.enabledAlgorithms", null); + return (ciphers != null ? ciphers.split(",") : null); + } + + /** + * The key-store file; can be relative to the current directory + */ + public File sslRpcKeyStore() { + String keyStore = conf.get("spark.ssl.rpc.keyStore", null); + if (keyStore != null) { + return new File(keyStore); + } else { + return null; + } + } + + /** + * The password to the key-store file + */ + public String sslRpcKeyStorePassword() { + return conf.get("spark.ssl.rpc.keyStorePassword", null); + } + + /** + * A PKCS#8 private key file in PEM format; can be relative to the current directory + */ + public File sslRpcPrivateKey() { + String privateKey = conf.get("spark.ssl.rpc.privateKey", null); + if (privateKey != null) { + return new File(privateKey); + } else { + return null; + } + } + + /** + * The password to the private key + */ + public String sslRpcKeyPassword() { + return conf.get("spark.ssl.rpc.keyPassword", null); + } + + /** + * A X.509 certificate chain file in PEM format; can be relative to the current directory + */ + public File sslRpcCertChain() { + String certChain = conf.get("spark.ssl.rpc.certChain", null); + if (certChain != null) { + return new File(certChain); + } else { + return null; + } + } + + /** + * The trust-store file; can be relative to the current directory + */ + public File sslRpcTrustStore() { + String trustStore = conf.get("spark.ssl.rpc.trustStore", null); + if (trustStore != null) { + return new File(trustStore); + } else { + return null; + } + } + + /** + * The password to the trust-store file + */ + public String sslRpcTrustStorePassword() { + return conf.get("spark.ssl.rpc.trustStorePassword", null); + } + + /** + * If using a trust-store that that reloads its configuration is enabled. + * If true, when the trust-store file on disk changes, it will be reloaded + */ + public boolean sslRpcTrustStoreReloadingEnabled() { + return conf.getBoolean("spark.ssl.rpc.trustStoreReloadingEnabled", false); + } + + /** + * The interval, in milliseconds, the trust-store will reload its configuration + */ + public int sslRpctrustStoreReloadIntervalMs() { + return conf.getInt("spark.ssl.rpc.trustStoreReloadIntervalMs", 10000); + } + + /** + * If the OpenSSL implementation is enabled, + * (if available on host system), requires certChain and keyFile arguments + */ + public boolean sslRpcOpenSslEnabled() { + return conf.getBoolean("spark.ssl.rpc.openSslEnabled", false); + } + + /** + * + * @return true if and only if RPC encryption is enabled and the relevant keys exist + */ + public boolean sslRpcEnabledAndKeysAreValid() { + if (!sslRpcEnabled()) { + return false; + } + if (sslRpcOpenSslEnabled()) { + // OpenSSL requires both the privateKey and certChain + File privateKey = sslRpcPrivateKey(); + if (privateKey == null || !privateKey.exists()) { + return false; + } + File certChain = sslRpcCertChain(); + if (certChain == null || !certChain.exists()) { + return false; + } + return true; + } else { + File keyStore = sslRpcKeyStore(); + if (keyStore == null || !keyStore.exists()) { + return false; + } + // It's fine for the trust store to be missing, we would default to trusting all. + return true; + } + } + + /** + * If we can dangerously fallback to unencrypted connections if RPC over SSL is enabled + * but the key files are not present + */ + public boolean sslRpcDangerouslyFallbackIfKeysNotPresent() { + return conf.getBoolean("spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent", false); + } + /** * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching diff --git a/common/network-common/src/test/java/TransportConfSuite.java b/common/network-common/src/test/java/TransportConfSuite.java new file mode 100644 index 0000000000000..1537f67e98d15 --- /dev/null +++ b/common/network-common/src/test/java/TransportConfSuite.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network; + +import java.io.File; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.spark.network.util.TransportConf; +import org.apache.spark.network.ssl.SslSampleConfigs; + +public class TransportConfSuite { + + private TransportConf transportConf = + new TransportConf( + "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace()); + + @Test + public void testKeyStorePath() { + assertEquals(new File(SslSampleConfigs.keyStorePath), transportConf.sslRpcKeyStore()); + } + + @Test + public void testPrivateKeyPath() { + assertEquals(new File(SslSampleConfigs.privateKeyPath), transportConf.sslRpcPrivateKey()); + } + + @Test + public void testCertChainPath() { + assertEquals(new File(SslSampleConfigs.certChainPath), transportConf.sslRpcCertChain()); + } + + @Test + public void testTrustStorePath() { + assertEquals(new File(SslSampleConfigs.trustStorePath), transportConf.sslRpcTrustStore()); + } + + @Test + public void testTrustStoreReloadingEnabled() { + assertFalse(transportConf.sslRpcTrustStoreReloadingEnabled()); + } + + @Test + public void testOpenSslEnabled() { + assertFalse(transportConf.sslRpcOpenSslEnabled()); + } + + @Test + public void testSslRpcEnabled() { + assertTrue(transportConf.sslRpcEnabled()); + } + + + @Test + public void testSslKeyStorePassword() { + assertEquals("password", transportConf.sslRpcKeyStorePassword()); + } + + @Test + public void testSslKeyPassword() { + assertEquals("password", transportConf.sslRpcKeyPassword()); + } + + @Test + public void testSslTrustStorePassword() { + assertEquals("password", transportConf.sslRpcTrustStorePassword()); + } + + @Test + public void testSsltrustStoreReloadIntervalMs() { + assertEquals(10000, transportConf.sslRpctrustStoreReloadIntervalMs()); + } +} diff --git a/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java new file mode 100644 index 0000000000000..3c81b0af3186c --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.ssl; + +import javax.security.auth.x500.X500Principal; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.security.GeneralSecurityException; +import java.security.InvalidKeyException; +import java.security.Key; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.SignatureException; +import java.security.cert.Certificate; +import java.security.cert.CertificateEncodingException; +import java.security.cert.X509Certificate; +import java.util.*; + +import org.bouncycastle.x509.X509V1CertificateGenerator; + +import org.apache.spark.network.util.ConfigProvider; +import org.apache.spark.network.util.MapConfigProvider; + + +/** + * + */ +public class SslSampleConfigs { + + public static final String keyStorePath = getAbsolutePath("/keystore"); + public static final String privateKeyPath = getAbsolutePath("/key.pem"); + public static final String certChainPath = getAbsolutePath("/certchain.pem"); + public static final String untrustedKeyStorePath = getAbsolutePath("/untrusted-keystore"); + public static final String trustStorePath = getAbsolutePath("/truststore"); + + + /** + * Creates a config map containing the settings needed to enable the RPC SSL feature + * All the settings (except the enabled one) are intentionally set on the parent namespace + * so that we can verify settings inheritance works + */ + public static Map createDefaultConfigMap() { + Map confMap = new HashMap(); + confMap.put("spark.ssl.rpc.enabled", "true"); + // Need this so the other settings get parsed + confMap.put("spark.ssl.enabled", "true"); + confMap.put("spark.ssl.trustStoreReloadingEnabled", "false"); + confMap.put("spark.ssl.openSslEnabled", "false"); + confMap.put("spark.ssl.trustStoreReloadIntervalMs", "10000"); + confMap.put("spark.ssl.keyStore", SslSampleConfigs.keyStorePath); + confMap.put("spark.ssl.keyStorePassword", "password"); + confMap.put("spark.ssl.privateKey", SslSampleConfigs.privateKeyPath); + confMap.put("spark.ssl.keyPassword", "password"); + confMap.put("spark.ssl.certChain", SslSampleConfigs.certChainPath); + confMap.put("spark.ssl.trustStore", SslSampleConfigs.trustStorePath); + confMap.put("spark.ssl.trustStorePassword", "password"); + return confMap; + } + + /** + * Similar to the above, but sets the settings directly in the spark.ssl.rpc namespace + * This is needed for testing in the lower level modules (like network-common) where inheritance + * does not work as there is no access to SSLOptions. + */ + public static Map createDefaultConfigMapForRpcNamespace() { + Map confMap = new HashMap(); + confMap.put("spark.ssl.rpc.enabled", "true"); + confMap.put("spark.ssl.rpc.trustStoreReloadingEnabled", "false"); + confMap.put("spark.ssl.rpc.openSslEnabled", "false"); + confMap.put("spark.ssl.rpc.trustStoreReloadIntervalMs", "10000"); + confMap.put("spark.ssl.rpc.keyStore", SslSampleConfigs.keyStorePath); + confMap.put("spark.ssl.rpc.keyStorePassword", "password"); + confMap.put("spark.ssl.rpc.privateKey", SslSampleConfigs.privateKeyPath); + confMap.put("spark.ssl.rpc.keyPassword", "password"); + confMap.put("spark.ssl.rpc.certChain", SslSampleConfigs.certChainPath); + confMap.put("spark.ssl.rpc.trustStore", SslSampleConfigs.trustStorePath); + confMap.put("spark.ssl.rpc.trustStorePassword", "password"); + return confMap; + } + + /** + * Create ConfigProvider based on the method above + */ + public static ConfigProvider createDefaultConfigProviderForRpcNamespace() { + return new MapConfigProvider(createDefaultConfigMapForRpcNamespace()); + } + + /** + * Create ConfigProvider based on the method above + */ + public static ConfigProvider createDefaultConfigProviderForRpcNamespaceWithAdditionalEntries( + Map entries) { + Map confMap = createDefaultConfigMapForRpcNamespace(); + confMap.putAll(entries); + return new MapConfigProvider(confMap); + } + + public static void createTrustStore( + File trustStore, String password, String alias, Certificate cert) + throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setCertificateEntry(alias, cert); + saveKeyStore(ks, trustStore, password); + } + + /** + * Creates a keystore with multiple keys and saves it to a file. + */ + public static void createTrustStore( + File trustStore, String password, Map certs) + throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + for (Map.Entry cert : certs.entrySet()) { + ks.setCertificateEntry(cert.getKey(), cert.getValue()); + } + saveKeyStore(ks, trustStore, password); + } + + /** + * Create a self-signed X.509 Certificate. + * + * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB" + * @param pair the KeyPair + * @param days how many days from now the Certificate is valid for + * @param algorithm the signing algorithm, eg "SHA1withRSA" + * @return the self-signed certificate + */ + @SuppressWarnings("deprecation") + public static X509Certificate generateCertificate( + String dn, KeyPair pair, int days, String algorithm) + throws CertificateEncodingException, InvalidKeyException, IllegalStateException, + NoSuchAlgorithmException, SignatureException { + + Date from = new Date(); + Date to = new Date(from.getTime() + days * 86400000L); + BigInteger sn = new BigInteger(64, new SecureRandom()); + KeyPair keyPair = pair; + X509V1CertificateGenerator certGen = new X509V1CertificateGenerator(); + X500Principal dnName = new X500Principal(dn); + + certGen.setSerialNumber(sn); + certGen.setIssuerDN(dnName); + certGen.setNotBefore(from); + certGen.setNotAfter(to); + certGen.setSubjectDN(dnName); + certGen.setPublicKey(keyPair.getPublic()); + certGen.setSignatureAlgorithm(algorithm); + + X509Certificate cert = certGen.generate(pair.getPrivate()); + return cert; + } + + public static KeyPair generateKeyPair(String algorithm) + throws NoSuchAlgorithmException { + KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm); + keyGen.initialize(1024); + return keyGen.genKeyPair(); + } + + /** + * Creates a keystore with a single key and saves it to a file. + * + * @param keyStore File keystore to save + * @param password String store password to set on keystore + * @param keyPassword String key password to set on key + * @param alias String alias to use for the key + * @param privateKey Key to save in keystore + * @param cert Certificate to use as certificate chain associated to key + * @throws GeneralSecurityException for any error with the security APIs + * @throws IOException if there is an I/O error saving the file + */ + public static void createKeyStore( + File keyStore, String password, String keyPassword, + String alias, Key privateKey, Certificate cert) + throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(), + new Certificate[]{cert}); + saveKeyStore(ks, keyStore, password); + } + + public static void createKeyStore( + File keyStore, String password, + String alias, Key privateKey, Certificate cert) + throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setKeyEntry(alias, privateKey, password.toCharArray(), new Certificate[]{cert}); + saveKeyStore(ks, keyStore, password); + } + + private static KeyStore createEmptyKeyStore() + throws GeneralSecurityException, IOException { + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, null); // initialize + return ks; + } + + private static void saveKeyStore( + KeyStore ks, File keyStore, String password) + throws GeneralSecurityException, IOException { + FileOutputStream out = new FileOutputStream(keyStore); + try { + ks.store(out, password.toCharArray()); + } finally { + out.close(); + } + } + + public static String getAbsolutePath(String path) { + try { + return new File(SslSampleConfigs.class.getResource(path).getFile()).getCanonicalPath(); + } catch (IOException e) { + throw new RuntimeException("Failed to resolve path " + path, e); + } + } +}