Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create KafkaContainerDef #7748

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions modules/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation 'com.google.guava:guava:23.0'
}

tasks.japicmp {
methodExcludes = [
"org.testcontainers.containers.KafkaContainer#configureKraft()",
"org.testcontainers.containers.KafkaContainer#configureZookeeper()"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private boolean kraftEnabled = false;

private String clusterId = DEFAULT_CLUSTER_ID;

private static final String PROTOCOL_PREFIX = "TC";

private final Set<Supplier<String>> listeners = new HashSet<>();

/**
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
*/
Expand All @@ -73,38 +69,32 @@ public KafkaContainer(String confluentPlatformVersion) {
public KafkaContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
}

withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

withExposedPorts(KAFKA_PORT);
@Override
KafkaContainerDef createContainerDef() {
return new KafkaContainerDef();
}

withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
});
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
@Override
KafkaContainerDef getContainerDef() {
return (KafkaContainerDef) super.getContainerDef();
}

public KafkaContainer withEmbeddedZookeeper() {
if (this.kraftEnabled) {
throw new IllegalStateException("Cannot configure Zookeeper when using Kraft mode");
}
externalZookeeperConnect = null;
getContainerDef().withEmbeddedZookeeper();
return self();
}

public KafkaContainer withExternalZookeeper(String connectString) {
if (this.kraftEnabled) {
throw new IllegalStateException("Cannot configure Zookeeper when using Kraft mode");
}
externalZookeeperConnect = connectString;
this.externalZookeeperConnect = connectString;
getContainerDef().withZookeeper(connectString);
return self();
}

Expand All @@ -114,6 +104,7 @@ public KafkaContainer withKraft() {
}
verifyMinKraftVersion();
this.kraftEnabled = true;
getContainerDef().withRaft();
return self();
}

Expand All @@ -137,7 +128,7 @@ private boolean isLessThanCP740() {

public KafkaContainer withClusterId(String clusterId) {
Objects.requireNonNull(clusterId, "clusterId cannot be null");
this.clusterId = clusterId;
getContainerDef().withClusterId(clusterId);
return self();
}

Expand All @@ -147,77 +138,10 @@ public String getBootstrapServers() {

@Override
protected void configure() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
Set<String> listeners = new HashSet<>();
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
listeners.add("BROKER://0.0.0.0:9092");

Set<String> listenerSecurityProtocolMap = new HashSet<>();
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");
getContainerDef().resolveListeners();

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

withEnv("KAFKA_LISTENERS", kafkaListeners);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);

if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
} else {
waitingFor(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
configureZookeeper();
}
}

protected void configureKraft() {
//CP 7.4.0
getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"));
withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS")));

withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
);
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
}

protected void configureZookeeper() {
if (externalZookeeperConnect != null) {
withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
} else {
addExposedPort(ZOOKEEPER_PORT);
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
if (!this.kraftEnabled && this.externalZookeeperConnect == null) {
getContainerDef().withEmbeddedZookeeper();
}
}

Expand All @@ -229,7 +153,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
advertisedListeners.add(getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener(containerInfo));

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
List<Supplier<String>> listenersToTransform = new ArrayList<>(getContainerDef().listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
Expand Down Expand Up @@ -265,7 +189,7 @@ protected String commandKraft() {
String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
command +=
"echo 'kafka-storage format --ignore-formatted -t \"" +
this.clusterId +
getContainerDef().getEnvVars().get("CLUSTER_ID") +
"\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
return command;
}
Expand Down Expand Up @@ -299,11 +223,113 @@ protected String commandZookeeper() {
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
getContainerDef().withListener(listenerSupplier);
return this;
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}

private static class KafkaContainerDef extends ContainerDef {

private final Set<Supplier<String>> listeners = new HashSet<>();

private String clusterId = DEFAULT_CLUSTER_ID;

KafkaContainerDef() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
addEnvVar("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
addEnvVar("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
addEnvVar("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

addEnvVar("KAFKA_BROKER_ID", "1");
addEnvVar("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
addEnvVar("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

addExposedTcpPort(KAFKA_PORT);

setEntrypoint("sh");
setCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);

setWaitStrategy(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
}

private void resolveListeners() {
Set<String> additionalKafkaListeners = new HashSet<>();
Set<String> additionalListenerSecurityProtocolMap = new HashSet<>();

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
additionalKafkaListeners.add(listenerProtocol);
additionalListenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
addNetworkAlias(host);
}

String kafkaListeners = String.join(",", additionalKafkaListeners);
String kafkaListenerSecurityProtocolMap = String.join(",", additionalListenerSecurityProtocolMap);

this.envVars.computeIfPresent("KAFKA_LISTENERS", (k, v) -> String.join(",", v, kafkaListeners));
this.envVars.computeIfPresent(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
(k, v) -> String.join(",", v, kafkaListenerSecurityProtocolMap)
);
}

void withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
}

void withEmbeddedZookeeper() {
addExposedTcpPort(ZOOKEEPER_PORT);
addEnvVar("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
}

void withZookeeper(String connectionString) {
addEnvVar("KAFKA_ZOOKEEPER_CONNECT", connectionString);
}

void withClusterId(String clusterId) {
this.clusterId = clusterId;
}

void withRaft() {
this.envVars.computeIfAbsent("CLUSTER_ID", key -> clusterId);
this.envVars.computeIfAbsent("KAFKA_NODE_ID", key -> getEnvVars().get("KAFKA_BROKER_ID"));
addEnvVar(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
addEnvVar(
"KAFKA_LISTENERS",
String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvVars().get("KAFKA_LISTENERS"))
);
addEnvVar("KAFKA_PROCESS_ROLES", "broker,controller");

String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
String controllerQuorumVoters = String.format(
"%s@%s:9094",
getEnvVars().get("KAFKA_NODE_ID"),
networkAlias
);
this.envVars.computeIfAbsent("KAFKA_CONTROLLER_QUORUM_VOTERS", key -> controllerQuorumVoters);
addEnvVar("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

setWaitStrategy(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
}
}
}
Loading