Skip to content

Commit

Permalink
Support Kraft post Confluent Platform 7.4.0 (#7014)
Browse files Browse the repository at this point in the history
Fixes #7010


Co-authored-by: Eddú Meléndez <eddu.melendez@gmail.com>
  • Loading branch information
danielpetisme and eddumelendez committed May 9, 2023
1 parent 3616ebf commit c64aab9
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 38 deletions.
3 changes: 1 addition & 2 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ If for some reason you want to use an externally running Zookeeper, then just pa

### Using Kraft mode

The self-managed (Kraft) mode is available as a preview feature since version 3.0 (confluentinc/cp-kafka:7.0.x) and
declared as a production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x).
KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

<!--codeinclude-->
[Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public void testKafkaContainerKraftCluster() throws Exception {
}
}

@Test
public void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception {
try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) {
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.Objects;

/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
*
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {

Expand All @@ -29,11 +28,13 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
// https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft
private static final String MIN_KRAFT_TAG = "7.0.0";

public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

protected String externalZookeeperConnect = null;

private boolean kraftEnabled = false;

private String clusterId;
private String clusterId = DEFAULT_CLUSTER_ID;

/**
* @deprecated use {@link KafkaContainer(DockerImageName)} instead
Expand Down Expand Up @@ -98,7 +99,7 @@ public KafkaContainer withKraft() {
throw new IllegalStateException("Cannot configure Kraft mode when Zookeeper configured");
}
verifyMinKraftVersion();
kraftEnabled = true;
this.kraftEnabled = true;
return self();
}

Expand All @@ -115,7 +116,13 @@ private void verifyMinKraftVersion() {
}
}

private boolean isLessThanCP740() {
String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart();
return new ComparableVersion(actualVersion).isLessThan("7.4.0");
}

public KafkaContainer withClusterId(String clusterId) {
Objects.requireNonNull(clusterId, "clusterId cannot be null");
this.clusterId = clusterId;
return self();
}
Expand All @@ -126,7 +133,7 @@ public String getBootstrapServers() {

@Override
protected void configure() {
if (kraftEnabled) {
if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
} else {
Expand All @@ -136,31 +143,27 @@ protected void configure() {
}

protected void configureKraft() {
withEnv(
"KAFKA_NODE_ID",
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"))
);
//CP 7.4.0
getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"));
withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS")));

withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
withEnv(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
)
);
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
);
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
}

Expand All @@ -186,27 +189,28 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
brokerAdvertisedListener(containerInfo)
);

command += (kraftEnabled) ? commandKraft() : commandZookeeper();
if (this.kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandKraft();
}

if (!this.kraftEnabled) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandZookeeper();
}

// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
// Run the original command
command += "/etc/confluent/docker/run \n";
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

protected String commandKraft() {
String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
try {
if (clusterId == null) {
clusterId = execInContainer("kafka-storage", "random-uuid").getStdout().trim();
}
} catch (IOException | InterruptedException e) {
logger().error("Failed to execute `kafka-storage random-uuid`. Exception message: {}", e.getMessage());
}
command +=
"echo 'kafka-storage format --ignore-formatted -t \"" +
clusterId +
this.clusterId +
"\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
return command;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,20 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception {
}

@Test
public void testUsageKraft() throws Exception {
public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft()
) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

@Test
public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
// }
) {
kafka.start();
Expand Down

0 comments on commit c64aab9

Please sign in to comment.