Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Apache Pulsar Support #713

Merged
merged 4 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public String getKafkaImage() {
return (String) properties.getOrDefault("kafka.container.image", "confluentinc/cp-kafka");
}

public String getPulsarImage() {
return (String) properties.getOrDefault("pulsar.container.image", "apachepulsar/pulsar");
}

public boolean isDisableChecks() {
return Boolean.parseBoolean((String) properties.getOrDefault("checks.disable", "false"));
}
Expand Down
8 changes: 8 additions & 0 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
description = "Testcontainers :: Pulsar"

dependencies {
compile project(':testcontainers')

testCompile group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.0.0-rc1-incubating'
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.10.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.testcontainers.containers;

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.TestcontainersConfiguration;

/**
* This container wraps Apache pulsar running in stanalone mode
*/
public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final int PULSAR_PORT = 6850;

public PulsarContainer() {
this("2.0.0-rc1-incubating");
}

public PulsarContainer(String pulsarVersion) {
super(TestcontainersConfiguration.getInstance().getPulsarImage() + ":" + pulsarVersion);
withExposedPorts(PULSAR_PORT);
withCommand("/bin/bash", "-c", "" +
"servicePort=6850 webServicePort=8280 webServicePortTls=8643 bin/apply-config-from-env.py conf/proxy.conf && " +
"bin/pulsar standalone & " +
"bin/pulsar proxy --zookeeper-servers localhost:2181 --global-zookeeper-servers localhost:2181"
);

waitingFor(Wait.forLogMessage(".*messaging service is ready.*\\s", 1));
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", this.getContainerIpAddress(), this.getFirstMappedPort());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer()) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {

try (
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarBrokerUrl)
.build();
Consumer consumer = client.newConsumer()
.topic(TEST_TOPIC)
.subscriptionName("test-subs")
.subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(TEST_TOPIC)
.create()
) {

producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData()))
.isEqualTo("test containers");
}
}

}
18 changes: 18 additions & 0 deletions modules/pulsar/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.testcontainers" level="DEBUG"/>
<logger name="org.testcontainers.shaded" level="WARN"/>

</configuration>