Skip to content

Commit

Permalink
[SPARK-45408][CORE] Add RPC SSL settings to TransportConf
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This change adds new settings to `TransportConf` which are needed for the RPC SSL functionality to work. Additionally, add some sample configurations which are used by tests in follow up PRs (see #42685 for the full context)

### Why are the changes needed?

These changes are needed so that other modules can easily access configurations, and that the sample configurations are easily accessible for tests.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test, then ran:

```
./build/sbt
> project network-common
> testOnly org.apache.spark.network.TransportConfSuite
```

There are more follow up tests coming (see #42685)

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43220 from hasnain-db/spark-tls-configs-low.

Authored-by: Hasnain Lakhani <hasnain.lakhani@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
hasnain-db authored and Mridul Muralidharan committed Oct 5, 2023
1 parent e8eb245 commit f1e9dc2
Show file tree
Hide file tree
Showing 3 changed files with 475 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.util;

import java.io.File;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -257,6 +258,157 @@ public int sslShuffleChunkSize() {
conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k")));
}

/**
* Whether Secure (SSL/TLS) RPC (including Block Transfer Service) is enabled
*/
public boolean sslRpcEnabled() {
return conf.getBoolean("spark.ssl.rpc.enabled", false);
}

/**
* SSL protocol (remember that SSLv3 was compromised) supported by Java
*/
public String sslRpcProtocol() {
return conf.get("spark.ssl.rpc.protocol", null);
}

/**
* A comma separated list of ciphers
*/
public String[] sslRpcRequestedCiphers() {
String ciphers = conf.get("spark.ssl.rpc.enabledAlgorithms", null);
return (ciphers != null ? ciphers.split(",") : null);
}

/**
* The key-store file; can be relative to the current directory
*/
public File sslRpcKeyStore() {
String keyStore = conf.get("spark.ssl.rpc.keyStore", null);
if (keyStore != null) {
return new File(keyStore);
} else {
return null;
}
}

/**
* The password to the key-store file
*/
public String sslRpcKeyStorePassword() {
return conf.get("spark.ssl.rpc.keyStorePassword", null);
}

/**
* A PKCS#8 private key file in PEM format; can be relative to the current directory
*/
public File sslRpcPrivateKey() {
String privateKey = conf.get("spark.ssl.rpc.privateKey", null);
if (privateKey != null) {
return new File(privateKey);
} else {
return null;
}
}

/**
* The password to the private key
*/
public String sslRpcKeyPassword() {
return conf.get("spark.ssl.rpc.keyPassword", null);
}

/**
* A X.509 certificate chain file in PEM format; can be relative to the current directory
*/
public File sslRpcCertChain() {
String certChain = conf.get("spark.ssl.rpc.certChain", null);
if (certChain != null) {
return new File(certChain);
} else {
return null;
}
}

/**
* The trust-store file; can be relative to the current directory
*/
public File sslRpcTrustStore() {
String trustStore = conf.get("spark.ssl.rpc.trustStore", null);
if (trustStore != null) {
return new File(trustStore);
} else {
return null;
}
}

/**
* The password to the trust-store file
*/
public String sslRpcTrustStorePassword() {
return conf.get("spark.ssl.rpc.trustStorePassword", null);
}

/**
* If using a trust-store that that reloads its configuration is enabled.
* If true, when the trust-store file on disk changes, it will be reloaded
*/
public boolean sslRpcTrustStoreReloadingEnabled() {
return conf.getBoolean("spark.ssl.rpc.trustStoreReloadingEnabled", false);
}

/**
* The interval, in milliseconds, the trust-store will reload its configuration
*/
public int sslRpctrustStoreReloadIntervalMs() {
return conf.getInt("spark.ssl.rpc.trustStoreReloadIntervalMs", 10000);
}

/**
* If the OpenSSL implementation is enabled,
* (if available on host system), requires certChain and keyFile arguments
*/
public boolean sslRpcOpenSslEnabled() {
return conf.getBoolean("spark.ssl.rpc.openSslEnabled", false);
}

/**
*
* @return true if and only if RPC encryption is enabled and the relevant keys exist
*/
public boolean sslRpcEnabledAndKeysAreValid() {
if (!sslRpcEnabled()) {
return false;
}
if (sslRpcOpenSslEnabled()) {
// OpenSSL requires both the privateKey and certChain
File privateKey = sslRpcPrivateKey();
if (privateKey == null || !privateKey.exists()) {
return false;
}
File certChain = sslRpcCertChain();
if (certChain == null || !certChain.exists()) {
return false;
}
return true;
} else {
File keyStore = sslRpcKeyStore();
if (keyStore == null || !keyStore.exists()) {
return false;
}
// It's fine for the trust store to be missing, we would default to trusting all.
return true;
}
}

/**
* If we can dangerously fallback to unencrypted connections if RPC over SSL is enabled
* but the key files are not present
*/
public boolean sslRpcDangerouslyFallbackIfKeysNotPresent() {
return conf.getBoolean("spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent", false);
}

/**
* Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
* channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
Expand Down
88 changes: 88 additions & 0 deletions common/network-common/src/test/java/TransportConfSuite.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network;

import java.io.File;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.ssl.SslSampleConfigs;

public class TransportConfSuite {

private TransportConf transportConf =
new TransportConf(
"shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());

@Test
public void testKeyStorePath() {
assertEquals(new File(SslSampleConfigs.keyStorePath), transportConf.sslRpcKeyStore());
}

@Test
public void testPrivateKeyPath() {
assertEquals(new File(SslSampleConfigs.privateKeyPath), transportConf.sslRpcPrivateKey());
}

@Test
public void testCertChainPath() {
assertEquals(new File(SslSampleConfigs.certChainPath), transportConf.sslRpcCertChain());
}

@Test
public void testTrustStorePath() {
assertEquals(new File(SslSampleConfigs.trustStorePath), transportConf.sslRpcTrustStore());
}

@Test
public void testTrustStoreReloadingEnabled() {
assertFalse(transportConf.sslRpcTrustStoreReloadingEnabled());
}

@Test
public void testOpenSslEnabled() {
assertFalse(transportConf.sslRpcOpenSslEnabled());
}

@Test
public void testSslRpcEnabled() {
assertTrue(transportConf.sslRpcEnabled());
}


@Test
public void testSslKeyStorePassword() {
assertEquals("password", transportConf.sslRpcKeyStorePassword());
}

@Test
public void testSslKeyPassword() {
assertEquals("password", transportConf.sslRpcKeyPassword());
}

@Test
public void testSslTrustStorePassword() {
assertEquals("password", transportConf.sslRpcTrustStorePassword());
}

@Test
public void testSsltrustStoreReloadIntervalMs() {
assertEquals(10000, transportConf.sslRpctrustStoreReloadIntervalMs());
}
}
Loading

0 comments on commit f1e9dc2

Please sign in to comment.