Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add redpandadata/redpanda as a compatible image #7898

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* Testcontainers implementation for Redpanda.
* <p>
* Supported images: {@code docker.redpanda.com/redpandadata/redpanda}, {@code docker.redpanda.com/vectorized/redpanda}
* Supported images: {@code redpandadata/redpanda}, {@code docker.redpanda.com/redpandadata/redpanda}
* <p>
* Exposed ports:
* <ul>
Expand All @@ -43,11 +43,15 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final String REDPANDA_FULL_IMAGE_NAME = "docker.redpanda.com/redpandadata/redpanda";

private static final String IMAGE_NAME = "redpandadata/redpanda";

@Deprecated
private static final String REDPANDA_OLD_FULL_IMAGE_NAME = "docker.redpanda.com/vectorized/redpanda";

private static final DockerImageName REDPANDA_IMAGE = DockerImageName.parse(REDPANDA_FULL_IMAGE_NAME);

private static final DockerImageName IMAGE = DockerImageName.parse(IMAGE_NAME);

@Deprecated
private static final DockerImageName REDPANDA_OLD_IMAGE = DockerImageName.parse(REDPANDA_OLD_FULL_IMAGE_NAME);

Expand Down Expand Up @@ -75,10 +79,14 @@ public RedpandaContainer(String image) {

public RedpandaContainer(DockerImageName imageName) {
super(imageName);
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE);
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE, IMAGE);

boolean isLessThanBaseVersion = new ComparableVersion(imageName.getVersionPart()).isLessThan("v22.2.1");
if (REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) && isLessThanBaseVersion) {
boolean isPublicCompatibleImage =
REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
REDPANDA_OLD_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart());
if (isPublicCompatibleImage && isLessThanBaseVersion) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.testcontainers.redpanda;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class AbstractRedpanda {

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
);
KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.testcontainers.redpanda;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class CompatibleImageTest extends AbstractRedpanda {

private final String image;

public CompatibleImageTest(String image) {
this.image = image;
}

@Parameterized.Parameters(name = "{0}")
public static String[] image() {
return new String[] { "docker.redpanda.com/vectorized/redpanda:v22.2.1", "redpandadata/redpanda:v22.2.1" };
}

@Test
public void shouldProduceAndConsumeMessage() throws Exception {
try (RedpandaContainer container = new RedpandaContainer(this.image)) {
container.start();
testKafkaFunctionality(container.getBootstrapServers());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,16 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -39,9 +28,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.tuple;

public class RedpandaContainerTest {
public class RedpandaContainerTest extends AbstractRedpanda {

private static final String REDPANDA_IMAGE = "docker.redpanda.com/redpandadata/redpanda:v22.2.1";

Expand Down Expand Up @@ -78,6 +66,20 @@ public void testNotCompatibleVersion() {
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void vectorizedRedpandaImageVersion2221ShouldNotBeCompatible() {
assertThatThrownBy(() -> new RedpandaContainer("docker.redpanda.com/vectorized/redpanda:v21.11.19"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void redpandadataRedpandaImageVersion2221ShouldNotBeCompatible() {
assertThatThrownBy(() -> new RedpandaContainer("redpandadata/redpanda:v21.11.19"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void testSchemaRegistry() {
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
Expand Down Expand Up @@ -359,70 +361,6 @@ public void testRestProxy() {
}
}

private void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

private void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
);
KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}

private AdminClient getAdminClient(RedpandaContainer redpanda) {
String bootstrapServer = String.format("%s:%s", redpanda.getHost(), redpanda.getMappedPort(9092));
// createAdminClient {
Expand Down
Loading