Add support for apache/kafka (#8416)
Fixes #8398
eddumelendez committed Apr 3, 2024
1 parent cda5609 commit 8db891a
Showing 5 changed files with 246 additions and 112 deletions.
12 changes: 10 additions & 2 deletions docs/modules/kafka.md
@@ -1,7 +1,12 @@
# Kafka Containers
# Kafka Module

Testcontainers can be used to automatically instantiate and manage [Apache Kafka](https://kafka.apache.org) containers.
More precisely, Testcontainers uses the official Docker images for [Confluent OSS Platform](https://hub.docker.com/r/confluentinc/cp-kafka/)

Currently, two different Kafka images are supported:

* `org.testcontainers.containers.KafkaContainer` supports [confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
* `org.testcontainers.kafka.KafkaContainer` supports [apache/kafka](https://hub.docker.com/r/apache/kafka/)
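For orientation, a minimal sketch of the new container in use (not taken from this commit; the `apache/kafka:3.7.0` tag is an assumption):

    // Hypothetical usage; any image compatible with apache/kafka should work.
    try (org.testcontainers.kafka.KafkaContainer kafka =
            new org.testcontainers.kafka.KafkaContainer("apache/kafka:3.7.0")) {
        kafka.start();
        // Points at the mapped PLAINTEXT listener, e.g. localhost:32768
        String bootstrapServers = kafka.getBootstrapServers();
    }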

## Benefits

@@ -24,6 +29,9 @@ Now your tests or any other process running on your machine can get access to running Kafka broker
<!--/codeinclude-->

## Options

!!! note
    The options below are only available for `org.testcontainers.containers.KafkaContainer`.

### <a name="zookeeper"></a> Using external Zookeeper
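The body of this section is collapsed in this view. As a reminder of the option it documents, a brief sketch against the existing cp-kafka container (the Zookeeper address is illustrative):

    // withExternalZookeeper is the long-standing method on org.testcontainers.containers.KafkaContainer.
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withExternalZookeeper("zookeeper:2181"); // must be reachable from the Kafka container's network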

93 changes: 93 additions & 0 deletions modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
@@ -0,0 +1,93 @@
package org.testcontainers.kafka;

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
 * Testcontainers implementation for Apache Kafka.
 * <p>
 * Supported image: {@code apache/kafka}
 * <p>
 * Exposed ports: 9092
 */
public class KafkaContainer extends GenericContainer<KafkaContainer> {

    private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka");

    private static final int KAFKA_PORT = 9092;

    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

    private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

    private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

    public KafkaContainer(String imageName) {
        this(DockerImageName.parse(imageName));
    }

    public KafkaContainer(DockerImageName dockerImageName) {
        super(dockerImageName);
        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

        withExposedPorts(KAFKA_PORT);
        withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);

        // Three listeners: PLAINTEXT for host clients, BROKER for inter-broker
        // traffic on the Docker network, CONTROLLER for the KRaft quorum.
        withEnv(
            "KAFKA_LISTENERS",
            "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094"
        );
        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        // KRaft combined mode: this single node acts as both broker and controller.
        withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
        withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

        withEnv("KAFKA_NODE_ID", "1");
        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", String.valueOf(Long.MAX_VALUE));
        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

        // Block until containerIsStarting() has written the starter script; the
        // advertised listeners are only known once the mapped port is assigned.
        withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
        waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
    }

    @Override
    protected void configure() {
        // The controller must advertise a host resolvable from inside the container:
        // the first network alias if a network is set, otherwise localhost.
        String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
        String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
        String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
        withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
    }

    @Override
    protected void containerIsStarting(InspectContainerResponse containerInfo) {
        String brokerAdvertisedListener = String.format(
            "BROKER://%s:%s",
            containerInfo.getConfig().getHostName(),
            "9093"
        );
        List<String> advertisedListeners = new ArrayList<>();
        advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
        advertisedListeners.add(brokerAdvertisedListener);
        String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
        String command = "#!/bin/bash\n";
        // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
        command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

        command += "/etc/kafka/docker/run \n";
        copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
    }

    public String getBootstrapServers() {
        return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
    }
}
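A usage sketch for the class above (not part of this commit): `getBootstrapServers()` returns `host:mappedPort`, which can be handed straight to a Kafka client. The image tag and topic name are assumptions.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.testcontainers.kafka.KafkaContainer;

    public class KafkaContainerUsageSketch {

        public static void main(String[] args) throws Exception {
            // "apache/kafka:3.7.0" is an assumed tag, not taken from this commit.
            try (KafkaContainer kafka = new KafkaContainer("apache/kafka:3.7.0")) {
                kafka.start();

                Properties props = new Properties();
                // The mapped PLAINTEXT listener, e.g. localhost:32768
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());

                try (
                    KafkaProducer<String, String> producer = new KafkaProducer<>(
                        props,
                        new StringSerializer(),
                        new StringSerializer()
                    )
                ) {
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();
                }
            }
        }
    }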
126 changes: 126 additions & 0 deletions modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java
@@ -0,0 +1,126 @@
package org.testcontainers;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class AbstractKafka {

    private final ImmutableMap<String, String> properties = ImmutableMap.of(
        AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
        "SASL_PLAINTEXT",
        SaslConfigs.SASL_MECHANISM,
        "PLAIN",
        SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
    );

    protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
        testKafkaFunctionality(bootstrapServers, false, 1, 1);
    }

    protected void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
        testKafkaFunctionality(bootstrapServers, true, 1, 1);
    }

    protected void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
        throws Exception {
        ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers
        );
        Properties adminClientProperties = new Properties();
        adminClientProperties.putAll(adminClientDefaultProperties);

        ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG,
            "tc-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            "earliest"
        );
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(consumerDefaultProperties);

        ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG,
            UUID.randomUUID().toString()
        );
        Properties producerProperties = new Properties();
        producerProperties.putAll(producerDefaultProperties);

        if (authenticated) {
            adminClientProperties.putAll(this.properties);
            consumerProperties.putAll(this.properties);
            producerProperties.putAll(this.properties);
        }
        try (
            AdminClient adminClient = AdminClient.create(adminClientProperties);
            KafkaProducer<String, String> producer = new KafkaProducer<>(
                producerProperties,
                new StringSerializer(),
                new StringSerializer()
            );
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                consumerProperties,
                new StringDeserializer(),
                new StringDeserializer()
            );
        ) {
            String topicName = "messages-" + UUID.randomUUID();

            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

            consumer.subscribe(Collections.singletonList(topicName));

            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

            Unreliables.retryUntilTrue(
                10,
                TimeUnit.SECONDS,
                () -> {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    if (records.isEmpty()) {
                        return false;
                    }

                    assertThat(records)
                        .hasSize(1)
                        .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
                        .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

                    return true;
                }
            );

            consumer.unsubscribe();
        }
    }
}
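This commit lists five changed files, and the fifth is not rendered above; presumably it is a test that drives the new org.testcontainers.kafka.KafkaContainer through this helper. A hypothetical sketch (class name and image tag are assumptions):

    package org.testcontainers.kafka;

    import org.junit.Test;
    import org.testcontainers.AbstractKafka;

    public class ApacheKafkaContainerTestSketch extends AbstractKafka {

        @Test
        public void testUsage() throws Exception {
            // Assumed tag; the constructor accepts any image compatible with apache/kafka.
            try (KafkaContainer kafka = new KafkaContainer("apache/kafka:3.7.0")) {
                kafka.start();
                testKafkaFunctionality(kafka.getBootstrapServers());
            }
        }
    }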
modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
@@ -5,37 +5,25 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.AbstractKafka;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.tuple;

public class KafkaContainerTest {
public class KafkaContainerTest extends AbstractKafka {

    private static final DockerImageName KAFKA_TEST_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");

@@ -45,15 +33,6 @@ public class KafkaContainerTest {
"confluentinc/cp-zookeeper:4.0.0"
);

    private final ImmutableMap<String, String> properties = ImmutableMap.of(
        AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
        "SASL_PLAINTEXT",
        SaslConfigs.SASL_MECHANISM,
        "PLAIN",
        SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
    );

    @Test
    public void testUsage() throws Exception {
        try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
@@ -344,91 +323,4 @@ private static String getJaasConfig() {
"user_test=\"secret\";";
return jaasConfig;
}

    private void testKafkaFunctionality(String bootstrapServers) throws Exception {
        testKafkaFunctionality(bootstrapServers, false, 1, 1);
    }

    private void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
        testKafkaFunctionality(bootstrapServers, true, 1, 1);
    }

    private void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
        throws Exception {
        ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers
        );
        Properties adminClientProperties = new Properties();
        adminClientProperties.putAll(adminClientDefaultProperties);

        ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG,
            "tc-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            "earliest"
        );
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(consumerDefaultProperties);

        ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG,
            UUID.randomUUID().toString()
        );
        Properties producerProperties = new Properties();
        producerProperties.putAll(producerDefaultProperties);

        if (authenticated) {
            adminClientProperties.putAll(this.properties);
            consumerProperties.putAll(this.properties);
            producerProperties.putAll(this.properties);
        }
        try (
            AdminClient adminClient = AdminClient.create(adminClientProperties);
            KafkaProducer<String, String> producer = new KafkaProducer<>(
                producerProperties,
                new StringSerializer(),
                new StringSerializer()
            );
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                consumerProperties,
                new StringDeserializer(),
                new StringDeserializer()
            );
        ) {
            String topicName = "messages-" + UUID.randomUUID();

            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

            consumer.subscribe(Collections.singletonList(topicName));

            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

            Unreliables.retryUntilTrue(
                10,
                TimeUnit.SECONDS,
                () -> {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    if (records.isEmpty()) {
                        return false;
                    }

                    assertThat(records)
                        .hasSize(1)
                        .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
                        .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

                    return true;
                }
            );

            consumer.unsubscribe();
        }
    }
}
