Skip to content

Commit

Permalink
Manage log.dir in the EmbeddedKafkaBroker
Browse files Browse the repository at this point in the history
Resolves #194

Create the temporary directory in EKB instead of the broker to avoid
`NoSuchFileException`s during shutdown.

**cherry-pick to 2.4.x, 2.3.x, 2.2.x**
  • Loading branch information
garyrussell authored and artembilan committed May 1, 2020
1 parent ff60e84 commit 2407380
Showing 1 changed file with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -288,6 +291,7 @@ public void afterPropertiesSet() {
}
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
Expand All @@ -299,6 +303,9 @@ public void afterPropertiesSet() {
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
}
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
this.kafkaServers.add(server);
if (this.kafkaPorts[i] == 0) {
Expand All @@ -316,6 +323,16 @@ public void afterPropertiesSet() {
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}

private void logDir(Properties brokerConfigProperties) {
try {
brokerConfigProperties.put(KafkaConfig.LogDirProp(),
Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void overrideExitMethods() {
String exitMsg = "Exit.%s(%d, %s) called";
Exit.setExitProcedure((statusCode, message) -> {
Expand Down

0 comments on commit 2407380

Please sign in to comment.