diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 57650758bbc70..ea793cabff522 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -264,7 +264,7 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) { validateSuperUserAccessAsync() - .thenAccept(__ -> asyncResponse.resume(BrokerService.getDynamicConfiguration())) + .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration())) .exceptionally(ex -> { LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -297,11 +297,11 @@ public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) { */ private synchronized CompletableFuture persistDynamicConfigurationAsync( String configName, String configValue) { - if (!BrokerService.validateDynamicConfiguration(configName, configValue)) { + if (!pulsar().getBrokerService().validateDynamicConfiguration(configName, configValue)) { return FutureUtil .failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value")); } - if (BrokerService.isDynamicConfiguration(configName)) { + if (pulsar().getBrokerService().isDynamicConfiguration(configName)) { return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> { Map configurationMap = old.orElseGet(Maps::newHashMap); configurationMap.put(configName, configValue); @@ -526,7 +526,7 @@ private CompletableFuture healthCheckRecursiveReadNext(Reader read } private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { - if (!BrokerService.isDynamicConfiguration(configName)) { + if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2adacecb1fadd..bcfe531e39f30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -218,8 +218,7 @@ public class BrokerService implements Closeable { private final OrderedExecutor topicOrderedExecutor; // offline topic backlog cache private final ConcurrentOpenHashMap offlineTopicStatCache; - private static final ConcurrentOpenHashMap dynamicConfigurationMap = - prepareDynamicConfigurationMap(); + private final ConcurrentOpenHashMap dynamicConfigurationMap; private final ConcurrentOpenHashMap> configRegisteredListeners; private final ConcurrentLinkedQueue pendingTopicLoadingQueue; @@ -292,6 +291,7 @@ public class BrokerService implements Closeable { public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; + this.dynamicConfigurationMap = prepareDynamicConfigurationMap(); this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); @@ -2517,37 +2517,73 @@ private void handleDynamicConfigurationUpdates() { if (dynamicConfigResources != null) { dynamicConfigResources.getDynamicConfigurationAsync() - .thenAccept(optMap -> { - if (!optMap.isPresent()) { - return; + .thenAccept(optMap -> { + // Case some dynamic configs have been removed. + dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> { + boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey); + if (fieldWrapper.lastDynamicValue != null && configRemoved) { + configValueChanged(configKey, null); } - Map data = optMap.get(); - data.forEach((configKey, value) -> { - ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); - if (configFieldWrapper == null) { - log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); - return; - } - Field configField = configFieldWrapper.field; - Object newValue = FieldParser.value(data.get(configKey), configField); - if (configField != null) { - Consumer listener = configRegisteredListeners.get(configKey); - try { - Object existingValue = configField.get(pulsar.getConfiguration()); - configField.set(pulsar.getConfiguration(), newValue); - log.info("Successfully updated configuration {}/{}", configKey, - data.get(configKey)); - if (listener != null && !existingValue.equals(newValue)) { - listener.accept(newValue); - } - } catch (Exception e) { - log.error("Failed to update config {}/{}", configKey, newValue); - } - } else { - log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); - } - }); }); + // Some configs have been changed. + if (!optMap.isPresent()) { + return; + } + Map data = optMap.get(); + data.forEach((configKey, value) -> { + configValueChanged(configKey, value); + }); + }); + } + } + + private void configValueChanged(String configKey, String newValueStr) { + ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); + if (configFieldWrapper == null) { + log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); + return; + } + Consumer listener = configRegisteredListeners.get(configKey); + try { + // Convert existingValue and newValue. + final Object existingValue; + final Object newValue; + if (configFieldWrapper.field != null) { + if (StringUtils.isBlank(newValueStr)) { + newValue = configFieldWrapper.defaultValue; + } else { + newValue = FieldParser.value(newValueStr, configFieldWrapper.field); + } + existingValue = configFieldWrapper.field.get(pulsar.getConfiguration()); + configFieldWrapper.field.set(pulsar.getConfiguration(), newValue); + } else { + // This case only occurs when it is a customized item. + // Since https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been cherry-picked, this + // case should never occur. + log.error("Skip update customized dynamic configuration {}/{} in memory, only trigger an event" + + " listeners. Since PIP-300 has net been cherry-picked, this case should never occur", + configKey, newValueStr); + existingValue = configFieldWrapper.lastDynamicValue; + newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr; + } + // Record the latest dynamic config. + configFieldWrapper.lastDynamicValue = newValueStr; + + if (newValueStr == null) { + log.info("Successfully remove the dynamic configuration {}, and revert to the default value", + configKey); + } else { + log.info("Successfully updated configuration {}/{}", configKey, newValueStr); + } + + if (listener != null && !Objects.equals(existingValue, newValue)) { + // So far, all config items that related to configuration listeners, their default value is not null. + // And the customized config can be null before. + // So call "listener.accept(null)" is okay. + listener.accept(newValue); + } + } catch (Exception e) { + log.error("Failed to update config {}", configKey, e); } } @@ -2958,6 +2994,9 @@ private void updateManagedLedgerConfig() { * On notification, listener should first check if config value has been changed and after taking appropriate * action, listener should update config value with new value if it has been changed (so, next time listener can * compare values on configMap change). + * + * Note: The new value that the {@param listener} may accept could be a null value. + * * @param * * @param configKey @@ -3048,7 +3087,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { return delayedDeliveryTrackerFactory; } - public static List getDynamicConfiguration() { + public List getDynamicConfiguration() { return dynamicConfigurationMap.keys(); } @@ -3061,27 +3100,34 @@ public Map getRuntimeConfiguration() { return configMap; } - public static boolean isDynamicConfiguration(String key) { + public boolean isDynamicConfiguration(String key) { return dynamicConfigurationMap.containsKey(key); } - public static boolean validateDynamicConfiguration(String key, String value) { + public boolean validateDynamicConfiguration(String key, String value) { if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { return dynamicConfigurationMap.get(key).validator.test(value); } return true; } - private static ConcurrentOpenHashMap prepareDynamicConfigurationMap() { + private ConcurrentOpenHashMap prepareDynamicConfigurationMap() { ConcurrentOpenHashMap dynamicConfigurationMap = ConcurrentOpenHashMap.newBuilder().build(); - for (Field field : ServiceConfiguration.class.getDeclaredFields()) { - if (field != null && field.isAnnotationPresent(FieldContext.class)) { - field.setAccessible(true); - if (field.getAnnotation(FieldContext.class).dynamic()) { - dynamicConfigurationMap.put(field.getName(), new ConfigField(field)); + try { + for (Field field : ServiceConfiguration.class.getDeclaredFields()) { + if (field != null && field.isAnnotationPresent(FieldContext.class)) { + field.setAccessible(true); + if (field.getAnnotation(FieldContext.class).dynamic()) { + Object defaultValue = field.get(pulsar.getConfiguration()); + dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue)); + } } } + } catch (IllegalArgumentException | IllegalAccessException ex) { + // This error never occurs. + log.error("Failed to initialize dynamic configuration map", ex); + throw new RuntimeException(ex); } return dynamicConfigurationMap; } @@ -3361,11 +3407,21 @@ public void unblockDispatchersOnUnAckMessages(List validator; - public ConfigField(Field field) { + public ConfigField(Field field, Object defaultValue) { super(); this.field = field; + this.defaultValue = defaultValue; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java index c9a07dc966de6..d7a21e6741e9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java @@ -20,12 +20,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.testng.Assert.assertNull; import static org.testng.Assert.fail; import java.util.Map; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -69,4 +73,40 @@ public void TestDeleteInvalidDynamicConfiguration() { } } } + + @Test + public void testDeleteStringDynamicConfig() throws PulsarAdminException { + String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp"); + // The default value is null; + Awaitility.await().untilAsserted(() -> { + assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic()); + }); + // Set dynamic config. + admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic); + }); + // Remove dynamic config. + admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic"); + Awaitility.await().untilAsserted(() -> { + assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic()); + }); + } + + @Test + public void testDeleteIntDynamicConfig() throws PulsarAdminException { + // Record the default value; + int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest(); + // Set dynamic config. + int newValue = defaultValue + 1000; + admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + ""); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue); + }); + // Verify: it has been reverted to the default value. + admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest"); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue); + }); + } }