Skip to content

Commit

Permalink
StrimziKafkaCluster builder (#86)
Browse files Browse the repository at this point in the history
* StrimziKafkaCluster builder

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* add javadoc

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* add Deprecation to constructors and let them live for next release

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* remove un-necessary

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* re-name method accordingly

Signed-off-by: see-quick <maros.orsak159@gmail.com>

---------

Signed-off-by: see-quick <maros.orsak159@gmail.com>
  • Loading branch information
see-quick authored Oct 15, 2024
1 parent 0f30b88 commit f981d57
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 57 deletions.
200 changes: 161 additions & 39 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@
* A multi-node instance of Kafka and Zookeeper using the latest image from quay.io/strimzi/kafka with the given version.
* It perfectly fits for integration/system testing. We always deploy one zookeeper with a specified number of Kafka instances,
* running as a separate container inside Docker. The additional configuration for Kafka brokers can be passed to the constructor.
* <br><br>
* Note: Direct constructor calls are deprecated and will be removed in the next released version.
* Please use {@link StrimziKafkaClusterBuilder} for creating instances of this class.
*/
public class StrimziKafkaCluster implements KafkaContainer {

// class attributes
private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);

// instance attributes
private final int brokersNum;
private int brokersNum;
private int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration;
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;

// not editable
private final Network network;
private final StrimziZookeeperContainer zookeeper;
private final Collection<KafkaContainer> brokers;
private Collection<KafkaContainer> brokers;

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
Expand All @@ -53,58 +62,25 @@ public class StrimziKafkaCluster implements KafkaContainer {
* @param proxyContainer Proxy container
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration,
final ToxiproxyContainer proxyContainer,
final boolean enableSharedNetwork) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicReplicationFactor <= 0 || internalTopicReplicationFactor > brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
}
validateBrokerNum(brokersNum);
validateInternalTopicReplicationFactor(internalTopicReplicationFactor);

this.brokersNum = brokersNum;
this.network = enableSharedNetwork ? Network.SHARED : Network.newNetwork();

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);

Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}

if (proxyContainer != null) {
proxyContainer.setNetwork(this.network);
}

// multi-node set up
this.brokers = IntStream
.range(0, this.brokersNum)
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.dependsOn(this.zookeeper);

LOGGER.info("Started broker with id: {}", kafkaContainer);

return kafkaContainer;
})
.collect(Collectors.toList());
prepareKafkaCluster(additionalKafkaConfiguration);
}

/**
Expand All @@ -116,6 +92,7 @@ public StrimziKafkaCluster(final int brokersNum,
* @param internalTopicReplicationFactor internal topics
* @param additionalKafkaConfiguration additional Kafka configuration
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum,
final int internalTopicReplicationFactor,
final Map<String, String> additionalKafkaConfiguration) {
Expand All @@ -127,6 +104,7 @@ public StrimziKafkaCluster(final int brokersNum,
*
* @param brokersNum number of brokers
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum) {
this(brokersNum, brokersNum, null, null, false);
}
Expand All @@ -137,6 +115,7 @@ public StrimziKafkaCluster(final int brokersNum) {
* @param brokersNum number of brokers to be deployed
* @param proxyContainer Proxy container
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyContainer) {
this(brokersNum, brokersNum, null, proxyContainer, false);
}
Expand All @@ -147,10 +126,153 @@ public StrimziKafkaCluster(final int brokersNum, final ToxiproxyContainer proxyC
* @param brokersNum number of brokers to be deployed
* @param enableSharedNetwork enable Kafka cluster to use a shared Docker network.
*/
@Deprecated
public StrimziKafkaCluster(final int brokersNum, final boolean enableSharedNetwork) {
this(brokersNum, brokersNum, null, null, enableSharedNetwork);
}

private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.brokersNum = builder.brokersNum;
this.enableSharedNetwork = builder.enableSharedNetwork;
this.network = this.enableSharedNetwork ? Network.SHARED : Network.newNetwork();
this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;

validateBrokerNum(this.brokersNum);
validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor);

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);

if (this.proxyContainer != null) {
this.proxyContainer.setNetwork(this.network);
}

prepareKafkaCluster(this.additionalKafkaConfiguration);
}

private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfiguration) {
final Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}

// multi-node set up
this.brokers = IntStream
.range(0, this.brokersNum)
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.dependsOn(this.zookeeper);

LOGGER.info("Started broker with id: {}", kafkaContainer);

return kafkaContainer;
})
.collect(Collectors.toList());
}

private void validateBrokerNum(int brokersNum) {
if (brokersNum <= 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
}

private void validateInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
if (internalTopicReplicationFactor <= 0 || internalTopicReplicationFactor > this.brokersNum) {
throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be less than brokersNum and greater than 0");
}
}

public static class StrimziKafkaClusterBuilder {
private int brokersNum;
private int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration = new HashMap<>();
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;

/**
* Sets the number of Kafka brokers in the cluster.
*
* @param brokersNum the number of Kafka brokers
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
this.brokersNum = brokersNum;
return this;
}

/**
* Sets the internal topic replication factor for Kafka brokers.
* If not provided, it defaults to the number of brokers.
*
* @param internalTopicReplicationFactor the replication factor for internal topics
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
this.internalTopicReplicationFactor = internalTopicReplicationFactor;
return this;
}

/**
* Adds additional Kafka configuration parameters.
* These configurations are applied to all brokers in the cluster.
*
* @param additionalKafkaConfiguration a map of additional Kafka configuration options
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
if (additionalKafkaConfiguration != null) {
this.additionalKafkaConfiguration.putAll(additionalKafkaConfiguration);
}
return this;
}

/**
* Sets a {@code ToxiproxyContainer} to simulate network conditions such as latency or disconnection.
*
* @param proxyContainer the proxy container for simulating network conditions
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
this.proxyContainer = proxyContainer;
return this;
}

/**
* Enables a shared Docker network for the Kafka cluster.
* This allows the Kafka cluster to interact with other containers on the same network.
*
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withSharedNetwork() {
this.enableSharedNetwork = true;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziKafkaCluster}
*/
public StrimziKafkaCluster build() {
return new StrimziKafkaCluster(this);
}
}

/**
* Get collection of Strimzi kafka containers
* @return collection of Strimzi kafka containers
Expand Down
19 changes: 15 additions & 4 deletions src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ void testKafkaClusterStartup() throws IOException, InterruptedException {
@Test
void testKafkaClusterStartupWithSharedNetwork() throws IOException, InterruptedException {
try {
systemUnderTest = new StrimziKafkaCluster(3, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyReadinessOfKafkaCluster();
Expand All @@ -91,7 +94,10 @@ void testKafkaClusterFunctionality() throws ExecutionException, InterruptedExcep
@Test
void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException {
try {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS, true);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withSharedNetwork()
.build();
systemUnderTest.start();

verifyFunctionalityOfKafkaCluster();
Expand All @@ -111,7 +117,10 @@ void testStartClusterWithProxyContainer() {
StrimziKafkaCluster kafkaCluster = null;

try {
kafkaCluster = new StrimziKafkaCluster(3, proxyContainer);
kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.withProxyContainer(proxyContainer)
.build();

kafkaCluster.start();
List<String> bootstrapUrls = new ArrayList<>();
Expand All @@ -130,7 +139,9 @@ void testStartClusterWithProxyContainer() {
}

private void setUpKafkaCluster() {
systemUnderTest = new StrimziKafkaCluster(NUMBER_OF_REPLICAS);
systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(NUMBER_OF_REPLICAS)
.build();
systemUnderTest.start();
}

Expand Down
Loading

0 comments on commit f981d57

Please sign in to comment.