Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rowanto committed Jul 29, 2019
1 parent 09169fe commit 16fb39c
Show file tree
Hide file tree
Showing 14 changed files with 447 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

@AutoService(ApplicationContextInitializer.class)
public class PulsarRecordsStorageConfiguration implements ApplicationContextInitializer<GenericApplicationContext> {
Expand All @@ -29,34 +27,33 @@ public void initialize(GenericApplicationContext applicationContext) {
return;
}

if (!"PULSAR".equals(environment.getProperty("storage.records.type"))) {
if (!"PULSAR".equalsIgnoreCase(environment.getProperty("storage.records.type"))) {
return;
}

var binder = Binder.get(environment);

var pulsarProperties = binder.bind("pulsar", PulsarProperties.class).get();

applicationContext.registerBean(RecordsStorage.class, new Supplier<RecordsStorage>() {
@Override
@SneakyThrows(PulsarClientException.class)
public RecordsStorage get() {
var clientBuilder = PulsarClient.builder()
.serviceUrl(pulsarProperties.getServiceUrl());

pulsarProperties.getTlsTrustCertsFilePath().ifPresent(clientBuilder::tlsTrustCertsFilePath);
pulsarProperties.getAuthPluginClassName()
.ifPresent(authClass -> {
try {
clientBuilder.authentication(authClass, pulsarProperties.getAuthPluginParams());
} catch (PulsarClientException.UnsupportedAuthenticationException e) {
throw new IllegalStateException(e);
}
});

return new PulsarRecordsStorage(clientBuilder.build());
}
});
applicationContext.registerBean(RecordsStorage.class, () -> createStorage(pulsarProperties));
}

@SneakyThrows
private RecordsStorage createStorage(PulsarProperties pulsarProperties) {
var clientBuilder = PulsarClient.builder()
.serviceUrl(pulsarProperties.getServiceUrl());

pulsarProperties.getTlsTrustCertsFilePath().ifPresent(clientBuilder::tlsTrustCertsFilePath);
pulsarProperties.getAuthPluginClassName()
.ifPresent(authClass -> {
try {
clientBuilder.authentication(authClass, pulsarProperties.getAuthPluginParams().orElse(""));
} catch (PulsarClientException.UnsupportedAuthenticationException e) {
throw new IllegalStateException(e);
}
});

return new PulsarRecordsStorage(clientBuilder.build());
}

@Data
Expand All @@ -70,7 +67,7 @@ private static class PulsarProperties {

Optional<String> authPluginClassName = Optional.empty();

Map<String, String> authPluginParams = Map.of();
Optional<String> authPluginParams = Optional.empty();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.records.RecordStorageTests;
import lombok.Getter;
import org.apache.pulsar.client.impl.Murmur3_32Hash;
import org.apache.pulsar.client.util.MathUtils;
import org.springframework.context.ApplicationContext;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public abstract class PulsarAbstractStorageTest implements RecordStorageTests {

static final String VERSION = "2.4.0";

private static final int NUM_OF_PARTITIONS = 4;

// Generate a set of keys where each key goes to unique partition
private static Map<Integer, String> PARTITION_KEYS = Mono.fromCallable(() -> UUID.randomUUID().toString())
.repeat()
.scanWith(
() -> new HashMap<Integer, String>(),
(acc, it) -> {
acc.put(MathUtils.signSafeMod(Murmur3_32Hash.getInstance().makeHash(it), NUM_OF_PARTITIONS), it);
return acc;
}
)
.filter(it -> it.size() == NUM_OF_PARTITIONS)
.blockFirst(Duration.ofSeconds(10));

static ApplicationContext applicationContext;

@Getter
String topic = UUID.randomUUID().toString();

@Override
public String keyByPartition(int partition) {
return PARTITION_KEYS.get(partition);
}

@Override
public int getNumberOfPartitions() {
return NUM_OF_PARTITIONS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,12 @@
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.Murmur3_32Hash;
import org.apache.pulsar.client.util.MathUtils;
import org.springframework.context.ApplicationContext;
import org.junit.jupiter.api.AfterAll;
import org.testcontainers.containers.PulsarContainer;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class PulsarRecordsStorageTest extends PulsarAbstractStorageTest implements RecordStorageTests {

public class PulsarRecordsStorageTest implements RecordStorageTests {

private static final int NUM_OF_PARTITIONS = 4;

// Generate a set of keys where each key goes to unique partition
public static Map<Integer, String> PARTITION_KEYS = Mono.fromCallable(() -> UUID.randomUUID().toString())
.repeat()
.scanWith(
() -> new HashMap<Integer, String>(),
(acc, it) -> {
acc.put(MathUtils.signSafeMod(Murmur3_32Hash.getInstance().makeHash(it), NUM_OF_PARTITIONS), it);
return acc;
}
)
.filter(it -> it.size() == NUM_OF_PARTITIONS)
.blockFirst(Duration.ofSeconds(10));

private static final PulsarContainer pulsar = new PulsarContainer();

static final ApplicationContext applicationContext;
private static final PulsarContainer pulsar = new PulsarContainer(VERSION);

static {
pulsar.start();
Expand All @@ -48,9 +23,6 @@ public class PulsarRecordsStorageTest implements RecordStorageTests {
@Getter
RecordsStorage target = applicationContext.getBean(RecordsStorage.class);

@Getter
String topic = UUID.randomUUID().toString();

@SneakyThrows
public PulsarRecordsStorageTest() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
Expand All @@ -60,13 +32,8 @@ public PulsarRecordsStorageTest() {
pulsarAdmin.topics().createPartitionedTopic(topic, getNumberOfPartitions());
}

@Override
public String keyByPartition(int partition) {
return PARTITION_KEYS.get(partition);
}

@Override
public int getNumberOfPartitions() {
return NUM_OF_PARTITIONS;
@AfterAll
static void tearDown() {
pulsar.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.ApplicationRunner;
import com.github.bsideup.liiklus.pulsar.container.PulsarTlsContainer;
import com.github.bsideup.liiklus.records.RecordStorageTests;
import com.github.bsideup.liiklus.records.RecordsStorage;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.junit.jupiter.api.AfterAll;

import java.util.Map;

public class PulsarWithTlsAuthRecordsStorageTest extends PulsarAbstractStorageTest implements RecordStorageTests {

private static final PulsarTlsContainer pulsar = new PulsarTlsContainer(VERSION).withTlsAuthentication();

static {
pulsar.start();

System.getProperties().putAll(Map.of(
"pulsar.serviceUrl", pulsar.getPulsarTlsBrokerUrl(),
"pulsar.tlsTrustCertsFilePath", pulsar.getCaCert().toAbsolutePath().toString(),
"pulsar.authPluginClassName", "org.apache.pulsar.broker.authentication.AuthenticationProviderTls",
"pulsar.authPluginParams",
String.format(
"tlsCertFile:%s,tlsKeyFile:%s",
pulsar.getUserCert().toAbsolutePath().toString(),
pulsar.getUserKey().toAbsolutePath().toString()
)
));

applicationContext = new ApplicationRunner("PULSAR", "MEMORY").run();
}

@Getter
RecordsStorage target = applicationContext.getBean(RecordsStorage.class);

@SneakyThrows
public PulsarWithTlsAuthRecordsStorageTest() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsar.getHttpsServiceUrl())
.tlsTrustCertsFilePath(pulsar.getCaCert().toAbsolutePath().toString())
.authentication(new AuthenticationTls(
pulsar.getUserCert().toAbsolutePath().toString(),
pulsar.getUserKey().toAbsolutePath().toString()
))
.build();

pulsarAdmin.topics().createPartitionedTopic(topic, getNumberOfPartitions());
}

@AfterAll
static void tearDown() {
pulsar.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.ApplicationRunner;
import com.github.bsideup.liiklus.pulsar.container.PulsarTlsContainer;
import com.github.bsideup.liiklus.records.RecordStorageTests;
import com.github.bsideup.liiklus.records.RecordsStorage;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.junit.jupiter.api.AfterAll;

import java.util.Map;

public class PulsarWithTlsRecordsStorageTest extends PulsarAbstractStorageTest implements RecordStorageTests {

private static final PulsarTlsContainer pulsar = new PulsarTlsContainer(VERSION).withTlsAuthentication();

static {
pulsar.start();

System.getProperties().putAll(Map.of(
"pulsar.serviceUrl", pulsar.getPulsarTlsBrokerUrl(),
"pulsar.tlsTrustCertsFilePath", pulsar.getCaCert().toAbsolutePath().toString()
));
applicationContext = new ApplicationRunner("PULSAR", "MEMORY").run();
}

@Getter
RecordsStorage target = applicationContext.getBean(RecordsStorage.class);

@SneakyThrows
public PulsarWithTlsRecordsStorageTest() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsar.getHttpsServiceUrl())
.tlsTrustCertsFilePath(pulsar.getCaCert().toAbsolutePath().toString())
.authentication(new AuthenticationTls(
pulsar.getUserCert().toAbsolutePath().toString(),
pulsar.getUserKey().toAbsolutePath().toString()
))
.build();

pulsarAdmin.topics().createPartitionedTopic(topic, getNumberOfPartitions());
}

@AfterAll
static void tearDown() {
pulsar.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.github.bsideup.liiklus.pulsar.container;

import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.MountableFile;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class PulsarTlsContainer extends PulsarContainer {

public PulsarTlsContainer(String pulsarVersion) {
super(pulsarVersion);
withExposedPorts(BROKER_HTTP_PORT, BROKER_PORT, 6651, 8443);
withEnv("PULSAR_PREFIX_brokerServicePortTls", "6651");
withEnv("PULSAR_PREFIX_webServicePortTls", "8443");
withEnv("PULSAR_PREFIX_tlsEnabled", "true");
withEnv("PULSAR_PREFIX_tlsCertificateFilePath", "/pulsar/broker.cert.pem");
withEnv("PULSAR_PREFIX_tlsKeyFilePath", "/pulsar/broker.key-pk8.pem");
withEnv("PULSAR_PREFIX_tlsTrustCertsFilePath", "/pulsar/ca.cert.pem");

Path resourceDirectory = getTestCertsFolder();
Arrays.stream(resourceDirectory.toFile().listFiles())
.forEach(certFile -> {
var certFilePath = certFile.toPath().toAbsolutePath();
var fileName = certFilePath.getFileName().toString();
withCopyFileToContainer(
MountableFile.forHostPath(certFilePath),
"/pulsar/" + fileName
);
});

setCommand(
"/bin/bash",
"-c",
"bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss"
);
}

private Path getTestCertsFolder() {
return Paths.get("src", "test", "resources", "certs");
}

public Path getCaCert() {
return getTestCertsFolder().resolve("ca.cert.pem");
}

public Path getUserCert() {
return getTestCertsFolder().resolve("user1.cert.pem");
}

public Path getUserKey() {
return getTestCertsFolder().resolve("user1.key-pk8.pem");
}

public PulsarTlsContainer withTlsAuthentication() {
withEnv("PULSAR_PREFIX_authenticationEnabled", "true");
withEnv("PULSAR_PREFIX_authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
return this;
}

public String getPulsarTlsBrokerUrl() {
return String.format("pulsar+ssl://%s:%s", getContainerIpAddress(), getMappedPort(6651));
}

public String getHttpsServiceUrl() {
return String.format("https://%s:%s", getContainerIpAddress(), getMappedPort(8443));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Files are taken from Pulsar's example:
https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/ssl
Loading

0 comments on commit 16fb39c

Please sign in to comment.