From 4ffa4d23e07036110769770699e3989f7b923b4e Mon Sep 17 00:00:00 2001 From: Jennifer Meyer Date: Tue, 24 Oct 2023 14:22:26 +0200 Subject: [PATCH] feat(core): allow to delete topic config (GH-362) --- src/main/java/org/akhq/models/Config.java | 6 ++++- .../akhq/repositories/ConfigRepository.java | 12 +++++++--- .../repositories/ConfigRepositoryTest.java | 22 +++++++++++++++++-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/akhq/models/Config.java b/src/main/java/org/akhq/models/Config.java index 0075bbf57..fc1e66196 100644 --- a/src/main/java/org/akhq/models/Config.java +++ b/src/main/java/org/akhq/models/Config.java @@ -46,6 +46,10 @@ public Config(ConfigEntry entry) { } } + public boolean shouldResetToDefault() { + return this.getValue().isBlank(); + } + private String findDescription(String name) { String docName = name.toUpperCase().replace(".", "_") + "_DOC"; @@ -89,4 +93,4 @@ public enum Source { DEFAULT_CONFIG, UNKNOWN } -} +} \ No newline at end of file diff --git a/src/main/java/org/akhq/repositories/ConfigRepository.java b/src/main/java/org/akhq/repositories/ConfigRepository.java index 674e36623..9dfa62066 100644 --- a/src/main/java/org/akhq/repositories/ConfigRepository.java +++ b/src/main/java/org/akhq/repositories/ConfigRepository.java @@ -68,14 +68,20 @@ public void updateTopic(String clusterId, String name, List configs) thr private void update(String clusterId, ConfigResource.Type type, String name, List configs) throws ExecutionException, InterruptedException { List entries = new ArrayList<>(); + List configNamesToReset = configs.stream() + .filter(Config::shouldResetToDefault) + .map(Config::getName) + .collect(Collectors.toList()); + this.find(clusterId, type, Collections.singletonList(name)) .get(name) .stream() .filter(config -> config.getSource().name().startsWith("DYNAMIC_")) + .filter(config -> !configNamesToReset.contains(config.getName())) .forEach(config -> entries.add(new ConfigEntry(config.getName(), config.getValue()))); - configs - .stream() + configs.stream() + .filter(config -> !config.shouldResetToDefault()) .map(config -> new ConfigEntry(config.getName(), config.getValue())) .forEach(entries::add); @@ -102,4 +108,4 @@ public static List updatedConfigs(Map request, List config.withValue(request.get(configFn.apply(config)))) .collect(Collectors.toList()); } -} +} \ No newline at end of file diff --git a/src/test/java/org/akhq/repositories/ConfigRepositoryTest.java b/src/test/java/org/akhq/repositories/ConfigRepositoryTest.java index 2588d23a5..47782ce76 100644 --- a/src/test/java/org/akhq/repositories/ConfigRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/ConfigRepositoryTest.java @@ -11,6 +11,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; class ConfigRepositoryTest extends AbstractTest { @Inject @@ -18,6 +19,7 @@ class ConfigRepositoryTest extends AbstractTest { @Test void updateTopic() throws ExecutionException, InterruptedException { + // write config the first time repository.updateTopic( KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_HUGE, @@ -30,6 +32,7 @@ void updateTopic() throws ExecutionException, InterruptedException { assertEquals("1", getConfig("file.delete.delay.ms").getValue()); assertEquals("2", getConfig("index.interval.bytes").getValue()); + // update config 1 repository.updateTopic( KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_HUGE, @@ -41,7 +44,7 @@ void updateTopic() throws ExecutionException, InterruptedException { assertEquals("3", getConfig("file.delete.delay.ms").getValue()); assertEquals("2", getConfig("index.interval.bytes").getValue()); - + // update config 2 repository.updateTopic( KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_HUGE, @@ -52,6 +55,21 @@ void updateTopic() throws ExecutionException, InterruptedException { assertEquals("3", getConfig("file.delete.delay.ms").getValue()); assertEquals("4", getConfig("index.interval.bytes").getValue()); + + // reset config index.interval.bytes (leave config file.delete.delay.ms unchanged) + repository.updateTopic( + KafkaTestCluster.CLUSTER_ID, + KafkaTestCluster.TOPIC_HUGE, + Collections.singletonList( + new Config("index.interval.bytes", "") + ) + ); + + Config unchangedConfig = getConfig("file.delete.delay.ms"); + assertTrue(unchangedConfig.getSource().name().startsWith("DYNAMIC_")); + + Config resettedConfig = getConfig("index.interval.bytes"); + assertTrue(resettedConfig.getSource().name().startsWith("DEFAULT_")); } private Config getConfig(String name) throws ExecutionException, InterruptedException { @@ -62,4 +80,4 @@ private Config getConfig(String name) throws ExecutionException, InterruptedExce .findAny() .get(); } -} +} \ No newline at end of file