Skip to content

Commit

Permalink
Fix inbound TLS enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
iamaleksey committed Jan 29, 2021
1 parent 88e7430 commit c2f24d2
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ public enum DiskOptimizationStrategy
private static final List<String> SENSITIVE_KEYS = new ArrayList<String>() {{
add("client_encryption_options");
add("server_encryption_options");
add("encryption_options");
}};

public static void log(Config config)
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -728,13 +728,15 @@ else if (conf.native_transport_max_frame_size_in_mb >= 2048)
throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);

if(conf.encryption_options != null)
if (conf.encryption_options != null)
{
logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
//operate under the assumption that server_encryption_options is not set in yaml rather than both
conf.server_encryption_options = conf.encryption_options;
}

conf.server_encryption_options.validate();

// load the seeds for node contact points
if (conf.seed_provider == null)
{
Expand Down
48 changes: 48 additions & 0 deletions src/java/org/apache/cassandra/config/EncryptionOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@
*/
package org.apache.cassandra.config;

import java.net.InetAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.utils.FBUtilities;

public abstract class EncryptionOptions
{
private static final Logger logger = LoggerFactory.getLogger(EncryptionOptions.class);

public String keystore = "conf/.keystore";
public String keystore_password = "cassandra";
public String truststore = "conf/.truststore";
Expand All @@ -45,6 +55,44 @@ public static enum InternodeEncryption
{
all, none, dc, rack
}

public InternodeEncryption internode_encryption = InternodeEncryption.none;

public boolean shouldEncrypt(InetAddress endpoint)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
InetAddress local = FBUtilities.getBroadcastAddress();

switch (internode_encryption)
{
case none:
return false; // if nothing needs to be encrypted then return immediately.
case all:
break;
case dc:
if (snitch.getDatacenter(endpoint).equals(snitch.getDatacenter(local)))
return false;
break;
case rack:
// for rack then check if the DC's are the same.
if (snitch.getRack(endpoint).equals(snitch.getRack(local))
&& snitch.getDatacenter(endpoint).equals(snitch.getDatacenter(local)))
return false;
break;
}
return true;
}

public void validate()
{
if (require_client_auth && (internode_encryption == InternodeEncryption.rack || internode_encryption == InternodeEncryption.dc))
{
logger.warn("Setting require_client_auth is incompatible with 'rack' and 'dc' internode_encryption values."
+ " It is possible for an internode connection to pretend to be in the same rack/dc by spoofing"
+ " its broadcast address in the handshake and bypass authentication. To ensure that mutual TLS"
+ " authentication is not bypassed, please set internode_encryption to 'all'. Continuing with"
+ " insecure configuration.");
}
}
}
}
20 changes: 20 additions & 0 deletions src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Set;

import javax.net.ssl.SSLSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,6 +68,13 @@ public void run()
DataInputPlus input = new DataInputStreamPlus(socket.getInputStream());
StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);

if (isEncryptionRequired(init.from) && !isEncrypted())
{
logger.warn("Peer {} attempted to establish an unencrypted streaming connection (broadcast address {})",
socket.getRemoteSocketAddress(), init.from);
throw new IOException("Peer " + init.from + " attempted an unencrypted streaming connection");
}

//Set SO_TIMEOUT on follower side
if (!init.isForOutgoing)
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
Expand Down Expand Up @@ -101,4 +111,14 @@ public void close()
group.remove(this);
}
}

private boolean isEncryptionRequired(InetAddress peer)
{
return DatabaseDescriptor.getServerEncryptionOptions().shouldEncrypt(peer);
}

private boolean isEncrypted()
{
return socket instanceof SSLSocket;
}
}
22 changes: 22 additions & 0 deletions src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.zip.Checksum;
import java.util.Set;

import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +38,7 @@
import net.jpountz.xxhash.XXHashFactory;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.EncryptionOptions;
import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
Expand Down Expand Up @@ -149,6 +153,14 @@ private void receiveMessages() throws IOException
DataInputPlus in = new DataInputStreamPlus(socket.getInputStream());
int maxVersion = in.readInt();
from = CompactEndpointSerializationHelper.deserialize(in);

if (isEncryptionRequired(from) && !isEncrypted())
{
logger.warn("Peer {} attempted to establish an unencrypted connection (broadcast address {})",
socket.getRemoteSocketAddress(), from);
throw new IOException("Peer " + from + " attempted an unencrypted connection");
}

// record the (true) version of the endpoint
MessagingService.instance().setVersion(from, maxVersion);
logger.trace("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from));
Expand Down Expand Up @@ -217,4 +229,14 @@ private InetAddress receiveMessage(DataInputPlus input, int version) throws IOEx
}
return message.from;
}

private boolean isEncryptionRequired(InetAddress peer)
{
return DatabaseDescriptor.getServerEncryptionOptions().shouldEncrypt(peer);
}

private boolean isEncrypted()
{
return socket instanceof SSLSocket;
}
}
26 changes: 1 addition & 25 deletions src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.utils.FBUtilities;
Expand Down Expand Up @@ -127,7 +126,7 @@ public Socket newSocket() throws IOException
public static Socket newSocket(InetAddress endpoint) throws IOException
{
// zero means 'bind on any available port.'
if (isEncryptedChannel(endpoint))
if (DatabaseDescriptor.getServerEncryptionOptions().shouldEncrypt(endpoint))
{
if (DatabaseDescriptor.getOutboundBindAny())
return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort());
Expand All @@ -151,29 +150,6 @@ public InetAddress endPoint()
return resetEndpoint;
}

public static boolean isEncryptedChannel(InetAddress address)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption)
{
case none:
return false; // if nothing needs to be encrypted then return immediately.
case all:
break;
case dc:
if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
return false;
break;
case rack:
// for rack then check if the DC's are the same.
if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
&& snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
return false;
break;
}
return true;
}

public void start()
{
smallMessages.start();
Expand Down
Binary file added test/conf/cassandra_ssl_test.keystore
Binary file not shown.
Binary file added test/conf/cassandra_ssl_test.truststore
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;

import java.util.HashMap;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.net.MessagingService;

import static com.google.common.collect.Iterables.getOnlyElement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public final class InternodeEncryptionEnforcementTest extends TestBaseImpl
{
@Test
public void testConnectionsAreRejectedWithInvalidConfig() throws Throwable
{
Cluster.Builder builder = builder()
.withNodes(2)
.withConfig(c ->
{
c.with(Feature.NETWORK);
c.with(Feature.NATIVE_PROTOCOL);

if (c.num() == 1)
{
HashMap<String, Object> encryption = new HashMap<>();
encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
encryption.put("keystore_password", "cassandra");
encryption.put("truststore", "test/conf/cassandra_ssl_test.truststore");
encryption.put("truststore_password", "cassandra");
encryption.put("internode_encryption", "dc");
c.set("server_encryption_options", encryption);
}
})
.withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "r1a"),
2, NetworkTopology.dcAndRack("dc2", "r2a")));

try (Cluster cluster = builder.start())
{
/*
* instance (1) won't connect to (2), since (2) won't have a TLS listener;
* instance (2) won't connect to (1), since inbound check will reject
* the unencrypted connection attempt;
*
* without the patch, instance (2) *CAN* connect to (1), without encryption,
* despite being in a different dc.
*/

cluster.get(1).runOnInstance(() ->
{
List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
assertEquals(2, threads.size());

for (MessagingService.SocketThread thread : threads)
{
assertEquals(0, thread.connections.size());
}
});

cluster.get(2).runOnInstance(() ->
{
List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
assertEquals(1, threads.size());
assertTrue(getOnlyElement(threads).connections.isEmpty());
});
}
}

@Test
public void testConnectionsAreAcceptedWithValidConfig() throws Throwable
{
Cluster.Builder builder = builder()
.withNodes(2)
.withConfig(c ->
{
c.with(Feature.NETWORK);
c.with(Feature.NATIVE_PROTOCOL);

HashMap<String, Object> encryption = new HashMap<>();
encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
encryption.put("keystore_password", "cassandra");
encryption.put("truststore", "test/conf/cassandra_ssl_test.truststore");
encryption.put("truststore_password", "cassandra");
encryption.put("internode_encryption", "dc");
c.set("server_encryption_options", encryption);
})
.withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "r1a"),
2, NetworkTopology.dcAndRack("dc2", "r2a")));

try (Cluster cluster = builder.start())
{
/*
* instance (1) should connect to instance (2) without any issues;
* instance (2) should connect to instance (1) without any issues.
*/

SerializableRunnable runnable = () ->
{
List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
assertEquals(2, threads.size());

MessagingService.SocketThread sslThread = threads.get(0);
assertEquals(1, sslThread.connections.size());

MessagingService.SocketThread plainThread = threads.get(1);
assertEquals(0, plainThread.connections.size());
};

cluster.get(1).runOnInstance(runnable);
cluster.get(2).runOnInstance(runnable);
}
}
}

0 comments on commit c2f24d2

Please sign in to comment.