From b2bfbc76d13106a3211f5af1713ba50848560060 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Sat, 16 Mar 2019 13:11:33 +0800 Subject: [PATCH] [Dubbo-3653] etcd as config center (#3663) * Minor refactor, no functinoal change. * Separate ConnectionStateListener * Simplify code * Fix typo * Support get external config from etcd config center * Polish diamond operator * Initial etcd support as config center * Add a put interface for JEtcdClient * Enhanced Etcd config center support with the ability to watch and cancel watch * Polish code * Distinguish modification event and delete event * Add etcd registry and configcenter to dubbo-all * Watch again when connection is re-established --- dubbo-all/pom.xml | 16 + dubbo-bom/pom.xml | 5 + .../dubbo-configcenter-etcd/pom.xml | 46 +++ .../etcd/EtcdDynamicConfiguration.java | 187 ++++++++++++ .../etcd/EtcdDynamicConfigurationFactory.java | 33 ++ ...o.configcenter.DynamicConfigurationFactory | 1 + .../etcd/EtcdDynamicConfigurationTest.java | 141 +++++++++ dubbo-configcenter/pom.xml | 1 + .../dubbo/registry/etcd/EtcdRegistry.java | 18 +- .../dubbo/remoting/etcd/EtcdClient.java | 16 + .../etcd/jetcd/ConnectionStateListener.java | 31 ++ .../remoting/etcd/jetcd/JEtcdClient.java | 22 +- .../etcd/jetcd/JEtcdClientWrapper.java | 282 +++++++++--------- .../etcd/support/AbstractEtcdClient.java | 4 +- .../remoting/etcd/jetcd/JEtcdClientTest.java | 166 +++++++++++ 15 files changed, 811 insertions(+), 158 deletions(-) create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/pom.xml create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java create mode 100644 dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index edb0f361348..a4a0b5cc3cd 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -241,6 +241,13 @@ compile true + + org.apache.dubbo + dubbo-registry-etcd3 + ${project.version} + compile + true + org.apache.dubbo dubbo-monitor-api @@ -360,6 +367,13 @@ compile true + + org.apache.dubbo + dubbo-configcenter-etcd + ${project.version} + compile + true + org.apache.dubbo dubbo-compatible @@ -494,6 +508,7 @@ org.apache.dubbo:dubbo-registry-zookeeper org.apache.dubbo:dubbo-registry-redis org.apache.dubbo:dubbo-registry-consul + org.apache.dubbo:dubbo-registry-etcd3 org.apache.dubbo:dubbo-monitor-api org.apache.dubbo:dubbo-monitor-default org.apache.dubbo:dubbo-config-api @@ -515,6 +530,7 @@ org.apache.dubbo:dubbo-configcenter-apollo org.apache.dubbo:dubbo-configcenter-zookeeper org.apache.dubbo:dubbo-configcenter-consul + org.apache.dubbo:dubbo-configcenter-etcd org.apache.dubbo:dubbo-metadata-report-api org.apache.dubbo:dubbo-metadata-definition org.apache.dubbo:dubbo-metadata-report-redis diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index d04495a47ce..47875b83ffb 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -343,6 +343,11 @@ dubbo-configcenter-consul ${project.version} + + org.apache.dubbo + dubbo-configcenter-etcd + ${project.version} + org.apache.dubbo dubbo-metadata-definition diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml new file mode 100644 index 00000000000..60efc8e4bb9 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml @@ -0,0 +1,46 @@ + + + + + + dubbo-configcenter + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-configcenter-etcd + jar + ${project.artifactId} + The etcd implementation of the config-center api + + + + org.apache.dubbo + dubbo-configcenter-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-remoting-etcd3 + ${project.parent.version} + + + \ No newline at end of file diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java new file mode 100644 index 00000000000..18e90887592 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import com.google.protobuf.ByteString; +import io.etcd.jetcd.api.Event; +import io.etcd.jetcd.api.WatchCancelRequest; +import io.etcd.jetcd.api.WatchCreateRequest; +import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchRequest; +import io.etcd.jetcd.api.WatchResponse; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigChangeType; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; +import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; + +/** + * The etcd implementation of {@link DynamicConfiguration} + */ +public class EtcdDynamicConfiguration implements DynamicConfiguration { + + /** + * The final root path would be: /$NAME_SPACE/config + */ + private String rootPath; + + /** + * The etcd client + */ + private final JEtcdClient etcdClient; + + /** + * The map store the key to {@link EtcdConfigWatcher} mapping + */ + private final ConcurrentMap watchListenerMap; + + EtcdDynamicConfiguration(URL url) { + rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; + etcdClient = new JEtcdClient(url); + etcdClient.addStateListener(state -> { + if (state == StateListener.CONNECTED) { + try { + recover(); + } catch (Exception e) { + // ignore + } + } + }); + watchListenerMap = new ConcurrentHashMap<>(); + } + + @Override + public void addListener(String key, String group, ConfigurationListener listener) { + if (watchListenerMap.get(listener) == null) { + String normalizedKey = convertKey(key); + EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener); + watchListenerMap.put(listener, watcher); + watcher.watch(); + } + } + + @Override + public void removeListener(String key, String group, ConfigurationListener listener) { + EtcdConfigWatcher watcher = watchListenerMap.get(listener); + watcher.cancelWatch(); + } + + // TODO Abstract the logic into super class + @Override + public String getConfig(String key, String group, long timeout) throws IllegalStateException { + if (StringUtils.isNotEmpty(group)) { + key = group + PATH_SEPARATOR + key; + } else { + int i = key.lastIndexOf("."); + key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1); + } + return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key); + } + + @Override + public Object getInternalProperty(String key) { + return etcdClient.getKVValue(key); + } + + + private String convertKey(String key) { + int index = key.lastIndexOf('.'); + return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); + } + + private void recover() { + for (EtcdConfigWatcher watcher: watchListenerMap.values()) { + watcher.watch(); + } + } + + public class EtcdConfigWatcher implements StreamObserver { + + private ConfigurationListener listener; + protected WatchGrpc.WatchStub watchStub; + private StreamObserver observer; + protected long watchId; + private ManagedChannel channel; + private String key; + + public EtcdConfigWatcher(String key, ConfigurationListener listener) { + this.key = key; + this.listener = listener; + this.channel = etcdClient.getChannel(); + } + + @Override + public void onNext(WatchResponse watchResponse) { + this.watchId = watchResponse.getWatchId(); + for (Event etcdEvent : watchResponse.getEventsList()) { + ConfigChangeType type = ConfigChangeType.MODIFIED; + if (etcdEvent.getType() == Event.EventType.DELETE) { + type = ConfigChangeType.DELETED; + } + ConfigChangeEvent event = new ConfigChangeEvent( + etcdEvent.getKv().getKey().toString(UTF_8), + etcdEvent.getKv().getValue().toString(UTF_8), type); + listener.process(event); + } + } + + @Override + public void onError(Throwable throwable) { + // ignore + } + + @Override + public void onCompleted() { + // ignore + } + + public long getWatchId() { + return watchId; + } + + private void watch() { + watchStub = WatchGrpc.newStub(channel); + observer = watchStub.watch(this); + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFromUtf8(key)) + .setProgressNotify(true); + WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build(); + observer.onNext(req); + } + + private void cancelWatch() { + WatchCancelRequest watchCancelRequest = + WatchCancelRequest.newBuilder().setWatchId(watchId).build(); + WatchRequest cancelRequest = WatchRequest.newBuilder() + .setCancelRequest(watchCancelRequest).build(); + observer.onNext(cancelRequest); + } + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java new file mode 100644 index 00000000000..02e91a62db7 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +/** + * The etcd implementation of {@link AbstractDynamicConfigurationFactory} + */ +public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + + @Override + protected DynamicConfiguration createDynamicConfiguration(URL url) { + return new EtcdDynamicConfiguration(url); + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory new file mode 100644 index 00000000000..d84b1ae0e1a --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory @@ -0,0 +1 @@ +etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory \ No newline at end of file diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java new file mode 100644 index 00000000000..87143fdcacc --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Unit test for etcd config center support + * TODO Integrate with https://github.com/etcd-io/jetcd#launcher or using mock data. + */ +@Disabled +public class EtcdDynamicConfigurationTest { + + private static final String ENDPOINT = "http://127.0.0.1:2379"; + + private static EtcdDynamicConfiguration config; + + private static Client etcdClient; + + @Test + public void testGetConfig() { + put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello"); + put("/dubbo/config/test/dubbo.properties", "aaa=bbb"); + Assertions.assertEquals("hello", config.getConfig("org.apache.dubbo.etcd.testService.configurators")); + Assertions.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test")); + } + + @Test + public void testAddListener() throws Exception { + CountDownLatch latch = new CountDownLatch(4); + TestListener listener1 = new TestListener(latch); + TestListener listener2 = new TestListener(latch); + TestListener listener3 = new TestListener(latch); + TestListener listener4 = new TestListener(latch); + config.addListener("AService.configurators", listener1); + config.addListener("AService.configurators", listener2); + config.addListener("testapp.tagrouters", listener3); + config.addListener("testapp.tagrouters", listener4); + + put("/dubbo/config/AService/configurators", "new value1"); + Thread.sleep(200); + put("/dubbo/config/testapp/tagrouters", "new value2"); + Thread.sleep(200); + put("/dubbo/config/testapp", "new value3"); + + Thread.sleep(1000); + + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assertions.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators")); + Assertions.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators")); + Assertions.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters")); + Assertions.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters")); + + Assertions.assertEquals("new value1", listener1.getValue()); + Assertions.assertEquals("new value1", listener2.getValue()); + Assertions.assertEquals("new value2", listener3.getValue()); + Assertions.assertEquals("new value2", listener4.getValue()); + } + + private class TestListener implements ConfigurationListener { + private CountDownLatch latch; + private String value; + private Map countMap = new HashMap<>(); + + public TestListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void process(ConfigChangeEvent event) { + Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0); + countMap.put(event.getKey(), ++count); + value = event.getValue(); + latch.countDown(); + } + + public int getCount(String key) { + return countMap.get(key); + } + + public String getValue() { + return value; + } + } + + static void put(String key, String value) { + try { + etcdClient.getKVClient() + .put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)) + .get(); + } catch (Exception e) { + System.out.println("Error put value to etcd."); + } + } + + @BeforeAll + static void setUp() { + etcdClient = Client.builder().endpoints(ENDPOINT).build(); + // timeout in 15 seconds. + URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.etcd.testService") + .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000); + config = new EtcdDynamicConfiguration(url); + } + + @AfterAll + static void tearDown() { + etcdClient.close(); + } +} diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml index fa703be6864..92f727d2321 100644 --- a/dubbo-configcenter/pom.xml +++ b/dubbo-configcenter/pom.xml @@ -34,5 +34,6 @@ dubbo-configcenter-zookeeper dubbo-configcenter-apollo dubbo-configcenter-consul + dubbo-configcenter-etcd diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java index 504d521aaed..f0d94067d25 100644 --- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java @@ -71,7 +71,7 @@ public class EtcdRegistry extends FailbackRegistry { private final Set anyServices = new ConcurrentHashSet(); - private final ConcurrentMap> etcdListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> etcdListeners = new ConcurrentHashMap<>(); private final EtcdClient etcdClient; private long expirePeriod; @@ -86,14 +86,12 @@ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) { } this.root = group; etcdClient = etcdTransporter.connect(url); - etcdClient.addStateListener(new StateListener() { - public void stateChanged(int state) { - if (state == CONNECTED) { - try { - recover(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + etcdClient.addStateListener(state -> { + if (state == StateListener.CONNECTED) { + try { + recover(); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } }); @@ -345,7 +343,7 @@ protected List toUnsubscribedPath(URL url) { } protected List toUrlsWithoutEmpty(URL consumer, List providers) { - List urls = new ArrayList(); + List urls = new ArrayList<>(); if (providers != null && providers.size() > 0) { for (String provider : providers) { provider = URL.decode(provider); diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index b1e765d3416..286be934469 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -164,4 +164,20 @@ public long createLease(long ttl, long timeout, TimeUnit unit) */ void revokeLease(long lease); + + /** + * Get the value of the specified key. + * @param key the specified key + * @return null if the value is not found + */ + String getKVValue(String key); + + /** + * Put the key value pair to etcd + * @param key the specified key + * @param value the paired value + * @return true if put success + */ + boolean put(String key, String value); + } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java new file mode 100644 index 00000000000..788aa401e9b --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.remoting.etcd.jetcd; + +import io.etcd.jetcd.Client; + +public interface ConnectionStateListener { + + /** + * Called when there is a state change in the connection + * + * @param client the client + * @param newState the new state + */ + void stateChanged(Client client, int newState); +} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index d07cad06405..ff4c118b2a3 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.etcd.jetcd; +import io.grpc.ManagedChannel; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; @@ -185,6 +186,20 @@ public void doClose() { } } + @Override + public String getKVValue(String key) { + return clientWrapper.getKVValue(key); + } + + @Override + public boolean put(String key, String value) { + return clientWrapper.put(key, value); + } + + public ManagedChannel getChannel() { + return clientWrapper.getChannel(); + } + public class EtcdWatcher implements StreamObserver { protected WatchGrpc.WatchStub watchStub; @@ -233,12 +248,7 @@ public void onNext(WatchResponse response) { } } if (modified > 0) { - notifyExecutor.execute(new Runnable() { - @Override - public void run() { - listener.childChanged(path, new ArrayList<>(urls)); - } - }); + notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls))); } } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 8515b617ae9..c7f472d4011 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.etcd.jetcd; +import io.etcd.jetcd.kv.PutResponse; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; @@ -31,9 +32,11 @@ import io.etcd.jetcd.Client; import io.etcd.jetcd.ClientBuilder; import io.etcd.jetcd.CloseableClient; +import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.Observers; import io.etcd.jetcd.common.exception.ErrorCode; import io.etcd.jetcd.common.exception.EtcdException; +import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.PutOption; @@ -87,7 +90,8 @@ public class JEtcdClientWrapper { private RuntimeException failed; private final ScheduledFuture retryFuture; - private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true)); + private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true)); private final Set failedRegistered = new ConcurrentHashSet(); @@ -117,28 +121,26 @@ public JEtcdClientWrapper(URL url) { this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); - this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { - public void run() { - try { - retry(); - } catch (Throwable t) { - logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); - } + this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> { + try { + retry(); + } catch (Throwable t) { + logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } private Client prepareClient(URL url) { - int maxInboudSize = DEFAULT_INBOUT_SIZE; - if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) { - maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY)); + int maxInboundSize = DEFAULT_INBOUND_SIZE; + if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) { + maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY)); } ClientBuilder clientBuilder = Client.builder() .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) .endpoints(endPoints(url.getBackupAddress())) - .maxInboundMessageSize(maxInboudSize); + .maxInboundMessageSize(maxInboundSize); return clientBuilder.build(); } @@ -170,29 +172,26 @@ public ManagedChannel getChannel() { public List getChildren(String path) { try { return RetryLoops.invokeWithRetry( - new Callable>() { - @Override - public List call() throws Exception { - requiredNotNull(client, failed); - int len = path.length(); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8), - GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getKvs().stream().parallel() - .filter(pair -> { - String key = pair.getKey().toString(UTF_8); - int index = len, count = 0; - if (key.length() > len) { - for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { - if (count++ > 1) break; - } + () -> { + requiredNotNull(client, failed); + int len = path.length(); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), + GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream().parallel() + .filter(pair -> { + String key = pair.getKey().toString(UTF_8); + int index = len, count = 0; + if (key.length() > len) { + for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { + if (count++ > 1) break; } - return count == 1; - }) - .map(pair -> pair.getKey().toString(UTF_8)) - .collect(toList()); - } + } + return count == 1; + }) + .map(pair -> pair.getKey().toString(UTF_8)) + .collect(toList()); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -207,15 +206,12 @@ public boolean isConnected() { public long createLease(long second) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - return client.getLeaseClient() - .grant(second) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getID(); - } + () -> { + requiredNotNull(client, failed); + return client.getLeaseClient() + .grant(second) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getID(); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -225,15 +221,12 @@ public Long call() throws Exception { public void revokeLease(long lease) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getLeaseClient() - .revoke(lease) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getLeaseClient() + .revoke(lease) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -260,15 +253,12 @@ public long createLease(long ttl, long timeout, TimeUnit unit) public boolean checkExists(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Boolean call() throws Exception { - requiredNotNull(client, failed); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getCount() > 0; - } + () -> { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getCount() > 0; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -281,17 +271,14 @@ public Boolean call() throws Exception { protected Long find(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getKvs().stream() - .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8))) - .findFirst().getAsLong(); - } + () -> { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream() + .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8))) + .findFirst().getAsLong(); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -301,16 +288,13 @@ public Long call() throws Exception { public void createPersistent(String path) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getKVClient() - .put(ByteSequence.from(path, UTF_8), - ByteSequence.from(String.valueOf(path.hashCode()), UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getKVClient() + .put(ByteSequence.from(path, UTF_8), + ByteSequence.from(String.valueOf(path.hashCode()), UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -328,21 +312,18 @@ public Void call() throws Exception { public long createEphemeral(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - - registeredPaths.add(path); - keepAlive(); - final long leaseId = globalLeaseId; - client.getKVClient() - .put(ByteSequence.from(path, UTF_8) - , ByteSequence.from(String.valueOf(leaseId), UTF_8) - , PutOption.newBuilder().withLeaseId(leaseId).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return leaseId; - } + () -> { + requiredNotNull(client, failed); + + registeredPaths.add(path); + keepAlive(); + final long leaseId = globalLeaseId; + client.getKVClient() + .put(ByteSequence.from(path, UTF_8) + , ByteSequence.from(String.valueOf(leaseId), UTF_8) + , PutOption.newBuilder().withLeaseId(leaseId).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return leaseId; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -354,6 +335,7 @@ public void keepAlive(long lease) { this.keepAlive(lease, null); } + @SuppressWarnings("unchecked") private void keepAlive(long lease, Consumer onFailed) { final StreamObserver observer = new Observers.Builder() .onError((e) -> { @@ -471,16 +453,13 @@ private void recovery() { public void delete(String path) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getKVClient() - .delete(ByteSequence.from(path, UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - registeredPaths.remove(path); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getKVClient() + .delete(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + registeredPaths.remove(path); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -494,13 +473,13 @@ public Void call() throws Exception { public String[] endPoints(String backupAddress) { String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR); - List addressess = Arrays.stream(endpoints) - .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1 + List addresses = Arrays.stream(endpoints) + .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY) ? address : Constants.HTTP_KEY + address) .collect(toList()); - Collections.shuffle(addressess); - return addressess.toArray(new String[0]); + Collections.shuffle(addresses); + return addresses.toArray(new String[0]); } /** @@ -527,26 +506,22 @@ public void start() { } try { - this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - boolean connected = isConnected(); - if (connectState != connected) { - int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; - if (connectionStateListener != null) { - try { - if (connected) { - clearKeepAlive(); - } - connectionStateListener.stateChanged(getClient(), notifyState); - } finally { - cancelKeepAlive = false; + this.future = reconnectNotify.scheduleWithFixedDelay(() -> { + boolean connected = isConnected(); + if (connectState != connected) { + int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; + if (connectionStateListener != null) { + try { + if (connected) { + clearKeepAlive(); } + connectionStateListener.stateChanged(getClient(), notifyState); + } finally { + cancelKeepAlive = false; } - connectState = connected; } + connectState = connected; } - }, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS); } catch (Throwable t) { logger.error("monitor reconnect status failed.", t); @@ -575,7 +550,9 @@ protected void doClose() { try { cancelKeepAlive = true; - revokeLease(this.globalLeaseId); + if (globalLeaseId > 0) { + revokeLease(this.globalLeaseId); + } } catch (Exception e) { logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e); } @@ -638,6 +615,41 @@ public static void requiredNotNull(Object obj, RuntimeException exeception) { } } + public String getKVValue(String key) { + if (null == key) { + return null; + } + + CompletableFuture responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8)); + + try { + List result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs(); + if (!result.isEmpty()) { + return result.get(0).getValue().toString(UTF_8); + } + } catch (Exception e) { + // ignore + } + + return null; + } + + + public boolean put(String key, String value) { + if (key == null || value == null) { + return false; + } + CompletableFuture putFuture = + this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)); + try { + putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return true; + } catch (Exception e) { + // ignore + } + return false; + } + private void retry() { if (!failedRegistered.isEmpty()) { Set failed = new HashSet(failedRegistered); @@ -679,24 +691,14 @@ private void retry() { } } - public interface ConnectionStateListener { - /** - * Called when there is a state change in the connection - * - * @param client the client - * @param newState the new state - */ - public void stateChanged(Client client, int newState); - } - /** * default request timeout */ public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout(); - public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024; + public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024; - public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size"; + public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size"; public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout"; diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java index 31752bffe8f..5fecd14d395 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java @@ -57,7 +57,7 @@ public abstract class AbstractEtcdClient implements EtcdClient private final Set stateListeners = new ConcurrentHashSet<>(); - private final ConcurrentMap> childListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> childListeners = new ConcurrentHashMap<>(); private final List categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY , Constants.CONSUMERS_CATEGORY , Constants.ROUTERS_CATEGORY @@ -99,7 +99,7 @@ public Set getSessionListeners() { public List addChildListener(String path, final ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); if (listeners == null) { - childListeners.putIfAbsent(path, new ConcurrentHashMap()); + childListeners.putIfAbsent(path, new ConcurrentHashMap<>()); listeners = childListeners.get(path); } WatcherListener targetListener = listeners.get(listener); diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java index 19254abeac7..9674feec35d 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java @@ -33,8 +33,21 @@ */ package org.apache.dubbo.remoting.etcd.jetcd; +import com.google.protobuf.ByteString; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.api.Event; +import io.etcd.jetcd.api.WatchCancelRequest; +import io.etcd.jetcd.api.WatchCreateRequest; +import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchRequest; +import io.etcd.jetcd.api.WatchResponse; import io.etcd.jetcd.common.exception.ClosedClientException; +import io.etcd.jetcd.watch.WatchEvent; +import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.stub.StreamObserver; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.etcd.ChildListener; @@ -44,8 +57,13 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.nio.charset.StandardCharsets.UTF_8; @Disabled public class JEtcdClientTest { @@ -75,6 +93,139 @@ public void test_watch_when_create_path() throws InterruptedException { client.delete(child); } + @Test + public void test_watch_when_modify() { + String path = "/dubbo/config/jetcd-client-unit-test/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch latch = new CountDownLatch(1); + ByteSequence key = ByteSequence.from(path, UTF_8); + + Watch.Listener listener = Watch.listener(response -> { + for (WatchEvent event : response.getEvents()) { + Assertions.assertEquals("PUT", event.getEventType().toString()); + Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8)); + latch.countDown(); + } + + }); + + try (Client client = Client.builder().endpoints(endpoint).build(); + Watch watch = client.getWatchClient(); + Watch.Watcher watcher = watch.watch(key, listener)) { + // try to modify the key + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + latch.await(); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + } + + @Test + public void testWatchWithGrpc() { + String path = "/dubbo/config/test_watch_with_grpc/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch latch = new CountDownLatch(1); + try (Client client = Client.builder().endpoints(endpoint).build()) { + ManagedChannel channel = getChannel(client); + StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() { + @Override + public void onNext(WatchResponse response) { + for (Event event : response.getEventsList()) { + Assertions.assertEquals("PUT", event.getType().toString()); + Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8)); + latch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onCompleted() { + + } + }); + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFrom(path, UTF_8)); + + observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build()); + + // try to modify the key + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + latch.await(5, TimeUnit.SECONDS); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + } + + @Test + public void testCancelWatchWithGrpc() { + String path = "/dubbo/config/testCancelWatchWithGrpc/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch updateLatch = new CountDownLatch(1); + CountDownLatch cancelLatch = new CountDownLatch(1); + final AtomicLong watchID = new AtomicLong(-1L); + try (Client client = Client.builder().endpoints(endpoint).build()) { + ManagedChannel channel = getChannel(client); + StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() { + @Override + public void onNext(WatchResponse response) { + watchID.set(response.getWatchId()); + for (Event event : response.getEventsList()) { + Assertions.assertEquals("PUT", event.getType().toString()); + Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8)); + updateLatch.countDown(); + } + if (response.getCanceled()) { + // received the cancel response + cancelLatch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onCompleted() { + + } + }); + // create + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFrom(path, UTF_8)); + + // make the grpc call to watch the key + observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build()); + + // try to put the value + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + + // response received, latch counts down to zero + updateLatch.await(); + + WatchCancelRequest watchCancelRequest = + WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build(); + WatchRequest cancelRequest = WatchRequest.newBuilder() + .setCancelRequest(watchCancelRequest).build(); + observer.onNext(cancelRequest); + + // try to put the value + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8)); + + cancelLatch.await(); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + + } + @Test public void test_watch_when_create_wrong_path() throws InterruptedException { @@ -257,4 +408,19 @@ synchronized int increaseAndGet() { return ++value; } } + + private ManagedChannel getChannel(Client client) { + try { + // hack, use reflection to get the shared channel. + Field connectionField = client.getClass().getDeclaredField("connectionManager"); + connectionField.setAccessible(true); + Object connection = connectionField.get(client); + Method channelMethod = connection.getClass().getDeclaredMethod("getChannel"); + channelMethod.setAccessible(true); + ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection); + return channel; + } catch (Exception e) { + return null; + } + } }