Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar: TLS and Authentication support #158

Merged
merged 6 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plugins/pulsar-records-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ dependencies {

compileOnly project(":app")

compile 'org.apache.pulsar:pulsar-client:2.3.2'
compile 'org.apache.pulsar:pulsar-client-original:2.4.0'

testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testCompile project(":tck")
testCompile 'org.testcontainers:pulsar'
testCompile 'org.apache.pulsar:pulsar-client-admin:2.3.2'
testCompile 'org.apache.pulsar:pulsar-client-admin-original:2.4.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;
import java.util.function.Supplier;
import java.util.Map;
import java.util.Optional;

@AutoService(ApplicationContextInitializer.class)
public class PulsarRecordsStorageConfiguration implements ApplicationContextInitializer<GenericApplicationContext> {
Expand All @@ -35,22 +36,40 @@ public void initialize(GenericApplicationContext applicationContext) {

var pulsarProperties = binder.bind("pulsar", PulsarProperties.class).get();

applicationContext.registerBean(RecordsStorage.class, new Supplier<RecordsStorage>() {
@Override
@SneakyThrows(PulsarClientException.class)
public RecordsStorage get() {
return new PulsarRecordsStorage(
PulsarClient.builder().serviceUrl(pulsarProperties.getServiceUrl()).build()
);
applicationContext.registerBean(RecordsStorage.class, () -> {
return new PulsarRecordsStorage(createClient(pulsarProperties));
});
}

@SneakyThrows
PulsarClient createClient(PulsarProperties pulsarProperties) {
var clientBuilder = PulsarClient.builder()
.serviceUrl(pulsarProperties.getServiceUrl());

pulsarProperties.getTlsTrustCertsFilePath().ifPresent(clientBuilder::tlsTrustCertsFilePath);
pulsarProperties.getAuthPluginClassName().ifPresent(authClass -> {
try {
clientBuilder.authentication(authClass, pulsarProperties.getAuthPluginParams());
} catch (PulsarClientException.UnsupportedAuthenticationException e) {
throw new IllegalStateException(e);
}
});

return clientBuilder.build();
}

@Data
@Validated
private static class PulsarProperties {
static class PulsarProperties {

@NotEmpty
String serviceUrl;

Optional<String> tlsTrustCertsFilePath = Optional.empty();

Optional<String> authPluginClassName = Optional.empty();

Map<String, String> authPluginParams = Map.of();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ public class PulsarRecordsStorageTest implements RecordStorageTests {

static {
pulsar.start();
System.setProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl());

applicationContext = new ApplicationRunner("PULSAR", "MEMORY").run();
applicationContext = new ApplicationRunner("PULSAR", "MEMORY")
.withProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl())
.run();
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.github.bsideup.liiklus.pulsar.config;

import com.github.bsideup.liiklus.pulsar.config.PulsarRecordsStorageConfiguration.PulsarProperties;
import com.github.bsideup.liiklus.pulsar.container.PulsarTlsContainer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.MountableFile;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class PulsarRecordsStorageConfigurationTest {

@Test
void shouldSupportTLS() throws Exception {
try (var pulsar = new PulsarTlsContainer()) {
pulsar.start();
var configuration = new PulsarRecordsStorageConfiguration();

var properties = new PulsarProperties();
properties.setServiceUrl(pulsar.getPulsarBrokerUrl());

var topicName = UUID.randomUUID().toString();

try (var client = configuration.createClient(properties)) {
assertThatThrownBy(() -> client.getPartitionsForTopic(topicName).get(10, TimeUnit.SECONDS))
lanwen marked this conversation as resolved.
Show resolved Hide resolved
.hasCauseInstanceOf(PulsarClientException.class);
}

properties.setTlsTrustCertsFilePath(Optional.of(pulsar.getCaCert().toAbsolutePath().toString()));

try (var client = configuration.createClient(properties)) {
assertThat(client.getPartitionsForTopic(topicName).get(10, TimeUnit.SECONDS))
.isNotEmpty();
}
}
}

@Test
void shouldSupportAuth() throws Exception {
try (var pulsar = new PulsarTlsContainer()) {
pulsar.withCopyFileToContainer(MountableFile.forClasspathResource(".htpasswd"), "/pulsar/conf/.htpasswd");
pulsar.withEnv("PULSAR_MEM", "-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd");
pulsar.withEnv("superUserRoles", "super");

pulsar.withEnv("authenticationEnabled", "true");
pulsar.withEnv("authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderBasic");

pulsar.withEnv("brokerClientAuthenticationPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationBasic");
pulsar.withEnv("brokerClientAuthenticationParameters", "{\"userId\":\"super\",\"password\":\"superpass\"}");
pulsar.start();
var configuration = new PulsarRecordsStorageConfiguration();

var topicName = UUID.randomUUID().toString();

var properties = new PulsarProperties();
properties.setServiceUrl(pulsar.getPulsarBrokerUrl());
properties.setTlsTrustCertsFilePath(Optional.of(pulsar.getCaCert().toAbsolutePath().toString()));

try (var client = configuration.createClient(properties)) {
assertThatThrownBy(() -> client.getPartitionsForTopic(topicName).get(10, TimeUnit.SECONDS))
.hasCauseInstanceOf(PulsarClientException.class);
}

properties.setAuthPluginClassName(Optional.of(AuthenticationBasic.class.getName()));
properties.setAuthPluginParams(Map.of(
"userId", "super",
"password", UUID.randomUUID().toString()
));

try (var client = configuration.createClient(properties)) {
assertThatThrownBy(() -> client.getPartitionsForTopic(topicName).get(10, TimeUnit.SECONDS))
.hasCauseInstanceOf(PulsarClientException.class);
}

properties.setAuthPluginParams(Map.of(
"userId", "super",
"password", "superpass"
));

try (var client = configuration.createClient(properties)) {
assertThat(client.getPartitionsForTopic(topicName).get(10, TimeUnit.SECONDS))
.isNotEmpty();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.bsideup.liiklus.pulsar.container;

import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;

import java.nio.file.Path;
import java.nio.file.Paths;

public class PulsarTlsContainer extends PulsarContainer {

public static final int BROKER_TLS_PORT = 6651;

public PulsarTlsContainer() {
this("2.4.0");
}

public PulsarTlsContainer(String pulsarVersion) {
super(pulsarVersion);
withExposedPorts(BROKER_TLS_PORT, BROKER_HTTP_PORT);
withEnv("PULSAR_PREFIX_brokerServicePortTls", BROKER_TLS_PORT + "");
withEnv("PULSAR_PREFIX_tlsEnabled", "true");
withEnv("PULSAR_PREFIX_tlsCertificateFilePath", "/pulsar/broker.cert.pem");
withEnv("PULSAR_PREFIX_tlsKeyFilePath", "/pulsar/broker.key-pk8.pem");
withEnv("PULSAR_PREFIX_tlsTrustCertsFilePath", "/pulsar/ca.cert.pem");

withCopyFileToContainer(MountableFile.forClasspathResource("certs/"), "/pulsar/");

setCommand(
"/bin/bash",
"-c",
"bin/apply-config-from-env.py conf/standalone.conf && " +
"bin/apply-config-from-env.py conf/proxy.conf && " +
"bin/pulsar standalone --no-functions-worker -nss"
);

waitingFor(Wait.forLogMessage(".*Created namespace public\\/default.*", 1));
}

public Path getCaCert() {
return Paths.get("src", "test", "resources", "certs").resolve("ca.cert.pem");
}

@Override
public String getPulsarBrokerUrl() {
return String.format("pulsar+ssl://%s:%s", getContainerIpAddress(), getMappedPort(BROKER_TLS_PORT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
super:$apr1$6s/xhL9h$Ow3i695wwrEc1mYq/BG2q/
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Files are taken from Pulsar's example:
https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/ssl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN CERTIFICATE-----
MIIEkDCCAnigAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwETEPMA0GA1UEAwwGZm9v
YmFyMCAXDTE4MDYyMjA4NTUzMloYDzIyOTIwNDA2MDg1NTMyWjAjMSEwHwYDVQQD
DBhicm9rZXIucHVsc2FyLmFwYWNoZS5vcmcwggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQDQouKhZah4hMCqmg4aS5RhQG/Y1gA+yP9DGF9mlw35tfhfWs63
EvNjEK4L/ZWSEV45L/wc6YV14RmM6bJ0V/0vXo4xmISbqptND/2kRIspkLZQ5F0O
OQXVicqZLOc6igZQhRg8ANDYdTJUTF65DqauX4OJt3YMhF2FSt7jQtlj06IQBa01
+ARO9OotMJtBY+vIU5bV6JydfgkhQH9rIDI7AMeY5j02gGkJJrelfm+WoOsUez+X
aqTN3/tF8+MBcFB3G04s1qc2CJPJM3YGxvxEtHqTGI14t9J8p5O7X9JHpcY8X00s
bxa4FGbKgfDobbkJ+GgblWCkAcLN95sKTqtHAgMBAAGjgd0wgdowCQYDVR0TBAIw
ADARBglghkgBhvhCAQEEBAMCBkAwMwYJYIZIAYb4QgENBCYWJE9wZW5TU0wgR2Vu
ZXJhdGVkIFNlcnZlciBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUaxFvJrkEGqk8azTA
DyVyTyTbJAIwQQYDVR0jBDowOIAUVwvpyyPov0c+UHo/RX6hGEOdFSehFaQTMBEx
DzANBgNVBAMMBmZvb2JhcoIJANfih0+geeIMMA4GA1UdDwEB/wQEAwIFoDATBgNV
HSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsFAAOCAgEA35QDGclHzQtHs3yQ
ZzNOSKisg5srTiIoQgRzfHrXfkthNFCnBzhKjBxqk3EIasVtvyGuk0ThneC1ai3y
ZK3BivnMZfm1SfyvieFoqWetsxohWfcpOSVkpvO37P6v/NmmaTIGkBN3gxKCx0QN
zqApLQyNTM++X3wxetYH/afAGUrRmBGWZuJheQpB9yZ+FB6BRp8YuYIYBzANJyW9
spvXW03TpqX2AIoRBoGMLzK72vbhAbLWiCIfEYREhbZVRkP+yvD338cWrILlOEur
x/n8L/FTmbf7mXzHg4xaQ3zg/5+0OCPMDPUBE4xWDBAbZ82hgOcTqfVjwoPgo2V0
fbbx6redq44J3Vn5d9Xhi59fkpqEjHpX4xebr5iMikZsNTJMeLh0h3uf7DstuO9d
mfnF5j+yDXCKb9XzCsTSvGCN+spmUh6RfSrbkw8/LrRvBUpKVEM0GfKSnaFpOaSS
efM4UEi72FRjszzHEkdvpiLhYvihINLJmDXszhc3fCi42be/DGmUhuhTZWynOPmp
0N0V/8/sGT5gh4fGEtGzS/8xEvZwO9uDlccJiG8Pi+aO0/K9urB9nppd/xKWXv3C
cib/QrW0Qow4TADWC1fnGYCpFzzaZ2esPL2MvzOYXnW4/AbEqmb6Weatluai64ZK
3N2cGJWRyvpvvmbP2hKCa4eLgEc=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDQouKhZah4hMCq
mg4aS5RhQG/Y1gA+yP9DGF9mlw35tfhfWs63EvNjEK4L/ZWSEV45L/wc6YV14RmM
6bJ0V/0vXo4xmISbqptND/2kRIspkLZQ5F0OOQXVicqZLOc6igZQhRg8ANDYdTJU
TF65DqauX4OJt3YMhF2FSt7jQtlj06IQBa01+ARO9OotMJtBY+vIU5bV6Jydfgkh
QH9rIDI7AMeY5j02gGkJJrelfm+WoOsUez+XaqTN3/tF8+MBcFB3G04s1qc2CJPJ
M3YGxvxEtHqTGI14t9J8p5O7X9JHpcY8X00sbxa4FGbKgfDobbkJ+GgblWCkAcLN
95sKTqtHAgMBAAECggEBALE1eMtfnk3nbAI74bih84D7C0Ug14p8jJv/qqBnsx4j
WrgbWDMVrJa7Rym2FQHBMMfgIwKnso0iSeJvaPz683j1lk833YKe0VQOPgD1m0IN
wV1J6mQ3OOZcKDIcerY1IBHqSmBEzR7dxIbnaxlCAX9gb0hdBK6zCwA5TMG5OQ5Y
3cGOmevK5i2PiejhpruA8h7E48P1ATaGHUZif9YD724oi6AcilQ8H/DlOjZTvlmK
r4aJ30f72NwGM8Ecet5CE2wyflAGtY0k+nChYkPRfy54u64Z/T9B53AvneFaj8jv
yFepZgRTs2cWhEl0KQGuBHQ4+IeOfMt2LebhvjWW8YkCgYEA7BXVsnqPHKRDd8wP
eNkolY4Fjdq4wu9ad+DaFiZcJuv7ugr+Kplltq6e4aU36zEdBYdPp/6KM/HGE/Xj
bo0CELNUKs/Ny9H/UJc8DDbVEmoF3XGiIbKKq1T8NTXTETFnwrGkBFD8nl7YTsOF
M4FZmSok0MhhkpEULAqxBS6YpQsCgYEA4jxM1egTVSWjTreg2UdYo2507jKa7maP
PRtoPsNJzWNbOpfj26l3/8pd6oYKWck6se6RxIUxUrk3ywhNJIIOvWEC7TaOH1c9
T4NQNcweqBW9+A1x5gyzT14gDaBfl45gs82vI+kcpVv/w2N3HZOQZX3yAUqWpfw2
yw1uQDXtgDUCgYEAiYPWbBXTkp1j5z3nrT7g0uxc89n5USLWkYlZvxktCEbg4+dP
UUT06EoipdD1F3wOKZA9p98uZT9pX2sUxOpBz7SFTEKq3xQ9IZZWFc9CoW08aVat
V++FsnLYTa5CeXtLsy6CGTmLTDx2xrpAtlWb+QmBVFPD8fmrxFOd9STFKS0CgYAt
6ztVN3OlFqyc75yQPXD6SxMkvdTAisSMDKIOCylRrNb5f5baIP2gR3zkeyxiqPtm
3htsHfSy67EtXpP50wQW4Dft2eLi7ZweJXMEWFfomfEjBeeWYAGNHHe5DFIauuVZ
2WexDEGqNpAlIm0s7aSjVPrn1DHbouOkNyenlMqN+QKBgQDVYVhk9widShSnCmUA
G30moXDgj3eRqCf5T7NEr9GXD1QBD/rQSPh5agnDV7IYLpV7/wkYLI7l9x7mDwu+
I9mRXkyAmTVEctLTdXQHt0jdJa5SfUaVEDUzQbr0fUjkmythTvqZ809+d3ELPeLI
5qJ7jxgksHWji4lYfL4r4J6Zaw==
-----END PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIFCDCCAvCgAwIBAgIJANfih0+geeIMMA0GCSqGSIb3DQEBCwUAMBExDzANBgNV
BAMMBmZvb2JhcjAeFw0xODA2MjIwODQ2MjFaFw0zODA2MTcwODQ2MjFaMBExDzAN
BgNVBAMMBmZvb2JhcjCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAOVU
UpTPeXCeyfUiQS824l9s9krZd4R6TA4D97eQ9EWm2D7ppV4gPApHO8j5f+joo/b6
Iso4aFlHpJ8VV2a5Ol7rjQw43MJHaBgwDxB1XWgsNdfoI7ebtp/BWg2nM3r8wm+Z
gKenf9d1/1Ol+6yFUehkLkIXUvldiVegmmje8FnwhcDNE1eTrh66XqSJXEXqgBKu
NqsoYcVak72OyOO1/N8CESoSdyBkbSiH5vJyo0AUCjn7tULga7fxojmqBZDog9Pg
e5Fi/hbCrdinbxBrMgIxQ7wqXw2sw6iOWu4FU8Ih/CuF4xaQy2YP7MEk4Ff0LCY0
KMhFMWU7550r/fz/C2l7fKhREyCQPa/bVE+dfxgZ/gCZ+p7vQ154hCCjpd+5bECv
SN1bcVIPG6ngQu4vMXa7QRBi/Od40jSVGVJXYY6kXvrYatad7035w2GGGGkvMsQm
y53yh4tqQfH7ulHqB0J5LebTQRp6nRizWigVCLjNkxJYI+Dj51qvT1zdyWEegKr1
CthBfYzXlfjeH3xri1f0UABeC12n24Wkacd9af7zs7S3rYntEK444w/3fB0F62Lh
SESfMLAmUH0dF5plRShrFUXz23nUeS8EYgWmnGkpf/HDzB67vdfAK0tfJEtmmY78
q06OSgMr+AOOqaomh4Ez2ZQG592bS71G8MrE7r2/AgMBAAGjYzBhMB0GA1UdDgQW
BBRXC+nLI+i/Rz5Qej9FfqEYQ50VJzAfBgNVHSMEGDAWgBRXC+nLI+i/Rz5Qej9F
fqEYQ50VJzAPBgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG
9w0BAQsFAAOCAgEAYd2PxdV+YOaWcmMG1fK7CGwSzDOGsgC7hi4gWPiNsVbz6fwQ
m5Ac7Zw76dzin8gzOPKST7B8WIoc7ZWrMnyh3G6A3u29Ec8iWahqGa91NPA3bOIl
0ldXnXfa416+JL/Q5utpiV6W2XDaB53v9GqpMk4rOTS9kCFOiuH5ZU8P69jp9mq6
7pI/+hWFr+21ibmXH6ANxRLd/5+AqojRUYowAu2997Z+xmbpwx/2Svciq3LNY/Vz
s9DudUHCBHj/DPgNxsEUt8QNohjQkRbFTY0a1aXodJ/pm0Ehk2kf9KwYYYduR7ak
6UmPIPrZg6FePNahxwMZ0RtgX7EXmpiiIH1q9BsulddWkrFQclevsWO3ONQVrDs2
gwY0HQuCRCJ+xgS2cyGiGohW5MkIsg1aI0i0j5GIUSppCIYgirAGCairARbCjhcx
pbMe8RTuBhCqO3R2wZ0wXu7P7/ArI/Ltm1dU6IeHUAUmeneVj5ie0SdA19mHTS2o
lG77N0jy6eq2zyEwJE6tuS/tyP1xrxdzXCYY7f6X9aNfsuPVQTcnrFajvDv8R6uD
YnRStVCdS6fZEP0JzsLrqp9bgLIRRsiqsVVBCgJdK1I/X59qk2EyCLXWSgk8T9XZ
iux8LlPpskt30YYt1KhlWB9zVz7k0uYAwits5foU6RfCRDPAyOa1q/QOXk0=
-----END CERTIFICATE-----
29 changes: 16 additions & 13 deletions tck/src/main/java/com/github/bsideup/liiklus/ApplicationRunner.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.bsideup.liiklus;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
Expand All @@ -14,17 +13,27 @@
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

@RequiredArgsConstructor
@Slf4j
public class ApplicationRunner {

@NonNull
final String recordsStorageType;
final Map<String, Object> properties = new HashMap<>(Map.of(
"server.port", 0,
"rsocket.enabled", false,
"grpc.enabled", false
));

@NonNull
final String positionsStorageType;
public ApplicationRunner(@NonNull String recordsStorageType, @NonNull String positionsStorageType) {
withProperty("storage.records.type", recordsStorageType);
withProperty("storage.positions.type", positionsStorageType);
}

public ApplicationRunner withProperty(String key, Object value) {
properties.put(key, value);
return this;
}

@SneakyThrows
public ConfigurableApplicationContext run() {
Expand Down Expand Up @@ -72,13 +81,7 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE
var createSpringApplicationMethod = applicationClass.getDeclaredMethod("createSpringApplication", String[].class);

var application = (SpringApplication) createSpringApplicationMethod.invoke(null, (Object) new String[0]);
application.setDefaultProperties(Map.of(
"server.port", 0,
"rsocket.enabled", false,
"grpc.enabled", false,
"storage.records.type", recordsStorageType,
"storage.positions.type", positionsStorageType
));
application.setDefaultProperties(properties);
return application.run();
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
Expand Down