Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Apache Pulsar Support #713

Merged
merged 4 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public String getKafkaImage() {
return (String) properties.getOrDefault("kafka.container.image", "confluentinc/cp-kafka");
}

public String getPulsarImage() {
return (String) properties.getOrDefault("pulsar.container.image", "apachepulsar/pulsar");
}

public boolean isDisableChecks() {
return Boolean.parseBoolean((String) properties.getOrDefault("checks.disable", "false"));
}
Expand Down
8 changes: 8 additions & 0 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
description = "Testcontainers :: Pulsar"

dependencies {
compile project(':testcontainers')

testCompile group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.0.0-rc1-incubating'
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.10.0'
}
104 changes: 104 additions & 0 deletions modules/pulsar/out/production/resources/proxy.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file seems to be committed by accident ( modules/pulsar/out/production/ folder)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Zookeeper quorum connection string (comma-separated)
zookeeperServers=

# Configuration Store connection string (comma-separated)
configurationStoreServers=

# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000

# Port to use to server binary-proto request
servicePort=6750

# Port to use to server binary-proto-tls request
servicePortTls=6751

# Port that discovery service listen on
webServicePort=8180

# Port to use to server HTTPS request
webServicePortTls=8543

# Path for the file used to determine the rotation status for the proxy-instance when responding
# to service discovery health checks
statusFilePath=

### --- Authentication --- ###

# Enable authentication
authenticationEnabled=false

# Authentication provider name list, which is comma separated list of class names (comma-separated)
authenticationProviders=

# Enforce authorization
authorizationEnabled=false

# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider

# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics (comma-separated)
superUserRoles=

# Forward client authorization Credentials to Broker for re authorization
# make sure authentication is enabled for this to take effect
forwardAuthorizationCredentials=false

# --- RateLimiting ----
# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000
maxConcurrentInboundConnections=10000

# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000
maxConcurrentLookupRequests=10000

##### --- TLS --- #####

# Enable TLS in the proxy
tlsEnabledInProxy=false

# Enable TLS when talking with the brokers
tlsEnabledWithBroker=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Validates hostname when proxy creates tls connection with broker
tlsHostnameVerificationEnabled=false

# Specify whether Client certificates are required for TLS
# Reject the Connection if the Client Certificate is not trusted.
tlsRequireTrustedClientCertOnConnect=false


### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=
18 changes: 18 additions & 0 deletions modules/pulsar/out/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as my previous comment about modules/pulsar/out/ folder


<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.testcontainers" level="DEBUG"/>
<logger name="org.testcontainers.shaded" level="WARN"/>

</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.testcontainers.containers;

import org.testcontainers.utility.Base58;
import org.testcontainers.utility.TestcontainersConfiguration;

import java.util.stream.Stream;

/**
* This container wraps Apache pulsar running in stanalone mode
*/
public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final int PULSAR_PORT = 6850;

public PulsarContainer() {
this("2.0.0-rc1-incubating");
}

public PulsarContainer(String pulsarVersion) {
super(TestcontainersConfiguration.getInstance().getPulsarImage() + ":" + pulsarVersion);

withNetwork(Network.newNetwork());
String networkAlias = "pulsar-" + Base58.randomString(6);
withNetworkAliases(networkAlias);
withExposedPorts(PULSAR_PORT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

network is not needed here (was only needed for Kafka), only withExposedPorts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", this.getContainerIpAddress(), this.getFirstMappedPort());
}

@Override
public void start() {
withClasspathResourceMapping("proxy.conf", "/pulsar/conf/proxy.conf", BindMode.READ_ONLY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move these to the constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

withCommand("/bin/bash", "-c", "bin/pulsar standalone & bin/pulsar proxy --zookeeper-servers localhost:2181 --global-zookeeper-servers localhost:2181");
super.start();
}

@Override
public void stop() {
Stream.<Runnable>of(super::stop).parallel().forEach(Runnable::run);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this override is not needed.
In Kafka, we have to start a few more containers besides Kafka, but Pulsar is much better and does not need that 💯

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
104 changes: 104 additions & 0 deletions modules/pulsar/src/main/resources/proxy.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it work without this file? If so, it would be better to avoid file mounting because in some rare environments that's not possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do need it simply because we need to change the default port of the embedded proxy , otherwise we will have to manipulate the advertised address which I prefer less.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's possible to keep only changed properties? Currently it's a bit unclear what was changed compared to the default config :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so the best I can think of is to add comments

Copy link
Member

@bsideup bsideup May 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to do it like this:

withCommand("/bin/bash", "-c", "" +
    "servicePort=6850 webServicePort=8280 webServicePortTls=8643 bin/apply-config-from-env.py conf/proxy.conf && " +
    "bin/pulsar standalone & " +
    "bin/pulsar proxy --zookeeper-servers localhost:2181 --global-zookeeper-servers localhost:2181"
);

waitingFor(Wait.forLogMessage(".*messaging service is ready.*\\s", 1));

WDYT? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine will go with that.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Zookeeper quorum connection string (comma-separated)
zookeeperServers=

# Configuration Store connection string (comma-separated)
configurationStoreServers=

# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000

# Port to use to server binary-proto request
servicePort=6850

# Port to use to server binary-proto-tls request
servicePortTls=6851

# Port that discovery service listen on
webServicePort=8280

# Port to use to server HTTPS request
webServicePortTls=8643

# Path for the file used to determine the rotation status for the proxy-instance when responding
# to service discovery health checks
statusFilePath=

### --- Authentication --- ###

# Enable authentication
authenticationEnabled=false

# Authentication provider name list, which is comma separated list of class names (comma-separated)
authenticationProviders=

# Enforce authorization
authorizationEnabled=false

# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider

# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics (comma-separated)
superUserRoles=

# Forward client authorization Credentials to Broker for re authorization
# make sure authentication is enabled for this to take effect
forwardAuthorizationCredentials=false

# --- RateLimiting ----
# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000
maxConcurrentInboundConnections=10000

# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000
maxConcurrentLookupRequests=10000

##### --- TLS --- #####

# Enable TLS in the proxy
tlsEnabledInProxy=false

# Enable TLS when talking with the brokers
tlsEnabledWithBroker=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Validates hostname when proxy creates tls connection with broker
tlsHostnameVerificationEnabled=false

# Specify whether Client certificates are required for TLS
# Reject the Connection if the Client Certificate is not trusted.
tlsRequireTrustedClientCertOnConnect=false


### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer()) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {

try (
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarBrokerUrl)
.build();
Consumer consumer = client.newConsumer()
.topic(TEST_TOPIC)
.subscriptionName("test-subs")
.subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(TEST_TOPIC)
.create()
) {

producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData()))
.isEqualTo("test containers");
}
}

}
18 changes: 18 additions & 0 deletions modules/pulsar/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.testcontainers" level="DEBUG"/>
<logger name="org.testcontainers.shaded" level="WARN"/>

</configuration>