Skip to content

Commit

Permalink
Add Cassandra module (#776)
Browse files Browse the repository at this point in the history
* Implement Cassandra support

* use lombok in CassandraContainer, rephase comment, add import, add getCluster() method, remove excess exception usage

* remove excess casting from CassandraContainerTest

* split comma-separated dependencies

* update Cassandra driver

* use new wait strategy logic, fix the Docker image, remove Netty workaround
  • Loading branch information
bsideup committed Jul 10, 2018
1 parent e8bc279 commit e178313
Show file tree
Hide file tree
Showing 9 changed files with 1,682 additions and 11 deletions.
6 changes: 6 additions & 0 deletions modules/cassandra/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description = "TestContainers :: Cassandra"

dependencies {
compile project(":database-commons")
compile "com.datastax.cassandra:cassandra-driver-core:3.5.1"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package org.testcontainers.containers;

import com.datastax.driver.core.Cluster;
import com.github.dockerjava.api.command.InspectContainerResponse;
import org.apache.commons.io.IOUtils;
import org.testcontainers.containers.delegate.CassandraDatabaseDelegate;
import org.testcontainers.delegate.DatabaseDelegate;
import org.testcontainers.ext.ScriptUtils;
import org.testcontainers.ext.ScriptUtils.ScriptLoadException;
import org.testcontainers.utility.MountableFile;

import javax.script.ScriptException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
* Cassandra container
*
* Supports 2.x and 3.x Cassandra versions
*
* @author Eugeny Karpov
*/
public class CassandraContainer<SELF extends CassandraContainer<SELF>> extends GenericContainer<SELF> {

public static final String IMAGE = "cassandra";
public static final Integer CQL_PORT = 9042;
private static final String CONTAINER_CONFIG_LOCATION = "/etc/cassandra";
private static final String USERNAME = "cassandra";
private static final String PASSWORD = "cassandra";

private String configLocation;
private String initScriptPath;

public CassandraContainer() {
this(IMAGE + ":3.11.2");
}

public CassandraContainer(String dockerImageName) {
super(dockerImageName);
addExposedPort(CQL_PORT);
setStartupAttempts(3);
}

@Override
protected void configure() {
optionallyMapResourceParameterAsVolume(CONTAINER_CONFIG_LOCATION, configLocation);
}

@Override
protected void containerIsStarted(InspectContainerResponse containerInfo) {
runInitScriptIfRequired();
}

/**
* Load init script content and apply it to the database if initScriptPath is set
*/
private void runInitScriptIfRequired() {
if (initScriptPath != null) {
try {
URL resource = Thread.currentThread().getContextClassLoader().getResource(initScriptPath);
if (resource == null) {
logger().warn("Could not load classpath init script: {}", initScriptPath);
throw new ScriptLoadException("Could not load classpath init script: " + initScriptPath + ". Resource not found.");
}
String cql = IOUtils.toString(resource, StandardCharsets.UTF_8);
DatabaseDelegate databaseDelegate = getDatabaseDelegate();
ScriptUtils.executeDatabaseScript(databaseDelegate, initScriptPath, cql);
} catch (IOException e) {
logger().warn("Could not load classpath init script: {}", initScriptPath);
throw new ScriptLoadException("Could not load classpath init script: " + initScriptPath, e);
} catch (ScriptException e) {
logger().error("Error while executing init script: {}", initScriptPath, e);
throw new ScriptUtils.UncategorizedScriptException("Error while executing init script: " + initScriptPath, e);
}
}
}

/**
* Map (effectively replace) directory in Docker with the content of resourceLocation if resource location is not null
*
* Protected to allow for changing implementation by extending the class
*
* @param pathNameInContainer path in docker
* @param resourceLocation relative classpath to resource
*/
protected void optionallyMapResourceParameterAsVolume(String pathNameInContainer, String resourceLocation) {
Optional.ofNullable(resourceLocation)
.map(MountableFile::forClasspathResource)
.ifPresent(mountableFile -> addFileSystemBind(mountableFile.getResolvedPath(), pathNameInContainer, BindMode.READ_WRITE));
}

/**
* Initialize Cassandra with the custom overridden Cassandra configuration
* <p>
* Be aware, that Docker effectively replaces all /etc/cassandra content with the content of config location, so if
* Cassandra.yaml in configLocation is absent or corrupted, then Cassandra just won't launch
*
* @param configLocation relative classpath with the directory that contains cassandra.yaml and other configuration files
*/
public SELF withConfigurationOverride(String configLocation) {
this.configLocation = configLocation;
return self();
}

/**
* Initialize Cassandra with init CQL script
* <p>
* CQL script will be applied after container is started (see using WaitStrategy)
*
* @param initScriptPath relative classpath resource
*/
public SELF withInitScript(String initScriptPath) {
this.initScriptPath = initScriptPath;
return self();
}

/**
* Get username
*
* By default Cassandra has authenticator: AllowAllAuthenticator in cassandra.yaml
* If username and password need to be used, then authenticator should be set as PasswordAuthenticator
* (through custom Cassandra configuration) and through CQL with default cassandra-cassandra credentials
* user management should be modified
*/
public String getUsername() {
return USERNAME;
}

/**
* Get password
*
* By default Cassandra has authenticator: AllowAllAuthenticator in cassandra.yaml
* If username and password need to be used, then authenticator should be set as PasswordAuthenticator
* (through custom Cassandra configuration) and through CQL with default cassandra-cassandra credentials
* user management should be modified
*/
public String getPassword() {
return PASSWORD;
}

/**
* Get configured Cluster
*
* Can be used to obtain connections to Cassandra in the container
*/
public Cluster getCluster() {
return getCluster(this);
}

public static Cluster getCluster(ContainerState containerState) {
return Cluster.builder()
.addContactPoint(containerState.getContainerIpAddress())
.withPort(containerState.getMappedPort(CQL_PORT))
.build();
}

private DatabaseDelegate getDatabaseDelegate() {
return new CassandraDatabaseDelegate(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.testcontainers.containers.delegate;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.delegate.AbstractDatabaseDelegate;
import org.testcontainers.exception.ConnectionCreationException;
import org.testcontainers.ext.ScriptUtils.ScriptStatementFailedException;

/**
* Cassandra database delegate
*
* @author Eugeny Karpov
*/
@Slf4j
@RequiredArgsConstructor
public class CassandraDatabaseDelegate extends AbstractDatabaseDelegate<Session> {

private final ContainerState container;

@Override
protected Session createNewConnection() {
try {
return CassandraContainer.getCluster(container)
.newSession();
} catch (DriverException e) {
log.error("Could not obtain cassandra connection");
throw new ConnectionCreationException("Could not obtain cassandra connection", e);
}
}

@Override
public void execute(String statement, String scriptPath, int lineNumber, boolean continueOnError, boolean ignoreFailedDrops) {
try {
ResultSet result = getConnection().execute(statement);
if (result.wasApplied()) {
log.debug("Statement {} was applied", statement);
} else {
throw new ScriptStatementFailedException(statement, lineNumber, scriptPath);
}
} catch (DriverException e) {
throw new ScriptStatementFailedException(statement, lineNumber, scriptPath, e);
}
}

@Override
protected void closeConnectionQuietly(Session session) {
try {
session.getCluster().close();
} catch (Exception e) {
log.error("Could not close cassandra connection", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.testcontainers.containers.wait;

import org.rnorth.ducttape.TimeoutException;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.delegate.CassandraDatabaseDelegate;
import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
import org.testcontainers.delegate.DatabaseDelegate;

import java.util.concurrent.TimeUnit;

import static org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess;

/**
* Waits until Cassandra returns its version
*
* @author Eugeny Karpov
*/
public class CassandraQueryWaitStrategy extends AbstractWaitStrategy {

private static final String SELECT_VERSION_QUERY = "SELECT release_version FROM system.local";
private static final String TIMEOUT_ERROR = "Timed out waiting for Cassandra to be accessible for query execution";

@Override
protected void waitUntilReady() {
// execute select version query until success or timeout
try {
retryUntilSuccess((int) startupTimeout.getSeconds(), TimeUnit.SECONDS, () -> {
getRateLimiter().doWhenReady(() -> {
try (DatabaseDelegate databaseDelegate = getDatabaseDelegate()) {
databaseDelegate.execute(SELECT_VERSION_QUERY, "", 1, false, false);
}
});
return true;
});
} catch (TimeoutException e) {
throw new ContainerLaunchException(TIMEOUT_ERROR);
}
}

private DatabaseDelegate getDatabaseDelegate() {
return new CassandraDatabaseDelegate(waitStrategyTarget);
}
}
Loading

0 comments on commit e178313

Please sign in to comment.