Create KafkaContainerDef (#7748)
Apply `ContainerDef` to `KafkaContainer`
eddumelendez authored Oct 31, 2023
1 parent 1dba8d1 commit c6d7202
Showing 2 changed files with 128 additions and 95 deletions.
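The refactor is internal: `KafkaContainer` keeps its public API and only delegates its configuration to the new `KafkaContainerDef`. For reference, a minimal usage sketch that is unaffected by this commit (the `confluentinc/cp-kafka:7.4.0` tag is an illustrative assumption, not part of the change):

```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

class KafkaUsageSketch {

    public static void main(String[] args) {
        // Illustrative image tag; any compatible confluentinc/cp-kafka tag works the same way.
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            kafka.start();
            // Bootstrap servers are resolved exactly as before this change.
            System.out.println("Bootstrap servers: " + kafka.getBootstrapServers());
        }
    }
}
```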
7 changes: 7 additions & 0 deletions modules/kafka/build.gradle
@@ -7,3 +7,10 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation 'com.google.guava:guava:23.0'
}

tasks.japicmp {
methodExcludes = [
"org.testcontainers.containers.KafkaContainer#configureKraft()",
"org.testcontainers.containers.KafkaContainer#configureZookeeper()"
]
}
216 changes: 121 additions & 95 deletions modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
@@ -48,12 +48,8 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private boolean kraftEnabled = false;

private String clusterId = DEFAULT_CLUSTER_ID;

private static final String PROTOCOL_PREFIX = "TC";

private final Set<Supplier<String>> listeners = new HashSet<>();

/**
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
*/
@@ -73,38 +69,32 @@ public KafkaContainer(String confluentPlatformVersion) {
public KafkaContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
}

withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

withExposedPorts(KAFKA_PORT);
@Override
KafkaContainerDef createContainerDef() {
return new KafkaContainerDef();
}

withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
});
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
@Override
KafkaContainerDef getContainerDef() {
return (KafkaContainerDef) super.getContainerDef();
}

public KafkaContainer withEmbeddedZookeeper() {
if (this.kraftEnabled) {
throw new IllegalStateException("Cannot configure Zookeeper when using Kraft mode");
}
externalZookeeperConnect = null;
getContainerDef().withEmbeddedZookeeper();
return self();
}

public KafkaContainer withExternalZookeeper(String connectString) {
if (this.kraftEnabled) {
throw new IllegalStateException("Cannot configure Zookeeper when using Kraft mode");
}
externalZookeeperConnect = connectString;
this.externalZookeeperConnect = connectString;
getContainerDef().withZookeeper(connectString);
return self();
}

@@ -114,6 +104,7 @@ public KafkaContainer withKraft() {
}
verifyMinKraftVersion();
this.kraftEnabled = true;
getContainerDef().withRaft();
return self();
}

@@ -137,7 +128,7 @@ private boolean isLessThanCP740() {

public KafkaContainer withClusterId(String clusterId) {
Objects.requireNonNull(clusterId, "clusterId cannot be null");
this.clusterId = clusterId;
getContainerDef().withClusterId(clusterId);
return self();
}

@@ -147,77 +138,10 @@ public String getBootstrapServers() {

@Override
protected void configure() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
Set<String> listeners = new HashSet<>();
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
listeners.add("BROKER://0.0.0.0:9092");

Set<String> listenerSecurityProtocolMap = new HashSet<>();
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");
getContainerDef().resolveListeners();

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

withEnv("KAFKA_LISTENERS", kafkaListeners);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);

if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
} else {
waitingFor(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
configureZookeeper();
}
}

protected void configureKraft() {
//CP 7.4.0
getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"));
withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS")));

withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
);
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
}

protected void configureZookeeper() {
if (externalZookeeperConnect != null) {
withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
} else {
addExposedPort(ZOOKEEPER_PORT);
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
if (!this.kraftEnabled && this.externalZookeeperConnect == null) {
getContainerDef().withEmbeddedZookeeper();
}
}

@@ -229,7 +153,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
advertisedListeners.add(getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener(containerInfo));

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
List<Supplier<String>> listenersToTransform = new ArrayList<>(getContainerDef().listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
@@ -265,7 +189,7 @@ protected String commandKraft() {
String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
command +=
"echo 'kafka-storage format --ignore-formatted -t \"" +
this.clusterId +
getContainerDef().getEnvVars().get("CLUSTER_ID") +
"\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
return command;
}
@@ -299,11 +223,113 @@ protected String commandZookeeper() {
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
getContainerDef().withListener(listenerSupplier);
return this;
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}

private static class KafkaContainerDef extends ContainerDef {

private final Set<Supplier<String>> listeners = new HashSet<>();

private String clusterId = DEFAULT_CLUSTER_ID;

KafkaContainerDef() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
addEnvVar("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
addEnvVar("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
addEnvVar("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

addEnvVar("KAFKA_BROKER_ID", "1");
addEnvVar("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
addEnvVar("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
addEnvVar("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

addExposedTcpPort(KAFKA_PORT);

setEntrypoint("sh");
setCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);

setWaitStrategy(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
}

private void resolveListeners() {
Set<String> additionalKafkaListeners = new HashSet<>();
Set<String> additionalListenerSecurityProtocolMap = new HashSet<>();

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
additionalKafkaListeners.add(listenerProtocol);
additionalListenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
addNetworkAlias(host);
}

String kafkaListeners = String.join(",", additionalKafkaListeners);
String kafkaListenerSecurityProtocolMap = String.join(",", additionalListenerSecurityProtocolMap);

this.envVars.computeIfPresent("KAFKA_LISTENERS", (k, v) -> String.join(",", v, kafkaListeners));
this.envVars.computeIfPresent(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
(k, v) -> String.join(",", v, kafkaListenerSecurityProtocolMap)
);
}

void withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
}

void withEmbeddedZookeeper() {
addExposedTcpPort(ZOOKEEPER_PORT);
addEnvVar("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
}

void withZookeeper(String connectionString) {
addEnvVar("KAFKA_ZOOKEEPER_CONNECT", connectionString);
}

void withClusterId(String clusterId) {
this.clusterId = clusterId;
}

void withRaft() {
this.envVars.computeIfAbsent("CLUSTER_ID", key -> clusterId);
this.envVars.computeIfAbsent("KAFKA_NODE_ID", key -> getEnvVars().get("KAFKA_BROKER_ID"));
addEnvVar(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
addEnvVar(
"KAFKA_LISTENERS",
String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvVars().get("KAFKA_LISTENERS"))
);
addEnvVar("KAFKA_PROCESS_ROLES", "broker,controller");

String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
String controllerQuorumVoters = String.format(
"%s@%s:9094",
getEnvVars().get("KAFKA_NODE_ID"),
networkAlias
);
this.envVars.computeIfAbsent("KAFKA_CONTROLLER_QUORUM_VOTERS", key -> controllerQuorumVoters);
addEnvVar("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

setWaitStrategy(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
}
}
}
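At a glance, the change moves the static configuration (default env vars, exposed ports, entrypoint/command, wait strategy) out of `KafkaContainer`'s constructor and `configure()` into `KafkaContainerDef`, a private static nested class extending `ContainerDef`, while the public `with*` methods keep their signatures and delegate to it. A simplified, hypothetical sketch of that delegation pattern (the stripped-down `Def`/`Kafka` classes below are invented for illustration and are not the actual Testcontainers types):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stripped-down model of the pattern in this commit: the container
// keeps its fluent public API while a "definition" object accumulates the
// environment variables and other settings that used to live in the container itself.
class ContainerDefPatternSketch {

    static class Def {
        final Map<String, String> envVars = new HashMap<>();

        void addEnvVar(String key, String value) {
            this.envVars.put(key, value);
        }
    }

    static class KafkaDef extends Def {
        KafkaDef() {
            // Defaults move from the container constructor into the definition.
            addEnvVar("KAFKA_BROKER_ID", "1");
        }

        void withZookeeper(String connectString) {
            addEnvVar("KAFKA_ZOOKEEPER_CONNECT", connectString);
        }
    }

    static class Kafka {
        private final KafkaDef def = new KafkaDef();

        // The public method keeps its signature; it now only delegates to the definition.
        Kafka withExternalZookeeper(String connectString) {
            this.def.withZookeeper(connectString);
            return this;
        }

        KafkaDef getContainerDef() {
            return this.def;
        }
    }

    public static void main(String[] args) {
        Kafka kafka = new Kafka().withExternalZookeeper("zookeeper:2181");
        // Prints the accumulated configuration, e.g. {KAFKA_BROKER_ID=1, KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181}
        System.out.println(kafka.getContainerDef().envVars);
    }
}
```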
