From d0fb3bc09d07f6c931c537da32125bca44b587a1 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 12:47:18 +0800 Subject: [PATCH 01/11] Abstract retry task --- .../registry/support/AbstractRetryTask.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java new file mode 100644 index 00000000000..5f78cf409f2 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java @@ -0,0 +1,36 @@ +package org.apache.dubbo.registry.support; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.timer.Timer; +import org.apache.dubbo.common.timer.TimerTask; + +import java.util.concurrent.TimeUnit; + +/** + * AbstractRetryTask + */ +public abstract class AbstractRetryTask implements TimerTask { + + /** + * url for retry task + */ + private final URL url; + + public AbstractRetryTask(URL url) { + this.url = url; + } + + private void reput(Timeout timeout, Long tick) { + if (timeout == null || tick == null) { + throw new IllegalArgumentException(); + } + + Timer timer = timeout.timer(); + if (timer.isStop() || timeout.isCancelled()) { + return; + } + + timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); + } +} From abf6b1892d387e06657857176bda10f5019f3abe Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 13:28:41 +0800 Subject: [PATCH 02/11] Task for retry. --- .../registry/retry/AbstractRetryTask.java | 84 +++++++++++++++++++ .../registry/retry/FailedRegisteredTask.java | 21 +++++ .../registry/retry/FailedSubscribedTask.java | 28 +++++++ .../retry/FailedUnregisteredTask.java | 24 ++++++ .../retry/FailedUnsubscribedTask.java | 28 +++++++ .../registry/support/AbstractRetryTask.java | 36 -------- .../registry/support/FailbackRegistry.java | 8 +- .../support/FailbackRegistryTest.java | 8 +- .../dubbo/registry/dubbo/DubboRegistry.java | 8 +- .../registry/multicast/MulticastRegistry.java | 8 +- .../registry/zookeeper/ZookeeperRegistry.java | 8 +- 11 files changed, 205 insertions(+), 56 deletions(-) create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java delete mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java new file mode 100644 index 00000000000..e59805f1c57 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -0,0 +1,84 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.timer.Timer; +import org.apache.dubbo.common.timer.TimerTask; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import java.util.concurrent.TimeUnit; + +/** + * AbstractRetryTask + */ +public abstract class AbstractRetryTask implements TimerTask { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * url for retry task + */ + protected final URL url; + + /** + * registry for this task + */ + protected final FailbackRegistry registry; + + /** + * retry period + */ + private final long retryPeriod; + + /** + * task name for this task + */ + private final String taskName; + + AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) { + if (url == null || registry == null || StringUtils.isBlank(taskName)) { + throw new IllegalArgumentException(); + } + this.url = url; + this.registry = registry; + this.taskName = taskName; + this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); + } + + private void reput(Timeout timeout, Long tick) { + if (timeout == null || tick == null) { + throw new IllegalArgumentException(); + } + + Timer timer = timeout.timer(); + if (timer.isStop() || timeout.isCancelled()) { + return; + } + + timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + // other thread cancel this timeout. + return; + } + if (logger.isInfoEnabled()) { + logger.info(taskName + " : " + url); + } + try { + doRetry(url, registry); + } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry + logger.warn("Failed to " + taskName + url + ", waiting for again, cause:" + t.getMessage(), t); + // reput this task when catch exception. + reput(timeout, retryPeriod); + } + } + + protected abstract void doRetry(URL url, FailbackRegistry registry); +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java new file mode 100644 index 00000000000..bf6140727fb --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java @@ -0,0 +1,21 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedRegisteredTask + */ +public final class FailedRegisteredTask extends AbstractRetryTask { + + private static final String NAME = "retry register"; + + public FailedRegisteredTask(URL url, FailbackRegistry registry) { + super(url, registry, NAME); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry) { + registry.doRegister(url); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java new file mode 100644 index 00000000000..4d357a6bb61 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -0,0 +1,28 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedSubscribedTask + */ +public final class FailedSubscribedTask extends AbstractRetryTask { + + private static final String NAME = "retry subscribe"; + + private final NotifyListener listener; + + FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + super(url, registry, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } + this.listener = listener; + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry) { + registry.doSubscribe(url, listener); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java new file mode 100644 index 00000000000..8b05d2b2e8d --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -0,0 +1,24 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedUnregisteredTask + * + * @author xiuyuhang [xiuyuhang] + * @since 2018-11-08 + */ +public final class FailedUnregisteredTask extends AbstractRetryTask { + + private static final String NAME = "retry unregister"; + + FailedUnregisteredTask(URL url, FailbackRegistry registry) { + super(url, registry, NAME); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry) { + registry.doUnregister(url); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java new file mode 100644 index 00000000000..d2fc0c58fab --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -0,0 +1,28 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedUnsubscribedTask + * + * @author xiuyuhang [xiuyuhang] + * @since 2018-11-08 + */ +public final class FailedUnsubscribedTask extends AbstractRetryTask { + + private static final String NAME = "retry register"; + + private final NotifyListener listener; + + FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + super(url, registry, NAME); + this.listener = listener; + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry) { + registry.unsubscribe(url, listener); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java deleted file mode 100644 index 5f78cf409f2..00000000000 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRetryTask.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.dubbo.registry.support; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.timer.Timeout; -import org.apache.dubbo.common.timer.Timer; -import org.apache.dubbo.common.timer.TimerTask; - -import java.util.concurrent.TimeUnit; - -/** - * AbstractRetryTask - */ -public abstract class AbstractRetryTask implements TimerTask { - - /** - * url for retry task - */ - private final URL url; - - public AbstractRetryTask(URL url) { - this.url = url; - } - - private void reput(Timeout timeout, Long tick) { - if (timeout == null || tick == null) { - throw new IllegalArgumentException(); - } - - Timer timer = timeout.timer(); - if (timer.isStop() || timeout.isCancelled()) { - return; - } - - timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); - } -} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index 5c456d5287f..1df7c4fe188 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -451,12 +451,12 @@ public void destroy() { // ==== Template method ==== - protected abstract void doRegister(URL url); + public abstract void doRegister(URL url); - protected abstract void doUnregister(URL url); + public abstract void doUnregister(URL url); - protected abstract void doSubscribe(URL url, NotifyListener listener); + public abstract void doSubscribe(URL url, NotifyListener listener); - protected abstract void doUnsubscribe(URL url, NotifyListener listener); + public abstract void doUnsubscribe(URL url, NotifyListener listener); } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java index 7f3ac3a6be0..fb80cf3cbc8 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java @@ -224,7 +224,7 @@ public void setBad(boolean bad) { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -234,7 +234,7 @@ protected void doRegister(URL url) { } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -244,7 +244,7 @@ protected void doUnregister(URL url) { } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -254,7 +254,7 @@ protected void doSubscribe(URL url, NotifyListener listener) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { if (bad) { throw new RuntimeException("can not invoke!"); } diff --git a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java index 45c94a73e48..2c6b8c1f1ec 100644 --- a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java +++ b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java @@ -138,22 +138,22 @@ public void destroy() { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { registryService.register(url); } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { registryService.unregister(url); } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { registryService.subscribe(url, listener); } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { registryService.unsubscribe(url, listener); } diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java index 8ff7daaea2d..06835dac6e5 100644 --- a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java +++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java @@ -249,17 +249,17 @@ private void unicast(String msg, String host) { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { multicast(Constants.REGISTER + " " + url.toFullString()); } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { multicast(Constants.UNREGISTER + " " + url.toFullString()); } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { admin = true; } @@ -273,7 +273,7 @@ protected void doSubscribe(URL url, NotifyListener listener) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { unregister(url); diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java index 43426c7b9f6..d7a8ee097ac 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java @@ -109,7 +109,7 @@ public void destroy() { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { @@ -118,7 +118,7 @@ protected void doRegister(URL url) { } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { try { zkClient.delete(toUrlPath(url)); } catch (Throwable e) { @@ -127,7 +127,7 @@ protected void doUnregister(URL url) { } @Override - protected void doSubscribe(final URL url, final NotifyListener listener) { + public void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); @@ -195,7 +195,7 @@ public void childChanged(String parentPath, List currentChilds) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap listeners = zkListeners.get(url); if (listeners != null) { ChildListener zkListener = listeners.get(listener); From ac377307048c9eb18e92f819e851b9e943b917d6 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 13:32:45 +0800 Subject: [PATCH 03/11] Fix sth. --- .../org/apache/dubbo/registry/retry/AbstractRetryTask.java | 4 ++-- .../apache/dubbo/registry/retry/FailedUnsubscribedTask.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index e59805f1c57..c3843ab5ab4 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -64,8 +64,8 @@ private void reput(Timeout timeout, Long tick) { @Override public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - // other thread cancel this timeout. + if (timeout.isCancelled() || timeout.timer().isStop()) { + // other thread cancel this timeout or stop the timer. return; } if (logger.isInfoEnabled()) { diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java index d2fc0c58fab..2da14743447 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -12,7 +12,7 @@ */ public final class FailedUnsubscribedTask extends AbstractRetryTask { - private static final String NAME = "retry register"; + private static final String NAME = "retry unsubscribe"; private final NotifyListener listener; From c0bd54b3fb0abb6636f97b8924280856874ccc40 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 15:56:34 +0800 Subject: [PATCH 04/11] Finish Optimize. fix ci failed. --- .../registry/retry/AbstractRetryTask.java | 27 +- .../registry/retry/FailedNotifiedTask.java | 66 ++++ .../registry/retry/FailedSubscribedTask.java | 2 +- .../retry/FailedUnregisteredTask.java | 5 +- .../retry/FailedUnsubscribedTask.java | 8 +- .../registry/support/FailbackRegistry.java | 362 +++++++----------- .../support/FailbackRegistryTest.java | 16 +- 7 files changed, 243 insertions(+), 243 deletions(-) create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index c3843ab5ab4..59b97462433 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -17,7 +17,7 @@ */ public abstract class AbstractRetryTask implements TimerTask { - private final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Logger logger = LoggerFactory.getLogger(getClass()); /** * url for retry task @@ -32,30 +32,41 @@ public abstract class AbstractRetryTask implements TimerTask { /** * retry period */ - private final long retryPeriod; + protected final long retryPeriod; /** * task name for this task */ - private final String taskName; + protected final String taskName; + + private volatile boolean cancel; AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) { - if (url == null || registry == null || StringUtils.isBlank(taskName)) { + if (url == null || StringUtils.isBlank(taskName)) { throw new IllegalArgumentException(); } this.url = url; this.registry = registry; this.taskName = taskName; + cancel = false; this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); } - private void reput(Timeout timeout, Long tick) { - if (timeout == null || tick == null) { + public void cancel() { + cancel = true; + } + + public boolean isCancel() { + return cancel; + } + + protected void reput(Timeout timeout, long tick) { + if (timeout == null) { throw new IllegalArgumentException(); } Timer timer = timeout.timer(); - if (timer.isStop() || timeout.isCancelled()) { + if (timer.isStop() || timeout.isCancelled() || isCancel()) { return; } @@ -64,7 +75,7 @@ private void reput(Timeout timeout, Long tick) { @Override public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || timeout.timer().isStop()) { + if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // other thread cancel this timeout or stop the timer. return; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java new file mode 100644 index 00000000000..b9f6a27cec6 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -0,0 +1,66 @@ +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * FailedNotifiedTask + */ +public final class FailedNotifiedTask extends AbstractRetryTask { + + private static final String NAME = "retry subscribe"; + + private final NotifyListener listener; + + private final List urls = new CopyOnWriteArrayList<>(); + + public FailedNotifiedTask(URL url, NotifyListener listener) { + super(url, null, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } + this.listener = listener; + } + + public void addUrlToRetry(List urls) { + if (CollectionUtils.isEmpty(urls)) { + return; + } + this.urls.addAll(urls); + } + + public void removeRetryUrl(List urls) { + this.urls.removeAll(urls); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { + // other thread cancel this timeout or stop the timer or cancel the task. + return; + } + if (logger.isInfoEnabled()) { + logger.info(taskName + " : " + url); + } + try { + listener.notify(urls); + urls.clear(); + } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry + logger.warn("Failed to " + taskName + url + ", waiting for again, cause:" + t.getMessage(), t); + } + + // always reput this into timer. + reput(timeout, retryPeriod); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry) { + // do nothing. + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java index 4d357a6bb61..95e694aeb02 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -13,7 +13,7 @@ public final class FailedSubscribedTask extends AbstractRetryTask { private final NotifyListener listener; - FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { super(url, registry, NAME); if (listener == null) { throw new IllegalArgumentException(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java index 8b05d2b2e8d..a1edc560187 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -5,15 +5,12 @@ /** * FailedUnregisteredTask - * - * @author xiuyuhang [xiuyuhang] - * @since 2018-11-08 */ public final class FailedUnregisteredTask extends AbstractRetryTask { private static final String NAME = "retry unregister"; - FailedUnregisteredTask(URL url, FailbackRegistry registry) { + public FailedUnregisteredTask(URL url, FailbackRegistry registry) { super(url, registry, NAME); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java index 2da14743447..62427c84aa3 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -6,9 +6,6 @@ /** * FailedUnsubscribedTask - * - * @author xiuyuhang [xiuyuhang] - * @since 2018-11-08 */ public final class FailedUnsubscribedTask extends AbstractRetryTask { @@ -16,8 +13,11 @@ public final class FailedUnsubscribedTask extends AbstractRetryTask { private final NotifyListener listener; - FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { super(url, registry, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } this.listener = listener; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index 1df7c4fe188..a7121b6aa93 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -18,10 +18,13 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.common.utils.ExecutorUtil; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.timer.HashedWheelTimer; import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.retry.FailedNotifiedTask; +import org.apache.dubbo.registry.retry.FailedRegisteredTask; +import org.apache.dubbo.registry.retry.FailedSubscribedTask; +import org.apache.dubbo.registry.retry.FailedUnregisteredTask; +import org.apache.dubbo.registry.retry.FailedUnsubscribedTask; import java.util.HashMap; import java.util.HashSet; @@ -30,108 +33,158 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * FailbackRegistry. (SPI, Prototype, ThreadSafe) - * */ public abstract class FailbackRegistry extends AbstractRegistry { - // Scheduled executor service - private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); - - // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry - private final ScheduledFuture retryFuture; + /* retry task map */ - private final Set failedRegistered = new ConcurrentHashSet(); + private final ConcurrentMap failedRegistered = new ConcurrentHashMap(); - private final Set failedUnregistered = new ConcurrentHashSet(); + private final ConcurrentMap failedUnregistered = new ConcurrentHashMap(); - private final ConcurrentMap> failedSubscribed = new ConcurrentHashMap>(); + private final ConcurrentMap failedSubscribed = new ConcurrentHashMap(); - private final ConcurrentMap> failedUnsubscribed = new ConcurrentHashMap>(); + private final ConcurrentMap failedUnsubscribed = new ConcurrentHashMap(); - private final ConcurrentMap>> failedNotified = new ConcurrentHashMap>>(); + private final ConcurrentMap failedNotified = new ConcurrentHashMap(); /** * The time in milliseconds the retryExecutor will wait */ private final int retryPeriod; + // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry + private final HashedWheelTimer retryTimer; + public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); - this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - // Check and connect to the registry - try { - retry(); - } catch (Throwable t) { // Defensive fault tolerance - logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); - } - } - }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); + + // since the retry task will not be very much. 128 ticks is enough. + retryTimer = new HashedWheelTimer(retryPeriod, TimeUnit.MILLISECONDS, 128); } - public Future getRetryFuture() { - return retryFuture; + private void addFailedRegistered(URL url) { + FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); + FailedRegisteredTask oldOne = failedRegistered.putIfAbsent(url, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } } - public Set getFailedRegistered() { - return failedRegistered; + private void removeFailedRegistered(URL url) { + FailedRegisteredTask f = failedRegistered.remove(url); + if (f != null) { + f.cancel(); + } } - public Set getFailedUnregistered() { - return failedUnregistered; + private void addFailedUnregistered(URL url) { + FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this); + FailedUnregisteredTask oldOne = failedUnregistered.putIfAbsent(url, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } } - public Map> getFailedSubscribed() { - return failedSubscribed; + private void removeFailedUnregistered(URL url) { + FailedUnregisteredTask f = failedUnregistered.remove(url); + if (f != null) { + f.cancel(); + } } - public Map> getFailedUnsubscribed() { - return failedUnsubscribed; + private void addFailedSubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); + FailedSubscribedTask f = failedSubscribed.putIfAbsent(h, newTask); + if (f == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } } - public Map>> getFailedNotified() { - return failedNotified; + private void removeFailedSubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedSubscribedTask f = failedSubscribed.remove(h); + if (f != null) { + f.cancel(); + } + removeFailedUnsubscribed(url, listener); + removeFailedNotified(url, listener); } - private void addFailedSubscribed(URL url, NotifyListener listener) { - Set listeners = failedSubscribed.get(url); - if (listeners == null) { - failedSubscribed.putIfAbsent(url, new ConcurrentHashSet()); - listeners = failedSubscribed.get(url); + private void addFailedUnsubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener); + FailedUnsubscribedTask f = failedUnsubscribed.putIfAbsent(h, newTask); + if (f == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } - listeners.add(listener); } - private void removeFailedSubscribed(URL url, NotifyListener listener) { - Set listeners = failedSubscribed.get(url); - if (listeners != null) { - listeners.remove(listener); + private void removeFailedUnsubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedUnsubscribedTask f = failedUnsubscribed.remove(h); + if (f != null) { + f.cancel(); } - listeners = failedUnsubscribed.get(url); - if (listeners != null) { - listeners.remove(listener); + } + + private void addFailedNotified(URL url, NotifyListener listener, List urls) { + Holder h = new Holder(url, listener); + FailedNotifiedTask newTask = new FailedNotifiedTask(url, listener); + FailedNotifiedTask f = failedNotified.putIfAbsent(h, newTask); + if (f == null) { + // never has a retry task. then start a new task for retry. + newTask.addUrlToRetry(urls); + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } else { + // just add urls which needs retry. + newTask.addUrlToRetry(urls); } - Map> notified = failedNotified.get(url); - if (notified != null) { - notified.remove(listener); + } + + private void removeFailedNotified(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedNotifiedTask f = failedNotified.remove(h); + if (f != null) { + f.cancel(); } } + public ConcurrentMap getFailedRegistered() { + return failedRegistered; + } + + public ConcurrentMap getFailedUnregistered() { + return failedUnregistered; + } + + public ConcurrentMap getFailedSubscribed() { + return failedSubscribed; + } + + public ConcurrentMap getFailedUnsubscribed() { + return failedUnsubscribed; + } + + public ConcurrentMap getFailedNotified() { + return failedNotified; + } + @Override public void register(URL url) { super.register(url); - failedRegistered.remove(url); - failedUnregistered.remove(url); + removeFailedRegistered(url); + removeFailedUnregistered(url); try { // Sending a registration request to the server side doRegister(url); @@ -153,15 +206,15 @@ public void register(URL url) { } // Record a failed registration request to a failed list, retry regularly - failedRegistered.add(url); + addFailedRegistered(url); } } @Override public void unregister(URL url) { super.unregister(url); - failedRegistered.remove(url); - failedUnregistered.remove(url); + removeFailedRegistered(url); + removeFailedUnregistered(url); try { // Sending a cancellation request to the server side doUnregister(url); @@ -183,7 +236,7 @@ public void unregister(URL url) { } // Record a failed registration request to a failed list, retry regularly - failedUnregistered.add(url); + addFailedUnregistered(url); } } @@ -245,12 +298,7 @@ public void unsubscribe(URL url, NotifyListener listener) { } // Record a failed registration request to a failed list, retry regularly - Set listeners = failedUnsubscribed.get(url); - if (listeners == null) { - failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet()); - listeners = failedUnsubscribed.get(url); - } - listeners.add(listener); + addFailedUnsubscribed(url, listener); } } @@ -266,12 +314,7 @@ protected void notify(URL url, NotifyListener listener, List urls) { doNotify(url, listener, urls); } catch (Exception t) { // Record a failed registration request to a failed list, retry regularly - Map> listeners = failedNotified.get(url); - if (listeners == null) { - failedNotified.putIfAbsent(url, new ConcurrentHashMap>()); - listeners = failedNotified.get(url); - } - listeners.put(listener, urls); + addFailedNotified(url, listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } @@ -289,7 +332,7 @@ protected void recover() throws Exception { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { - failedRegistered.add(url); + addFailedRegistered(url); } } // subscribe @@ -307,146 +350,10 @@ protected void recover() throws Exception { } } - // Retry the failed actions - protected void retry() { - if (!failedRegistered.isEmpty()) { - Set failed = new HashSet(failedRegistered); - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry register " + failed); - } - try { - for (URL url : failed) { - try { - doRegister(url); - failedRegistered.remove(url); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedUnregistered.isEmpty()) { - Set failed = new HashSet(failedUnregistered); - if (!failed.isEmpty()) { - if (logger.isInfoEnabled()) { - logger.info("Retry unregister " + failed); - } - try { - for (URL url : failed) { - try { - doUnregister(url); - failedUnregistered.remove(url); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedSubscribed.isEmpty()) { - Map> failed = new HashMap>(failedSubscribed); - for (Map.Entry> entry : new HashMap>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().size() == 0) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry subscribe " + failed); - } - try { - for (Map.Entry> entry : failed.entrySet()) { - URL url = entry.getKey(); - Set listeners = entry.getValue(); - for (NotifyListener listener : listeners) { - try { - doSubscribe(url, listener); - listeners.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedUnsubscribed.isEmpty()) { - Map> failed = new HashMap>(failedUnsubscribed); - for (Map.Entry> entry : new HashMap>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().isEmpty()) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry unsubscribe " + failed); - } - try { - for (Map.Entry> entry : failed.entrySet()) { - URL url = entry.getKey(); - Set listeners = entry.getValue(); - for (NotifyListener listener : listeners) { - try { - doUnsubscribe(url, listener); - listeners.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedNotified.isEmpty()) { - Map>> failed = new HashMap>>(failedNotified); - for (Map.Entry>> entry : new HashMap>>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().size() == 0) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry notify " + failed); - } - try { - for (Map> values : failed.values()) { - for (Map.Entry> entry : values.entrySet()) { - try { - NotifyListener listener = entry.getKey(); - List urls = entry.getValue(); - listener.notify(urls); - values.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } - @Override public void destroy() { super.destroy(); - try { - retryFuture.cancel(true); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod); + retryTimer.stop(); } // ==== Template method ==== @@ -459,4 +366,33 @@ public void destroy() { public abstract void doUnsubscribe(URL url, NotifyListener listener); + static class Holder { + + private final URL url; + + private final NotifyListener notifyListener; + + Holder(URL url, NotifyListener notifyListener) { + if (url == null || notifyListener == null) { + throw new IllegalArgumentException(); + } + this.url = url; + this.notifyListener = notifyListener; + } + + @Override + public int hashCode() { + return url.hashCode() + notifyListener.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Holder) { + Holder h = (Holder) obj; + return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener); + } else { + return false; + } + } + } } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java index fb80cf3cbc8..2ed5b000d82 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.registry.NotifyListener; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -191,7 +190,7 @@ public void notify(List urls) { notified.set(Boolean.TRUE); } }; - + MockRegistry mockRegistry = new MockRegistry(registryUrl, countDownLatch); mockRegistry.register(serviceUrl); mockRegistry.subscribe(serviceUrl, listener); @@ -200,7 +199,8 @@ public void notify(List urls) { mockRegistry.recover(); countDownLatch.await(); Assert.assertEquals(0, mockRegistry.getFailedRegistered().size()); - Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(registryUrl)); + FailbackRegistry.Holder h = new FailbackRegistry.Holder(registryUrl, listener); + Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(h)); Assert.assertEquals(countDownLatch.getCount(), 0); } @@ -262,16 +262,6 @@ public void doUnsubscribe(URL url, NotifyListener listener) { latch.countDown(); } - @Override - protected void retry() { - super.retry(); - if (bad) { - throw new RuntimeException("can not invoke!"); - } - //System.out.println("do retry"); - latch.countDown(); - } - @Override public boolean isAvailable() { return true; From daf2c52f977c65e28d98e3950ce8fb4c33900a7d Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 17:23:08 +0800 Subject: [PATCH 05/11] Optimize retry for FailbackRegistry. The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control. --- .../registry/retry/FailedRegisteredTask.java | 1 + .../registry/retry/FailedSubscribedTask.java | 1 + .../retry/FailedUnregisteredTask.java | 1 + .../retry/FailedUnsubscribedTask.java | 1 + .../registry/support/FailbackRegistry.java | 23 +++++++++++++++++++ .../support/FailbackRegistryTest.java | 9 ++++---- 6 files changed, 32 insertions(+), 4 deletions(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java index bf6140727fb..04b2891e257 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java @@ -17,5 +17,6 @@ public FailedRegisteredTask(URL url, FailbackRegistry registry) { @Override protected void doRetry(URL url, FailbackRegistry registry) { registry.doRegister(url); + registry.removeFailedRegisteredTask(url); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java index 95e694aeb02..47b35d4993f 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -24,5 +24,6 @@ public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener l @Override protected void doRetry(URL url, FailbackRegistry registry) { registry.doSubscribe(url, listener); + registry.removeFailedSubscribedTask(url, listener); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java index a1edc560187..094fd5d6410 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -17,5 +17,6 @@ public FailedUnregisteredTask(URL url, FailbackRegistry registry) { @Override protected void doRetry(URL url, FailbackRegistry registry) { registry.doUnregister(url); + registry.removeFailedUnregisteredTask(url); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java index 62427c84aa3..4d039f56c08 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -24,5 +24,6 @@ public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener @Override protected void doRetry(URL url, FailbackRegistry registry) { registry.unsubscribe(url, listener); + registry.removeFailedUnsubscribedTask(url, listener); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index a7121b6aa93..ca85958fa26 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -68,6 +68,29 @@ public FailbackRegistry(URL url) { retryTimer = new HashedWheelTimer(retryPeriod, TimeUnit.MILLISECONDS, 128); } + public void removeFailedRegisteredTask(URL url) { + failedRegistered.remove(url); + } + + public void removeFailedUnregisteredTask(URL url) { + failedUnregistered.remove(url); + } + + public void removeFailedSubscribedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedSubscribed.remove(h); + } + + public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedUnsubscribed.remove(h); + } + + public void removeFailedNotifiedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedNotified.remove(h); + } + private void addFailedRegistered(URL url) { FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); FailedRegisteredTask oldOne = failedRegistered.putIfAbsent(url, newTask); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java index 2ed5b000d82..6db670c66c8 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java @@ -52,8 +52,7 @@ public void setUp() throws Exception { } /** - * Test method for - * {@link org.apache.dubbo.registry.support.FailbackRegistry#retry()}. + * Test method for retry * * @throws Exception */ @@ -61,7 +60,9 @@ public void setUp() throws Exception { public void testDoRetry() throws Exception { final AtomicReference notified = new AtomicReference(false); - final CountDownLatch latch = new CountDownLatch(3);//All of them are called 3 times. Successful attempts to reduce the failure of 1. subscribe register will not be done again + + // the latest latch just for 3. Because retry method has been removed. + final CountDownLatch latch = new CountDownLatch(2); NotifyListener listner = new NotifyListener() { @Override @@ -78,7 +79,7 @@ public void notify(List urls) { //Failure can not be called to listener. assertEquals(false, notified.get()); - assertEquals(3, latch.getCount()); + assertEquals(2, latch.getCount()); registry.setBad(false); From 52db5ca21781e15c7adb1e16d2d5371f1e8d6c29 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Thu, 8 Nov 2018 17:24:52 +0800 Subject: [PATCH 06/11] Optimize retry for FailbackRegistry. The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control. --- .../dubbo/registry/retry/AbstractRetryTask.java | 17 +++++++++++++++++ .../registry/retry/FailedNotifiedTask.java | 17 +++++++++++++++++ .../registry/retry/FailedRegisteredTask.java | 17 +++++++++++++++++ .../registry/retry/FailedSubscribedTask.java | 17 +++++++++++++++++ .../registry/retry/FailedUnregisteredTask.java | 17 +++++++++++++++++ .../registry/retry/FailedUnsubscribedTask.java | 17 +++++++++++++++++ 6 files changed, 102 insertions(+) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index 59b97462433..2d2e99f171d 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.Constants; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java index b9f6a27cec6..76ed70bd3a2 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java index 04b2891e257..b33448c70c4 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java index 47b35d4993f..2cdb9124cce 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java index 094fd5d6410..3e5ea251515 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java index 4d039f56c08..8672c665080 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; From 3f44f51a533b6fca574b8be976b9487324858245 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Sat, 10 Nov 2018 09:49:26 +0800 Subject: [PATCH 07/11] Optimize logger warn's msg. --- .../java/org/apache/dubbo/registry/retry/AbstractRetryTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index 2d2e99f171d..7e4798faa60 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -102,7 +102,7 @@ public void run(Timeout timeout) throws Exception { try { doRetry(url, registry); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to " + taskName + url + ", waiting for again, cause:" + t.getMessage(), t); + logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t); // reput this task when catch exception. reput(timeout, retryPeriod); } From 0c16866fd992f3d559f937d48e765925892c4136 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Mon, 12 Nov 2018 10:29:55 +0800 Subject: [PATCH 08/11] Optimize FailedNotifiedTask's run method. Optimize addXXXTask, directly return if we already have a retry task. --- .../registry/retry/AbstractRetryTask.java | 4 +-- .../registry/retry/FailedNotifiedTask.java | 24 ++-------------- .../registry/retry/FailedRegisteredTask.java | 3 +- .../registry/retry/FailedSubscribedTask.java | 3 +- .../retry/FailedUnregisteredTask.java | 3 +- .../retry/FailedUnsubscribedTask.java | 3 +- .../registry/support/FailbackRegistry.java | 28 +++++++++++++++---- 7 files changed, 35 insertions(+), 33 deletions(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index 7e4798faa60..b299b2fd7e5 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -100,7 +100,7 @@ public void run(Timeout timeout) throws Exception { logger.info(taskName + " : " + url); } try { - doRetry(url, registry); + doRetry(url, registry, timeout); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t); // reput this task when catch exception. @@ -108,5 +108,5 @@ public void run(Timeout timeout) throws Exception { } } - protected abstract void doRetry(URL url, FailbackRegistry registry); + protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java index 76ed70bd3a2..c241226f3a0 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -57,27 +57,9 @@ public void removeRetryUrl(List urls) { } @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { - // other thread cancel this timeout or stop the timer or cancel the task. - return; - } - if (logger.isInfoEnabled()) { - logger.info(taskName + " : " + url); - } - try { - listener.notify(urls); - urls.clear(); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to " + taskName + url + ", waiting for again, cause:" + t.getMessage(), t); - } - - // always reput this into timer. + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + listener.notify(urls); + urls.clear(); reput(timeout, retryPeriod); } - - @Override - protected void doRetry(URL url, FailbackRegistry registry) { - // do nothing. - } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java index b33448c70c4..c4d9cc6f08c 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java @@ -18,6 +18,7 @@ package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; import org.apache.dubbo.registry.support.FailbackRegistry; /** @@ -32,7 +33,7 @@ public FailedRegisteredTask(URL url, FailbackRegistry registry) { } @Override - protected void doRetry(URL url, FailbackRegistry registry) { + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { registry.doRegister(url); registry.removeFailedRegisteredTask(url); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java index 2cdb9124cce..06d1ec3151b 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -18,6 +18,7 @@ package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.support.FailbackRegistry; @@ -39,7 +40,7 @@ public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener l } @Override - protected void doRetry(URL url, FailbackRegistry registry) { + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { registry.doSubscribe(url, listener); registry.removeFailedSubscribedTask(url, listener); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java index 3e5ea251515..4cf3aa4ebf2 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -18,6 +18,7 @@ package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; import org.apache.dubbo.registry.support.FailbackRegistry; /** @@ -32,7 +33,7 @@ public FailedUnregisteredTask(URL url, FailbackRegistry registry) { } @Override - protected void doRetry(URL url, FailbackRegistry registry) { + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { registry.doUnregister(url); registry.removeFailedUnregisteredTask(url); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java index 8672c665080..6814583df27 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -18,6 +18,7 @@ package org.apache.dubbo.registry.retry; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.support.FailbackRegistry; @@ -39,7 +40,7 @@ public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener } @Override - protected void doRetry(URL url, FailbackRegistry registry) { + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { registry.unsubscribe(url, listener); registry.removeFailedUnsubscribedTask(url, listener); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index ca85958fa26..d080ecd44b6 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -92,8 +92,12 @@ public void removeFailedNotifiedTask(URL url, NotifyListener listener) { } private void addFailedRegistered(URL url) { + FailedRegisteredTask oldOne = failedRegistered.get(url); + if (oldOne != null) { + return; + } FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); - FailedRegisteredTask oldOne = failedRegistered.putIfAbsent(url, newTask); + oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); @@ -108,8 +112,12 @@ private void removeFailedRegistered(URL url) { } private void addFailedUnregistered(URL url) { + FailedUnregisteredTask oldOne = failedUnregistered.get(url); + if (oldOne != null) { + return; + } FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this); - FailedUnregisteredTask oldOne = failedUnregistered.putIfAbsent(url, newTask); + oldOne = failedUnregistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); @@ -125,9 +133,13 @@ private void removeFailedUnregistered(URL url) { private void addFailedSubscribed(URL url, NotifyListener listener) { Holder h = new Holder(url, listener); + FailedSubscribedTask oldOne = failedSubscribed.get(h); + if (oldOne != null) { + return; + } FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); - FailedSubscribedTask f = failedSubscribed.putIfAbsent(h, newTask); - if (f == null) { + oldOne = failedSubscribed.putIfAbsent(h, newTask); + if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } @@ -145,9 +157,13 @@ private void removeFailedSubscribed(URL url, NotifyListener listener) { private void addFailedUnsubscribed(URL url, NotifyListener listener) { Holder h = new Holder(url, listener); + FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h); + if (oldOne != null) { + return; + } FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener); - FailedUnsubscribedTask f = failedUnsubscribed.putIfAbsent(h, newTask); - if (f == null) { + oldOne = failedUnsubscribed.putIfAbsent(h, newTask); + if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } From b66eafa226e9535a7fc16fa303aa37121b8584cb Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Tue, 13 Nov 2018 10:08:04 +0800 Subject: [PATCH 09/11] Optimize notify logic, just notify when the urls is not empty. --- .../org/apache/dubbo/registry/retry/FailedNotifiedTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java index c241226f3a0..3d6d8db6a91 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -58,8 +58,10 @@ public void removeRetryUrl(List urls) { @Override protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { - listener.notify(urls); - urls.clear(); + if (!CollectionUtils.isEmpty(urls)) { + listener.notify(urls); + urls.clear(); + } reput(timeout, retryPeriod); } } From e55f9a33bf025cea3e5bd5fdcd3924a552668cff Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Tue, 13 Nov 2018 10:08:16 +0800 Subject: [PATCH 10/11] Optimize notify logic, just notify when the urls is not empty. --- .../org/apache/dubbo/registry/retry/FailedNotifiedTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java index 3d6d8db6a91..9ccb6730142 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -58,7 +58,7 @@ public void removeRetryUrl(List urls) { @Override protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { - if (!CollectionUtils.isEmpty(urls)) { + if (CollectionUtils.isNotEmpty(urls)) { listener.notify(urls); urls.clear(); } From 1c704be4f118380df3b5037334da2b14a2fc51b7 Mon Sep 17 00:00:00 2001 From: xiuyuhang <442367943@qq.com> Date: Tue, 13 Nov 2018 14:15:50 +0800 Subject: [PATCH 11/11] Optimize timer that use daemon thread. --- .../org/apache/dubbo/registry/support/FailbackRegistry.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index d080ecd44b6..5d2d2f041f8 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.timer.HashedWheelTimer; +import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.retry.FailedNotifiedTask; import org.apache.dubbo.registry.retry.FailedRegisteredTask; @@ -65,7 +66,7 @@ public FailbackRegistry(URL url) { this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // since the retry task will not be very much. 128 ticks is enough. - retryTimer = new HashedWheelTimer(retryPeriod, TimeUnit.MILLISECONDS, 128); + retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); } public void removeFailedRegisteredTask(URL url) {