Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for apache/kafka #8416

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Kafka Containers
# Kafka Module

Testcontainers can be used to automatically instantiate and manage [Apache Kafka](https://kafka.apache.org) containers.
More precisely Testcontainers uses the official Docker images for [Confluent OSS Platform](https://hub.docker.com/r/confluentinc/cp-kafka/)

Currently, two different Kafka images are supported:

* `org.testcontainers.containers.KafkaContainer` supports
[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
* `org.testcontainers.kafka.KafkaContainer` supports [apache/kafka](https://hub.docker.com/r/apache/kafka/)

## Benefits

Expand All @@ -24,6 +29,9 @@ Now your tests or any other process running on your machine can get access to ru
<!--/codeinclude-->

## Options

!!! note
The options below are only available for `org.testcontainers.containers.KafkaContainer`

### <a name="zookeeper"></a> Using external Zookeeper

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.testcontainers.kafka;

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
* Testcontainers implementation for Apache Kafka.
* <p>
* Supported image: {@code apache/kafka}
* <p>
* Exposed ports: 9092
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka");

private static final int KAFKA_PORT = 9092;

private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

public KafkaContainer(String imageName) {
this(DockerImageName.parse(imageName));
}

public KafkaContainer(DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

withExposedPorts(KAFKA_PORT);
withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);

withEnv(
"KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094"
);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

withEnv("KAFKA_NODE_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
}

@Override
protected void configure() {
String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
}

@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
String brokerAdvertisedListener = String.format(
"BROKER://%s:%s",
containerInfo.getConfig().getHostName(),
"9093"
);
List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener);
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

command += "/etc/kafka/docker/run \n";
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

public String getBootstrapServers() {
return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
}
}
126 changes: 126 additions & 0 deletions modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.testcontainers;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class AbstractKafka {

private final ImmutableMap<String, String> properties = ImmutableMap.of(
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SASL_PLAINTEXT",
SaslConfigs.SASL_MECHANISM,
"PLAIN",
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
);

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, false, 1, 1);
}

protected void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, true, 1, 1);
}

protected void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
throws Exception {
ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers
);
Properties adminClientProperties = new Properties();
adminClientProperties.putAll(adminClientDefaultProperties);

ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
);
Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerDefaultProperties);

ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
);
Properties producerProperties = new Properties();
producerProperties.putAll(producerDefaultProperties);

if (authenticated) {
adminClientProperties.putAll(this.properties);
consumerProperties.putAll(this.properties);
producerProperties.putAll(this.properties);
}
try (
AdminClient adminClient = AdminClient.create(adminClientProperties);
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
consumerProperties,
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,25 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.AbstractKafka;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.tuple;

public class KafkaContainerTest {
public class KafkaContainerTest extends AbstractKafka {

private static final DockerImageName KAFKA_TEST_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");

Expand All @@ -45,15 +33,6 @@ public class KafkaContainerTest {
"confluentinc/cp-zookeeper:4.0.0"
);

private final ImmutableMap<String, String> properties = ImmutableMap.of(
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SASL_PLAINTEXT",
SaslConfigs.SASL_MECHANISM,
"PLAIN",
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
);

@Test
public void testUsage() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
Expand Down Expand Up @@ -344,91 +323,4 @@ private static String getJaasConfig() {
"user_test=\"secret\";";
return jaasConfig;
}

private void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, false, 1, 1);
}

private void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, true, 1, 1);
}

private void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
throws Exception {
ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers
);
Properties adminClientProperties = new Properties();
adminClientProperties.putAll(adminClientDefaultProperties);

ImmutableMap<String, String> consumerDefaultProperties = ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
);
Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerDefaultProperties);

ImmutableMap<String, String> producerDefaultProperties = ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
);
Properties producerProperties = new Properties();
producerProperties.putAll(producerDefaultProperties);

if (authenticated) {
adminClientProperties.putAll(this.properties);
consumerProperties.putAll(this.properties);
producerProperties.putAll(this.properties);
}
try (
AdminClient adminClient = AdminClient.create(adminClientProperties);
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
consumerProperties,
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}
}
Loading
Loading