Skip to content

Commit

Permalink
Add Apache Pulsar Support (#713)
Browse files Browse the repository at this point in the history
* Add Apache Pulsar Support

* Reafctor container class

* Change property configuration

* Remove unused imports
  • Loading branch information
aahmed-se authored and bsideup committed Jun 1, 2018
1 parent 2fdf6e7 commit 0fa4118
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public String getKafkaImage() {
return (String) properties.getOrDefault("kafka.container.image", "confluentinc/cp-kafka");
}

public String getPulsarImage() {
return (String) properties.getOrDefault("pulsar.container.image", "apachepulsar/pulsar");
}

public boolean isDisableChecks() {
return Boolean.parseBoolean((String) properties.getOrDefault("checks.disable", "false"));
}
Expand Down
8 changes: 8 additions & 0 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
description = "Testcontainers :: Pulsar"

dependencies {
compile project(':testcontainers')

testCompile group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.0.0-rc1-incubating'
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.10.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.testcontainers.containers;

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.TestcontainersConfiguration;

/**
* This container wraps Apache pulsar running in stanalone mode
*/
public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final int PULSAR_PORT = 6850;

public PulsarContainer() {
this("2.0.0-rc1-incubating");
}

public PulsarContainer(String pulsarVersion) {
super(TestcontainersConfiguration.getInstance().getPulsarImage() + ":" + pulsarVersion);
withExposedPorts(PULSAR_PORT);
withCommand("/bin/bash", "-c", "" +
"servicePort=6850 webServicePort=8280 webServicePortTls=8643 bin/apply-config-from-env.py conf/proxy.conf && " +
"bin/pulsar standalone & " +
"bin/pulsar proxy --zookeeper-servers localhost:2181 --global-zookeeper-servers localhost:2181"
);

waitingFor(Wait.forLogMessage(".*messaging service is ready.*\\s", 1));
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", this.getContainerIpAddress(), this.getFirstMappedPort());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer()) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {

try (
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarBrokerUrl)
.build();
Consumer consumer = client.newConsumer()
.topic(TEST_TOPIC)
.subscriptionName("test-subs")
.subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(TEST_TOPIC)
.create()
) {

producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData()))
.isEqualTo("test containers");
}
}

}
18 changes: 18 additions & 0 deletions modules/pulsar/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.testcontainers" level="DEBUG"/>
<logger name="org.testcontainers.shaded" level="WARN"/>

</configuration>

0 comments on commit 0fa4118

Please sign in to comment.