Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Kraft post Confluent Platform 7.4.0 #7014

Merged
merged 8 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,23 @@ Now your tests or any other process running on your machine can get access to ru
<!--/codeinclude-->

## Options

### <a name="zookeeper"></a> Using external Zookeeper

If for some reason you want to use an externally running Zookeeper, then just pass its location during construction:
<!--codeinclude-->
[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper
<!--/codeinclude-->

### Using Kraft mode

The self-managed (Kraft) mode is available as a preview feature since version 3.0 (confluentinc/cp-kafka:7.0.x) and
declared as a production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x).
KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

<!--codeinclude-->
[Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode
<!--/codeinclude-->

See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.
See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.

### <a name="zookeeper"></a> Using external Zookeeper

If for some reason you want to use an externally running Zookeeper, then just pass its location during construction:
<!--codeinclude-->
[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper
<!--/codeinclude-->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason to move it? I guess there is preferences but I think it doesn't hurt 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, definitively Kraft is the preferred option from now on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like this change did not get merged. 😢
@eddumelendez as said, Kraft would be the preferred option, would you mind to invert the order and put Kraft first?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's up to the user. That's why I said it doesn't hurt.


## Multi-container usage

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public void testKafkaContainerKraftCluster() throws Exception {
}
}

@Test
public void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception {
try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) {
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.Objects;

/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
Expand All @@ -29,11 +29,13 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
// https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft
private static final String MIN_KRAFT_TAG = "7.0.0";

public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

protected String externalZookeeperConnect = null;

private boolean kraftEnabled = false;

private String clusterId;
private String clusterId = DEFAULT_CLUSTER_ID;

/**
* @deprecated use {@link KafkaContainer(DockerImageName)} instead
Expand Down Expand Up @@ -115,7 +117,13 @@ private void verifyMinKraftVersion() {
}
}

private boolean isLessThanCP740() {
String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart();
return new ComparableVersion(actualVersion).isLessThan("7.4.0");
}

public KafkaContainer withClusterId(String clusterId) {
Objects.requireNonNull(clusterId, "clusterId cannot be null");
this.clusterId = clusterId;
return self();
}
Expand All @@ -136,6 +144,8 @@ protected void configure() {
}

protected void configureKraft() {
//CP 7.4.0
getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
withEnv(
"KAFKA_NODE_ID",
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"))
Expand Down Expand Up @@ -186,24 +196,25 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
brokerAdvertisedListener(containerInfo)
);

command += (kraftEnabled) ? commandKraft() : commandZookeeper();
if (kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandKraft();
}

if (!kraftEnabled) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandZookeeper();
}

// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
// Run the original command
command += "/etc/confluent/docker/run \n";
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

protected String commandKraft() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only called for < CP 7.4? If yes, it may be good to add a defense in depth and check that it is indeed isLessThanCP740 ()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it supposed to be invoked in Zk mode or when Kraft AND CP <7.4
https://github.com/testcontainers/testcontainers-java/pull/7014/files#diff-7c5a407b71c96d4816697ed454df5cb084987573025af294ffa6c182dbd8879eR199-R209

Would like to add a test?

TBH, I'm questioning the optimization gains of removing the checks /etc/confluent/docker/ensure contains... For now, I suggested this solution to be fully backward compatible.

@eddumelendez here are the checks the script is performing... IMHO, the gain does not worth the "complexity" of the code... but that's my 2cts...
https://github.com/confluentinc/kafka-images/blob/master/kafka/include/etc/confluent/docker/ensure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, the module is already complex enough 😅 Let's rollback that change, please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done
#7030

String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
try {
if (clusterId == null) {
clusterId = execInContainer("kafka-storage", "random-uuid").getStdout().trim();
}
} catch (IOException | InterruptedException e) {
logger().error("Failed to execute `kafka-storage random-uuid`. Exception message: {}", e.getMessage());
}
command +=
"echo 'kafka-storage format --ignore-formatted -t \"" +
clusterId +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,20 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception {
}

@Test
public void testUsageKraft() throws Exception {
public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft()
) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

@Test
public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
danielpetisme marked this conversation as resolved.
Show resolved Hide resolved
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
// }
danielpetisme marked this conversation as resolved.
Show resolved Hide resolved
) {
kafka.start();
Expand Down