diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md
index fb953716225..6ec63126387 100644
--- a/docs/modules/kafka.md
+++ b/docs/modules/kafka.md
@@ -1,7 +1,12 @@
-# Kafka Containers
+# Kafka Module
Testcontainers can be used to automatically instantiate and manage [Apache Kafka](https://kafka.apache.org) containers.
-More precisely Testcontainers uses the official Docker images for [Confluent OSS Platform](https://hub.docker.com/r/confluentinc/cp-kafka/)
+
+Currently, two different Kafka images are supported (see the usage sketch after this list):
+
+* `org.testcontainers.containers.KafkaContainer` supports
+[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
+* `org.testcontainers.kafka.KafkaContainer` supports [apache/kafka](https://hub.docker.com/r/apache/kafka/)
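+
+For example, here is a minimal sketch using the new KRaft-based container and the `apache/kafka:3.7.0` tag exercised by this module's tests; the Confluent-based `org.testcontainers.containers.KafkaContainer` is constructed the same way (it also accepts a `DockerImageName`) and exposes the same `getBootstrapServers()` method:
+
+```java
+import org.testcontainers.kafka.KafkaContainer;
+
+try (KafkaContainer kafka = new KafkaContainer("apache/kafka:3.7.0")) {
+    kafka.start();
+    // host:mappedPort, suitable for a client's bootstrap.servers property
+    String bootstrapServers = kafka.getBootstrapServers();
+}
+```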
## Benefits
@@ -24,6 +29,9 @@ Now your tests or any other process running on your machine can get access to ru
## Options
+
+!!! note
+ The options below are only available for `org.testcontainers.containers.KafkaContainer`.
### Using external Zookeeper
diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..547a7dc7960
--- /dev/null
+++ b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
@@ -0,0 +1,93 @@
+package org.testcontainers.kafka;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Testcontainers implementation for Apache Kafka.
+ *
+ * Supported image: {@code apache/kafka}
+ *
+ * Exposed ports: 9092
+ */
+public class KafkaContainer extends GenericContainer<KafkaContainer> {
+
+ private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka");
+
+ private static final int KAFKA_PORT = 9092;
+
+ private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+ private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
+
+ private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
+
+ public KafkaContainer(String imageName) {
+ this(DockerImageName.parse(imageName));
+ }
+
+ public KafkaContainer(DockerImageName dockerImageName) {
+ super(dockerImageName);
+ dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+ withExposedPorts(KAFKA_PORT);
+ withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);
+
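+ // Three listeners: PLAINTEXT for clients on the host (through the mapped port),
+ // BROKER for inter-broker traffic on the container network, and CONTROLLER for
+ // the KRaft quorum.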
+ withEnv(
+ "KAFKA_LISTENERS",
+ "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094"
+ );
+ withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
+ withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+ withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
+ withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
+
+ withEnv("KAFKA_NODE_ID", "1");
+ withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+ withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+
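+ // Delay the broker start until containerIsStarting() has written the starter
+ // script, since the advertised listeners depend on the runtime port mapping.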
+ withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
+ waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
+ }
+
+ @Override
+ protected void configure() {
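+ // The controller quorum voter address must be resolvable by other containers,
+ // so advertise the first network alias, falling back to localhost when the
+ // container is not attached to a network.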
+ String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
+ String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
+ String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
+ withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
+ }
+
+ @Override
+ protected void containerIsStarting(InspectContainerResponse containerInfo) {
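+ // The mapped host port is only known once the container is starting, so the
+ // advertised listeners are computed here and handed to the broker via the
+ // starter script that the command loop above is waiting for.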
+ String brokerAdvertisedListener = String.format(
+ "BROKER://%s:%s",
+ containerInfo.getConfig().getHostName(),
+ "9093"
+ );
+ List<String> advertisedListeners = new ArrayList<>();
+ advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
+ advertisedListeners.add(brokerAdvertisedListener);
+ String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
+ String command = "#!/bin/bash\n";
+ // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
+ command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
+
+ command += "/etc/kafka/docker/run \n";
+ copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
+ }
+
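+ /**
+ * Returns the bootstrap server address, i.e. {@code host:mappedPort} for the
+ * {@code PLAINTEXT} listener, suitable for clients running on the host.
+ */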
+ public String getBootstrapServers() {
+ return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
+ }
+}
diff --git a/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java b/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java
new file mode 100644
index 00000000000..f7e9dcedb30
--- /dev/null
+++ b/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java
@@ -0,0 +1,126 @@
+package org.testcontainers;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.rnorth.ducttape.unreliables.Unreliables;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.tuple;
+
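+/**
+ * Base class with the produce/consume round-trip assertions shared by the tests
+ * of both {@code KafkaContainer} implementations.
+ */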
+public class AbstractKafka {
+
+ private final ImmutableMap<String, String> properties = ImmutableMap.of(
+ AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
+ "SASL_PLAINTEXT",
+ SaslConfigs.SASL_MECHANISM,
+ "PLAIN",
+ SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
+ );
+
+ protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
+ testKafkaFunctionality(bootstrapServers, false, 1, 1);
+ }
+
+ protected void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
+ testKafkaFunctionality(bootstrapServers, true, 1, 1);
+ }
+
+ protected void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
+ throws Exception {
+ ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServers
+ );
+ Properties adminClientProperties = new Properties();
+ adminClientProperties.putAll(adminClientDefaultProperties);
+
+ ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServers,
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "tc-" + UUID.randomUUID(),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest"
+ );
+ Properties consumerProperties = new Properties();
+ consumerProperties.putAll(consumerDefaultProperties);
+
+ ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServers,
+ ProducerConfig.CLIENT_ID_CONFIG,
+ UUID.randomUUID().toString()
+ );
+ Properties producerProperties = new Properties();
+ producerProperties.putAll(producerDefaultProperties);
+
+ if (authenticated) {
+ adminClientProperties.putAll(this.properties);
+ consumerProperties.putAll(this.properties);
+ producerProperties.putAll(this.properties);
+ }
+ try (
+ AdminClient adminClient = AdminClient.create(adminClientProperties);
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ producerProperties,
+ new StringSerializer(),
+ new StringSerializer()
+ );
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+ consumerProperties,
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ ) {
+ String topicName = "messages-" + UUID.randomUUID();
+
+ Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
+ adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
+ consumer.subscribe(Collections.singletonList(topicName));
+
+ producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
+
+ Unreliables.retryUntilTrue(
+ 10,
+ TimeUnit.SECONDS,
+ () -> {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+
+ if (records.isEmpty()) {
+ return false;
+ }
+
+ assertThat(records)
+ .hasSize(1)
+ .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
+ .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
+
+ return true;
+ }
+ );
+
+ consumer.unsubscribe();
+ }
+ }
+}
diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
index e5d1617298b..5284209a76d 100644
--- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
+++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
@@ -5,37 +5,25 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Test;
-import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.AbstractKafka;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
-import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
-import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.tuple;
-public class KafkaContainerTest {
+public class KafkaContainerTest extends AbstractKafka {
private static final DockerImageName KAFKA_TEST_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
@@ -45,15 +33,6 @@ public class KafkaContainerTest {
"confluentinc/cp-zookeeper:4.0.0"
);
- private final ImmutableMap<String, String> properties = ImmutableMap.of(
- AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
- "SASL_PLAINTEXT",
- SaslConfigs.SASL_MECHANISM,
- "PLAIN",
- SaslConfigs.SASL_JAAS_CONFIG,
- "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
- );
-
@Test
public void testUsage() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
@@ -344,91 +323,4 @@ private static String getJaasConfig() {
"user_test=\"secret\";";
return jaasConfig;
}
-
- private void testKafkaFunctionality(String bootstrapServers) throws Exception {
- testKafkaFunctionality(bootstrapServers, false, 1, 1);
- }
-
- private void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
- testKafkaFunctionality(bootstrapServers, true, 1, 1);
- }
-
- private void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
- throws Exception {
- ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
- AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
- bootstrapServers
- );
- Properties adminClientProperties = new Properties();
- adminClientProperties.putAll(adminClientDefaultProperties);
-
- ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- bootstrapServers,
- ConsumerConfig.GROUP_ID_CONFIG,
- "tc-" + UUID.randomUUID(),
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- "earliest"
- );
- Properties consumerProperties = new Properties();
- consumerProperties.putAll(consumerDefaultProperties);
-
- ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- bootstrapServers,
- ProducerConfig.CLIENT_ID_CONFIG,
- UUID.randomUUID().toString()
- );
- Properties producerProperties = new Properties();
- producerProperties.putAll(producerDefaultProperties);
-
- if (authenticated) {
- adminClientProperties.putAll(this.properties);
- consumerProperties.putAll(this.properties);
- producerProperties.putAll(this.properties);
- }
- try (
- AdminClient adminClient = AdminClient.create(adminClientProperties);
- KafkaProducer<String, String> producer = new KafkaProducer<>(
- producerProperties,
- new StringSerializer(),
- new StringSerializer()
- );
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
- consumerProperties,
- new StringDeserializer(),
- new StringDeserializer()
- );
- ) {
- String topicName = "messages-" + UUID.randomUUID();
-
- Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
- adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
-
- consumer.subscribe(Collections.singletonList(topicName));
-
- producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
-
- Unreliables.retryUntilTrue(
- 10,
- TimeUnit.SECONDS,
- () -> {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-
- if (records.isEmpty()) {
- return false;
- }
-
- assertThat(records)
- .hasSize(1)
- .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
- .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
-
- return true;
- }
- );
-
- consumer.unsubscribe();
- }
- }
}
diff --git a/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java
new file mode 100644
index 00000000000..d520cf33f7c
--- /dev/null
+++ b/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java
@@ -0,0 +1,15 @@
+package org.testcontainers.kafka;
+
+import org.junit.Test;
+import org.testcontainers.AbstractKafka;
+
+public class KafkaContainerTest extends AbstractKafka {
+
+ @Test
+ public void testUsage() throws Exception {
+ try (KafkaContainer kafka = new KafkaContainer("apache/kafka:3.7.0")) {
+ kafka.start();
+ testKafkaFunctionality(kafka.getBootstrapServers());
+ }
+ }
+}