Skip to content
This repository has been archived by the owner on Jul 22, 2020. It is now read-only.

Add support for Cassandra clusters with SSL enabled #25

Merged
merged 1 commit into from
Dec 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ Cluster:
* `cassandra.migration.cluster.port`: CQL native transport port (default=9042)
* `cassandra.migration.cluster.username`: Username for password authenticator (optional)
* `cassandra.migration.cluster.password`: Password for password authenticator (optional)
* `cassandra.migration.cluster.truststore`: Path to truststore.jar for cassandra client SSL (optional)
* `cassandra.migration.cluster.truststore_password`: Password for truststore.jar (optional)
* `cassandra.migration.cluster.keystore`: Path to keystore.jar for cassandra client SSL with certificate authentication (optional)
* `cassandra.migration.cluster.keystore_password`: Password for keystore.jar (optional)

Keyspace:
* `cassandra.migration.keyspace.name`: Name of Cassandra keyspace (required)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ import com.builtamont.cassandra.migration.internal.util.VersionPrinter
import com.builtamont.cassandra.migration.internal.util.logging.LogFactory
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Metadata
import com.datastax.driver.core.NettySSLOptions
import com.datastax.driver.core.Session
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
import com.datastax.driver.core.policies.TokenAwarePolicy
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
import java.io.FileInputStream
import java.security.KeyStore
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory

/**
* This is the centre point of Cassandra migration, and for most users, the only class they will ever have to deal with.
Expand Down Expand Up @@ -247,13 +256,53 @@ class CassandraMigration : CassandraMigrationConfiguration {
// Build the Cluster
val builder = Cluster.Builder()
builder.addContactPoints(*keyspaceConfig.clusterConfig.contactpoints).withPort(keyspaceConfig.clusterConfig.port)

// Use TokenAware & DCAware load balancing policies
builder.withLoadBalancingPolicy(TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))

if (!keyspaceConfig.clusterConfig.username.isNullOrBlank()) {
if (!keyspaceConfig.clusterConfig.password.isNullOrBlank()) {
builder.withCredentials(keyspaceConfig.clusterConfig.username, keyspaceConfig.clusterConfig.password)
} else {
throw IllegalArgumentException("Password must be provided with username.")
}
}

// Add SSL options to cluster builder
if (keyspaceConfig.clusterConfig.truststore != null) {
FileInputStream(keyspaceConfig.clusterConfig.truststore?.toFile()).use {

val sslCtxBuilder = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
// The Java cryptographic extensions (JCE) are required for AES 256
.ciphers(listOf("TLS_RSA_WITH_AES_256_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA"))

val truststore = KeyStore.getInstance("JKS")
truststore.load(it, keyspaceConfig.clusterConfig.truststorePassword?.toCharArray() ?:
throw IllegalArgumentException("Truststore password must be provided with truststore."))

val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
tmf.init(truststore)
sslCtxBuilder.trustManager(tmf)

if (keyspaceConfig.clusterConfig.keystore != null) {
FileInputStream(keyspaceConfig.clusterConfig.keystore?.toFile()).use {

val keystore = KeyStore.getInstance("JKS")
val keystorePass = keyspaceConfig.clusterConfig.keystorePassword?.toCharArray() ?:
throw IllegalArgumentException("Keystore password must be provided with keystore.")
keystore.load(it, keystorePass)

val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
kmf.init(keystore, keystorePass)
sslCtxBuilder.keyManager(kmf)
}
}
builder.withSSL(NettySSLOptions(sslCtxBuilder.build()))
}
}


cluster = builder.build()

LOG.info(getConnectionInfo(cluster.metadata))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package com.builtamont.cassandra.migration.api.configuration

import java.nio.file.Path;
import java.nio.file.Paths;
import com.builtamont.cassandra.migration.internal.util.StringUtils

/**
Expand Down Expand Up @@ -51,6 +53,30 @@ class ClusterConfiguration {
var password: String? = null
get set

/**
* The path to the truststore.
*/
var truststore: Path? = null
get set

/**
* The password for the truststore.
*/
var truststorePassword: String? = null
get set

/**
* The path to the keystore.
*/
var keystore: Path? = null
get set

/**
* The password for the keystore.
*/
var keystorePassword: String? = null
get set

/**
* ClusterConfiguration initialization.
*/
Expand All @@ -66,6 +92,18 @@ class ClusterConfiguration {

val passwordProp = System.getProperty(ConfigurationProperty.PASSWORD.namespace)
if (!passwordProp.isNullOrBlank()) this.password = passwordProp.trim()

val truststoreProp = System.getProperty(ConfigurationProperty.TRUSTSTORE.namespace)
if (!truststoreProp.isNullOrBlank()) this.truststore = Paths.get(truststoreProp.trim())

val truststorePasswordProp = System.getProperty(ConfigurationProperty.TRUSTSTORE_PASSWORD.namespace)
if (!truststorePasswordProp.isNullOrBlank()) this.truststorePassword = truststorePasswordProp.trim()

val keystoreProp = System.getProperty(ConfigurationProperty.KEYSTORE.namespace)
if (!keystoreProp.isNullOrBlank()) this.keystore = Paths.get(keystoreProp.trim())

val keystorePasswordProp = System.getProperty(ConfigurationProperty.KEYSTORE_PASSWORD.namespace)
if (!keystorePasswordProp.isNullOrBlank()) this.keystorePassword = keystorePasswordProp.trim()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ enum class ConfigurationProperty(val namespace: String, val description: String)
"Password for password authenticator"
),

TRUSTSTORE(
"cassandra.migration.cluster.truststore",
"Path to the truststore for client SSL"
),

TRUSTSTORE_PASSWORD(
"cassandra.migration.cluster.truststore_password",
"Password for the truststore"
),

KEYSTORE(
"cassandra.migration.cluster.keystore",
"Path to the keystore for client SSL certificate authentication"
),

KEYSTORE_PASSWORD(
"cassandra.migration.cluster.keystore_password",
"Password for the keystore"
),

// Keyspace name configuration properties
// ~~~~~~
KEYSPACE_NAME(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.builtamont.cassandra.migration.api.configuration.ConfigurationProperty;
import org.junit.Test;

import java.nio.file.Paths;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

Expand All @@ -29,6 +31,18 @@ public void shouldHaveDefaultConfigValues() {

if (hasProperty(ConfigurationProperty.PASSWORD.getNamespace()))
assertThat(clusterConfig.getPassword(), is(nullValue()));

if (hasProperty("cassandra.migration.cluster.truststore"))
assertThat(clusterConfig.getTruststore(), is(nullValue()));

if (hasProperty("cassandra.migration.cluster.truststorePassword"))
assertThat(clusterConfig.getTruststorePassword(), is(nullValue()));

if (hasProperty("cassandra.migration.cluster.keystore"))
assertThat(clusterConfig.getKeystore(), is(nullValue()));

if (hasProperty("cassandra.migration.cluster.keystorePassword"))
assertThat(clusterConfig.getKeystorePassword(), is(nullValue()));
}

@Test
Expand All @@ -37,6 +51,10 @@ public void systemPropsShouldOverrideDefaultConfigValues() {
System.setProperty(ConfigurationProperty.PORT.getNamespace(), "9144");
System.setProperty(ConfigurationProperty.USERNAME.getNamespace(), "user");
System.setProperty(ConfigurationProperty.PASSWORD.getNamespace(), "pass");
System.setProperty(ConfigurationProperty.TRUSTSTORE.getNamespace(), "truststore.jks");
System.setProperty(ConfigurationProperty.TRUSTSTORE_PASSWORD.getNamespace(), "pass");
System.setProperty(ConfigurationProperty.KEYSTORE.getNamespace(), "keystore.jks");
System.setProperty(ConfigurationProperty.KEYSTORE_PASSWORD.getNamespace(), "pass");

ClusterConfiguration clusterConfig = new ClusterConfiguration();
assertThat(clusterConfig.getContactpoints()[0], is("192.168.0.1"));
Expand All @@ -45,6 +63,10 @@ public void systemPropsShouldOverrideDefaultConfigValues() {
assertThat(clusterConfig.getPort(), is(9144));
assertThat(clusterConfig.getUsername(), is("user"));
assertThat(clusterConfig.getPassword(), is("pass"));
assertThat(clusterConfig.getTruststore(), is(Paths.get("truststore.jks")));
assertThat(clusterConfig.getTruststorePassword(), is("pass"));
assertThat(clusterConfig.getKeystore(), is(Paths.get("keystore.jks")));
assertThat(clusterConfig.getKeystorePassword(), is("pass"));
}

/**
Expand Down