Skip to content

Commit

Permalink
[fix] [broker] Fix nothing changed after removing dynamic configs (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored May 8, 2024
1 parent 5ab0512 commit ada31a9
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ public class BrokerService implements Closeable {
private final OrderedExecutor topicOrderedExecutor;
// offline topic backlog cache
private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap;
private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;

private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue;
Expand Down Expand Up @@ -313,6 +312,7 @@ public class BrokerService implements Closeable {

public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
Expand Down Expand Up @@ -2496,40 +2496,71 @@ private void handleDynamicConfigurationUpdates() {

if (dynamicConfigResources != null) {
dynamicConfigResources.getDynamicConfigurationAsync()
.thenAccept(optMap -> {
if (!optMap.isPresent()) {
return;
.thenAccept(optMap -> {
// Case some dynamic configs have been removed.
dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> {
boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey);
if (fieldWrapper.lastDynamicValue != null && configRemoved) {
configValueChanged(configKey, null);
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Field configField = configFieldWrapper.field;
Consumer listener = configRegisteredListeners.get(configKey);
try {
final Object existingValue;
final Object newValue;
if (configField != null) {
newValue = FieldParser.value(data.get(configKey), configField);
existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
} else {
newValue = value;
existingValue = configFieldWrapper.customValue;
configFieldWrapper.customValue = newValue == null ? null : String.valueOf(newValue);
}
log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey));
if (listener != null && !Objects.equals(existingValue, newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
});
});
// Some configs have been changed.
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
configValueChanged(configKey, value);
});
});
}
}

private void configValueChanged(String configKey, String newValueStr) {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Consumer listener = configRegisteredListeners.get(configKey);
try {
// Convert existingValue and newValue.
final Object existingValue;
final Object newValue;
if (configFieldWrapper.field != null) {
if (StringUtils.isBlank(newValueStr)) {
newValue = configFieldWrapper.defaultValue;
} else {
newValue = FieldParser.value(newValueStr, configFieldWrapper.field);
}
existingValue = configFieldWrapper.field.get(pulsar.getConfiguration());
configFieldWrapper.field.set(pulsar.getConfiguration(), newValue);
} else {
// This case only occurs when it is a customized item.
// See: https://github.com/apache/pulsar/blob/master/pip/pip-300.md.
log.info("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
+ " listeners.", configKey, newValueStr);
existingValue = configFieldWrapper.lastDynamicValue;
newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr;
}
// Record the latest dynamic config.
configFieldWrapper.lastDynamicValue = newValueStr;

if (newValueStr == null) {
log.info("Successfully remove the dynamic configuration {}, and revert to the default value",
configKey);
} else {
log.info("Successfully updated configuration {}/{}", configKey, newValueStr);
}

if (listener != null && !Objects.equals(existingValue, newValue)) {
// So far, all config items that related to configuration listeners, their default value is not null.
// And the customized config can be null before.
// So call "listener.accept(null)" is okay.
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
}

Expand Down Expand Up @@ -2936,6 +2967,9 @@ private void updateManagedLedgerConfig() {
* On notification, listener should first check if config value has been changed and after taking appropriate
* action, listener should update config value with new value if it has been changed (so, next time listener can
* compare values on configMap change).
*
* Note: The new value that the {@param listener} may accept could be a null value.
*
* @param <T>
*
* @param configKey
Expand Down Expand Up @@ -3057,16 +3091,23 @@ public boolean validateDynamicConfiguration(String key, String value) {
return true;
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
try {
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
Object defaultValue = field.get(pulsar.getConfiguration());
dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue));
}
}
}
} catch (IllegalArgumentException | IllegalAccessException ex) {
// This error never occurs.
log.error("Failed to initialize dynamic configuration map", ex);
throw new RuntimeException(ex);
}
return dynamicConfigurationMap;
}
Expand Down Expand Up @@ -3348,19 +3389,25 @@ private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;

// customValue holds the external dynamic configuration.
volatile String customValue;
// It is the dynamic config value if set.
// It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
volatile String lastDynamicValue;

// The default value of "pulsar.config", which is initialized when the broker is starting.
// After the dynamic config has been removed, revert the config to this default value.
final Object defaultValue;

Predicate<String> validator;

public ConfigField(Field field) {
public ConfigField(Field field, Object defaultValue) {
super();
this.field = field;
this.defaultValue = defaultValue;
}

public static ConfigField newCustomConfigField(String customValue) {
ConfigField configField = new ConfigField(null);
configField.customValue = customValue;
ConfigField configField = new ConfigField(null, null);
configField.lastDynamicValue = customValue;
return configField;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -107,4 +110,69 @@ public void testRegisterCustomDynamicConfiguration() throws PulsarAdminException
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);
}

@Test
public void testDeleteStringDynamicConfig() throws PulsarAdminException {
String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp");
// The default value is null;
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
// Set dynamic config.
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic);
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic);
});
// Remove dynamic config.
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
}

@Test
public void testDeleteIntDynamicConfig() throws PulsarAdminException {
// Record the default value;
int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
// Set dynamic config.
int newValue = defaultValue + 1000;
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + "");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
});
// Verify: it has been reverted to the default value.
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue);
});
}

@Test
public void testDeleteCustomizedDynamicConfig() throws PulsarAdminException {
// Record the default value;
String customizedConfigName = "a123";
pulsar.getBrokerService().registerCustomDynamicConfiguration(customizedConfigName, v -> true);

AtomicReference<Object> currentValue = new AtomicReference<>();
pulsar.getBrokerService().registerConfigurationListener(customizedConfigName, v -> {
currentValue.set(v);
});

// The default value is null;
Awaitility.await().untilAsserted(() -> {
assertNull(currentValue.get());
});

// Set dynamic config.
admin.brokers().updateDynamicConfiguration(customizedConfigName, "xxx");
Awaitility.await().untilAsserted(() -> {
assertEquals(currentValue.get(), "xxx");
});

// Remove dynamic config.
admin.brokers().deleteDynamicConfiguration(customizedConfigName);
Awaitility.await().untilAsserted(() -> {
assertNull(currentValue.get());
});
}
}

0 comments on commit ada31a9

Please sign in to comment.