From 04f650db6c070979ba98f395c641dad4ca4a4079 Mon Sep 17 00:00:00 2001 From: Misaya295 <45778734+misaya295@users.noreply.github.com> Date: Mon, 31 Oct 2022 14:51:35 +0800 Subject: [PATCH] [ISSUE #4088] fix sync data by etcd io.vertx.core.VertxException (#4141) * fix #4088 * fix sync data * fix TU * refactor code Co-authored-by: xiaoyu --- .../shenyu/sync/data/etcd/EtcdClient.java | 71 ++++++++++++++++--- .../sync/data/etcd/EtcdSyncDataService.java | 55 +++++++++----- .../data/etcd/EtcdSyncDataServiceTest.java | 1 - 3 files changed, 99 insertions(+), 28 deletions(-) diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java index b334b9a93809..596bbad4cea8 100644 --- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java +++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java @@ -25,11 +25,12 @@ import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.watch.WatchEvent; import org.apache.commons.collections4.CollectionUtils; +import org.apache.shenyu.common.exception.ShenyuException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -37,6 +38,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Etcd client of Bootstrap. */ @@ -71,7 +74,7 @@ public void close() { public String get(final String key) { List keyValues = null; try { - keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs(); + keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs(); } catch (InterruptedException | ExecutionException e) { LOG.error(e.getMessage(), e); } @@ -80,7 +83,37 @@ public String get(final String key) { return null; } - return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8); + return keyValues.iterator().next().getValue().toString(UTF_8); + } + + /** + * get keys by prefix. + * + * @param prefix key prefix. + * @return key valuesMap. + */ + public Map getKeysMapByPrefix(final String prefix) { + GetOption getOption = GetOption.newBuilder() + .isPrefix(true) + .build(); + try { + return this.client.getKVClient().get(bytesOf(prefix), getOption) + .get().getKvs().stream() + .collect(Collectors.toMap(e -> e.getKey().toString(UTF_8), e -> e.getValue().toString(UTF_8))); + } catch (ExecutionException | InterruptedException e) { + LOG.error("etcd getKeysMapByPrefix key {} error {}", prefix, e); + throw new ShenyuException(e); + } + + } + + /** + * bytesOf string. + * @param val val. + * @return bytes val. + */ + public ByteSequence bytesOf(final String val) { + return ByteSequence.from(val, UTF_8); } /** @@ -93,7 +126,7 @@ public String get(final String key) { * @throws InterruptedException the exception */ public List getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException { - ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8); + ByteSequence prefixByteSequence = bytesOf(prefix); GetOption getOption = GetOption.newBuilder() .withPrefix(prefixByteSequence) .withSortField(GetOption.SortTarget.KEY) @@ -106,7 +139,25 @@ public List getChildrenKeys(final String prefix, final String separator) .getKvs(); return keyValues.stream() - .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator)) + .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator)) + .distinct() + .filter(e -> Objects.nonNull(e)) + .collect(Collectors.toList()); + } + + /** + * get keyPrefix map. + * + * @param prefix key prefix. + * @param separator separator char + * @param map prefix map + * @return sub map + */ + public List getChildrenKeysByMap(final String prefix, final String separator, final Map map) { + + return map.entrySet().stream() + .filter(e -> e.getKey().contains(prefix)) + .map(e -> getSubNodeKeyName(prefix, e.getKey(), separator)) .distinct() .filter(e -> Objects.nonNull(e)) .collect(Collectors.toList()); @@ -131,7 +182,7 @@ public void watchDataChange(final String key, final BiConsumer updateHandler, final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener); + Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener); watchCache.put(key, watch); } @@ -147,9 +198,9 @@ public void watchChildChange(final String key, final Consumer deleteHandler) { Watch.Listener listener = watch(updateHandler, deleteHandler); WatchOption option = WatchOption.newBuilder() - .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8)) + .withPrefix(ByteSequence.from(key, UTF_8)) .build(); - Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener); + Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), option, listener); watchCache.put(key, watch); } @@ -157,8 +208,8 @@ private Watch.Listener watch(final BiConsumer updateHandler, final Consumer deleteHandler) { return Watch.listener(response -> { for (WatchEvent event : response.getEvents()) { - String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8); - String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8); + String path = event.getKeyValue().getKey().toString(UTF_8); + String value = event.getKeyValue().getValue().toString(UTF_8); switch (event.getEventType()) { case PUT: updateHandler.accept(path, value); diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java index cfb6c5409303..889e2cbfe4f2 100644 --- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java +++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java @@ -38,10 +38,13 @@ import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; -import java.util.Collections; + import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; /** @@ -54,6 +57,8 @@ public class EtcdSyncDataService implements SyncDataService { */ private static final Logger LOG = LoggerFactory.getLogger(EtcdSyncDataService.class); + private static final String PRE_FIX = "/shenyu"; + private final EtcdClient etcdClient; private final PluginDataSubscriber pluginDataSubscriber; @@ -62,6 +67,8 @@ public class EtcdSyncDataService implements SyncDataService { private final List authDataSubscribers; + private Map keysMap = new ConcurrentHashMap<>(); + /** * Instantiates a new Zookeeper cache manager. * @@ -78,18 +85,28 @@ public EtcdSyncDataService(final EtcdClient etcdClient, this.pluginDataSubscriber = pluginDataSubscriber; this.metaDataSubscribers = metaDataSubscribers; this.authDataSubscribers = authDataSubscribers; + watchAllKeys(); watcherData(); watchAppAuth(); watchMetaData(); } + private void watchAllKeys() { + keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX); + etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> { + keysMap.put(updateKey, updateValue); + }, deleteKey -> { + keysMap.remove(deleteKey); + }); + + } + private void watcherData() { final String pluginParent = DefaultPathConstants.PLUGIN_PARENT; - List pluginChildren = etcdClientGetChildren(pluginParent); + List pluginChildren = etcdClientGetChildrenByMap(pluginParent, keysMap); for (String pluginName : pluginChildren) { watcherAll(pluginName); } - etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> { if (!updateNode.isEmpty()) { watcherAll(updateNode); @@ -105,17 +122,17 @@ private void watcherAll(final String pluginName) { private void watcherPlugin(final String pluginName) { String pluginPath = DefaultPathConstants.buildPluginPath(pluginName); - cachePluginData(etcdClient.get(pluginPath)); + cachePluginData(keysMap.get(pluginPath)); subscribePluginDataChanges(pluginPath, pluginName); } private void watcherSelector(final String pluginName) { String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(pluginName); - List childrenList = etcdClientGetChildren(selectorParentPath); + List childrenList = etcdClientGetChildrenByMap(selectorParentPath, keysMap); if (CollectionUtils.isNotEmpty(childrenList)) { childrenList.forEach(children -> { String realPath = buildRealPath(selectorParentPath, children); - cacheSelectorData(etcdClient.get(realPath)); + cacheSelectorData(keysMap.get(realPath)); subscribeSelectorDataChanges(realPath); }); } @@ -124,11 +141,11 @@ private void watcherSelector(final String pluginName) { private void watcherRule(final String pluginName) { String ruleParent = DefaultPathConstants.buildRuleParentPath(pluginName); - List childrenList = etcdClientGetChildren(ruleParent); + List childrenList = etcdClientGetChildrenByMap(ruleParent, keysMap); if (CollectionUtils.isNotEmpty(childrenList)) { childrenList.forEach(children -> { String realPath = buildRealPath(ruleParent, children); - cacheRuleData(etcdClient.get(realPath)); + cacheRuleData(keysMap.get(realPath)); subscribeRuleDataChanges(realPath); }); } @@ -137,11 +154,11 @@ private void watcherRule(final String pluginName) { private void watchAppAuth() { final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT; - List childrenList = etcdClientGetChildren(appAuthParent); + List childrenList = etcdClientGetChildrenByMap(appAuthParent, keysMap); if (CollectionUtils.isNotEmpty(childrenList)) { childrenList.forEach(children -> { String realPath = buildRealPath(appAuthParent, children); - cacheAuthData(etcdClient.get(realPath)); + cacheAuthData(keysMap.get(realPath)); subscribeAppAuthDataChanges(realPath); }); } @@ -150,11 +167,11 @@ private void watchAppAuth() { private void watchMetaData() { final String metaDataPath = DefaultPathConstants.META_DATA; - List childrenList = etcdClientGetChildren(metaDataPath); + List childrenList = etcdClientGetChildrenByMap(metaDataPath, keysMap); if (CollectionUtils.isNotEmpty(childrenList)) { childrenList.forEach(children -> { String realPath = buildRealPath(metaDataPath, children); - cacheMetaData(etcdClient.get(realPath)); + cacheMetaData(keysMap.get(realPath)); subscribeMetaDataChanges(realPath); }); } @@ -165,25 +182,25 @@ private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String switch (groupKey) { case SELECTOR: etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> { - cacheSelectorData(etcdClient.get(updatePath)); + cacheSelectorData(keysMap.get(updatePath)); subscribeSelectorDataChanges(updatePath); }, null); break; case RULE: etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> { - cacheRuleData(etcdClient.get(updatePath)); + cacheRuleData(keysMap.get(updatePath)); subscribeRuleDataChanges(updatePath); }, null); break; case APP_AUTH: etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> { - cacheAuthData(etcdClient.get(updatePath)); + cacheAuthData(keysMap.get(updatePath)); subscribeAppAuthDataChanges(updatePath); }, null); break; case META_DATA: etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> { - cacheMetaData(etcdClient.get(updatePath)); + cacheMetaData(keysMap.get(updatePath)); subscribeMetaDataChanges(updatePath); }, null); break; @@ -195,7 +212,7 @@ private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String private void subscribePluginDataChanges(final String pluginPath, final String pluginName) { etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> { final String dataPath = buildRealPath(pluginPath, updatePath); - final String dataStr = etcdClient.get(dataPath); + final String dataStr = keysMap.get(dataPath); final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class); Optional.ofNullable(data) .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d))); @@ -323,6 +340,10 @@ private List etcdClientGetChildren(final String parent) { return Collections.emptyList(); } + private List etcdClientGetChildrenByMap(final String parent, final Map map) { + return etcdClient.getChildrenKeysByMap(parent, "/", map); + } + @Override public void close() { if (Objects.nonNull(etcdClient)) { diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java index 3d5824cf41d2..e1cd37c0af4a 100644 --- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java +++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java @@ -101,7 +101,6 @@ public void setUp() { * mock get method. */ when(client.getKVClient()).thenReturn(kv); - when(kv.get(any())).thenReturn(future); try { when(future.get()).thenReturn(getResponse); } catch (Exception e) {