Skip to content

Commit

Permalink
[ISSUE #4088] fix sync data by etcd io.vertx.core.VertxException (#4141)
Browse files Browse the repository at this point in the history
* fix #4088

* fix sync data

* fix TU

* refactor code

Co-authored-by: xiaoyu <xiaoyu@apache.org>
  • Loading branch information
misaya295 and yu199195 committed Oct 31, 2022
1 parent 3a167a2 commit 04f650d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.exception.ShenyuException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Etcd client of Bootstrap.
*/
Expand Down Expand Up @@ -71,7 +74,7 @@ public void close() {
public String get(final String key) {
List<KeyValue> keyValues = null;
try {
keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
Expand All @@ -80,7 +83,37 @@ public String get(final String key) {
return null;
}

return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
return keyValues.iterator().next().getValue().toString(UTF_8);
}

/**
* get keys by prefix.
*
* @param prefix key prefix.
* @return key valuesMap.
*/
public Map<String, String> getKeysMapByPrefix(final String prefix) {
GetOption getOption = GetOption.newBuilder()
.isPrefix(true)
.build();
try {
return this.client.getKVClient().get(bytesOf(prefix), getOption)
.get().getKvs().stream()
.collect(Collectors.toMap(e -> e.getKey().toString(UTF_8), e -> e.getValue().toString(UTF_8)));
} catch (ExecutionException | InterruptedException e) {
LOG.error("etcd getKeysMapByPrefix key {} error {}", prefix, e);
throw new ShenyuException(e);
}

}

/**
* bytesOf string.
* @param val val.
* @return bytes val.
*/
public ByteSequence bytesOf(final String val) {
return ByteSequence.from(val, UTF_8);
}

/**
Expand All @@ -93,7 +126,7 @@ public String get(final String key) {
* @throws InterruptedException the exception
*/
public List<String> getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException {
ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
ByteSequence prefixByteSequence = bytesOf(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(prefixByteSequence)
.withSortField(GetOption.SortTarget.KEY)
Expand All @@ -106,7 +139,25 @@ public List<String> getChildrenKeys(final String prefix, final String separator)
.getKvs();

return keyValues.stream()
.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator))
.map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator))
.distinct()
.filter(e -> Objects.nonNull(e))
.collect(Collectors.toList());
}

/**
* get keyPrefix map.
*
* @param prefix key prefix.
* @param separator separator char
* @param map prefix map
* @return sub map
*/
public List<String> getChildrenKeysByMap(final String prefix, final String separator, final Map<String, String> map) {

return map.entrySet().stream()
.filter(e -> e.getKey().contains(prefix))
.map(e -> getSubNodeKeyName(prefix, e.getKey(), separator))
.distinct()
.filter(e -> Objects.nonNull(e))
.collect(Collectors.toList());
Expand All @@ -131,7 +182,7 @@ public void watchDataChange(final String key,
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener);
watchCache.put(key, watch);
}

Expand All @@ -147,18 +198,18 @@ public void watchChildChange(final String key,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
.withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
.withPrefix(ByteSequence.from(key, UTF_8))
.build();
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener);
Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), option, listener);
watchCache.put(key, watch);
}

private Watch.Listener watch(final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
String path = event.getKeyValue().getKey().toString(UTF_8);
String value = event.getKeyValue().getValue().toString(UTF_8);
switch (event.getEventType()) {
case PUT:
updateHandler.accept(path, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

/**
Expand All @@ -54,6 +57,8 @@ public class EtcdSyncDataService implements SyncDataService {
*/
private static final Logger LOG = LoggerFactory.getLogger(EtcdSyncDataService.class);

private static final String PRE_FIX = "/shenyu";

private final EtcdClient etcdClient;

private final PluginDataSubscriber pluginDataSubscriber;
Expand All @@ -62,6 +67,8 @@ public class EtcdSyncDataService implements SyncDataService {

private final List<AuthDataSubscriber> authDataSubscribers;

private Map<String, String> keysMap = new ConcurrentHashMap<>();

/**
* Instantiates a new Zookeeper cache manager.
*
Expand All @@ -78,18 +85,28 @@ public EtcdSyncDataService(final EtcdClient etcdClient,
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
watchAllKeys();
watcherData();
watchAppAuth();
watchMetaData();
}

private void watchAllKeys() {
keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
keysMap.put(updateKey, updateValue);
}, deleteKey -> {
keysMap.remove(deleteKey);
});

}

private void watcherData() {
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
List<String> pluginChildren = etcdClientGetChildren(pluginParent);
List<String> pluginChildren = etcdClientGetChildrenByMap(pluginParent, keysMap);
for (String pluginName : pluginChildren) {
watcherAll(pluginName);
}

etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> {
if (!updateNode.isEmpty()) {
watcherAll(updateNode);
Expand All @@ -105,17 +122,17 @@ private void watcherAll(final String pluginName) {

private void watcherPlugin(final String pluginName) {
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
cachePluginData(etcdClient.get(pluginPath));
cachePluginData(keysMap.get(pluginPath));
subscribePluginDataChanges(pluginPath, pluginName);
}

private void watcherSelector(final String pluginName) {
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(pluginName);
List<String> childrenList = etcdClientGetChildren(selectorParentPath);
List<String> childrenList = etcdClientGetChildrenByMap(selectorParentPath, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(selectorParentPath, children);
cacheSelectorData(etcdClient.get(realPath));
cacheSelectorData(keysMap.get(realPath));
subscribeSelectorDataChanges(realPath);
});
}
Expand All @@ -124,11 +141,11 @@ private void watcherSelector(final String pluginName) {

private void watcherRule(final String pluginName) {
String ruleParent = DefaultPathConstants.buildRuleParentPath(pluginName);
List<String> childrenList = etcdClientGetChildren(ruleParent);
List<String> childrenList = etcdClientGetChildrenByMap(ruleParent, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(ruleParent, children);
cacheRuleData(etcdClient.get(realPath));
cacheRuleData(keysMap.get(realPath));
subscribeRuleDataChanges(realPath);
});
}
Expand All @@ -137,11 +154,11 @@ private void watcherRule(final String pluginName) {

private void watchAppAuth() {
final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
List<String> childrenList = etcdClientGetChildren(appAuthParent);
List<String> childrenList = etcdClientGetChildrenByMap(appAuthParent, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(appAuthParent, children);
cacheAuthData(etcdClient.get(realPath));
cacheAuthData(keysMap.get(realPath));
subscribeAppAuthDataChanges(realPath);
});
}
Expand All @@ -150,11 +167,11 @@ private void watchAppAuth() {

private void watchMetaData() {
final String metaDataPath = DefaultPathConstants.META_DATA;
List<String> childrenList = etcdClientGetChildren(metaDataPath);
List<String> childrenList = etcdClientGetChildrenByMap(metaDataPath, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(metaDataPath, children);
cacheMetaData(etcdClient.get(realPath));
cacheMetaData(keysMap.get(realPath));
subscribeMetaDataChanges(realPath);
});
}
Expand All @@ -165,25 +182,25 @@ private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String
switch (groupKey) {
case SELECTOR:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheSelectorData(etcdClient.get(updatePath));
cacheSelectorData(keysMap.get(updatePath));
subscribeSelectorDataChanges(updatePath);
}, null);
break;
case RULE:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheRuleData(etcdClient.get(updatePath));
cacheRuleData(keysMap.get(updatePath));
subscribeRuleDataChanges(updatePath);
}, null);
break;
case APP_AUTH:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheAuthData(etcdClient.get(updatePath));
cacheAuthData(keysMap.get(updatePath));
subscribeAppAuthDataChanges(updatePath);
}, null);
break;
case META_DATA:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheMetaData(etcdClient.get(updatePath));
cacheMetaData(keysMap.get(updatePath));
subscribeMetaDataChanges(updatePath);
}, null);
break;
Expand All @@ -195,7 +212,7 @@ private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String
private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
final String dataPath = buildRealPath(pluginPath, updatePath);
final String dataStr = etcdClient.get(dataPath);
final String dataStr = keysMap.get(dataPath);
final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class);
Optional.ofNullable(data)
.ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
Expand Down Expand Up @@ -323,6 +340,10 @@ private List<String> etcdClientGetChildren(final String parent) {
return Collections.emptyList();
}

private List<String> etcdClientGetChildrenByMap(final String parent, final Map<String, String> map) {
return etcdClient.getChildrenKeysByMap(parent, "/", map);
}

@Override
public void close() {
if (Objects.nonNull(etcdClient)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void setUp() {
* mock get method.
*/
when(client.getKVClient()).thenReturn(kv);
when(kv.get(any())).thenReturn(future);
try {
when(future.get()).thenReturn(getResponse);
} catch (Exception e) {
Expand Down

0 comments on commit 04f650d

Please sign in to comment.