From 9d4b6cbaae3adce765b0b1dba6a50a92e504c7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=B6=E6=97=A0=E4=B8=A4=E4=B8=B6?= <442367943@qq.com> Date: Tue, 11 Dec 2018 18:00:30 +0800 Subject: [PATCH] Optimize retry for FailbackRegistry. (#2763) * Abstract retry task * Task for retry. * Fix sth. * Finish Optimize. fix ci failed. * Optimize retry for FailbackRegistry. The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control. * Optimize retry for FailbackRegistry. The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control. * Optimize logger warn's msg. * Optimize FailedNotifiedTask's run method. Optimize addXXXTask, directly return if we already have a retry task. * Optimize notify logic, just notify when the urls is not empty. * Optimize notify logic, just notify when the urls is not empty. * Optimize timer that use daemon thread. --- .../registry/retry/AbstractRetryTask.java | 112 +++++ .../registry/retry/FailedNotifiedTask.java | 67 +++ .../registry/retry/FailedRegisteredTask.java | 40 ++ .../registry/retry/FailedSubscribedTask.java | 47 ++ .../retry/FailedUnregisteredTask.java | 40 ++ .../retry/FailedUnsubscribedTask.java | 47 ++ .../registry/support/FailbackRegistry.java | 404 ++++++++---------- .../support/FailbackRegistryTest.java | 33 +- .../dubbo/registry/dubbo/DubboRegistry.java | 8 +- .../registry/multicast/MulticastRegistry.java | 8 +- .../registry/zookeeper/ZookeeperRegistry.java | 8 +- 11 files changed, 567 insertions(+), 247 deletions(-) create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java create mode 100644 dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java new file mode 100644 index 00000000000..b299b2fd7e5 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.timer.Timer; +import org.apache.dubbo.common.timer.TimerTask; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import java.util.concurrent.TimeUnit; + +/** + * AbstractRetryTask + */ +public abstract class AbstractRetryTask implements TimerTask { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * url for retry task + */ + protected final URL url; + + /** + * registry for this task + */ + protected final FailbackRegistry registry; + + /** + * retry period + */ + protected final long retryPeriod; + + /** + * task name for this task + */ + protected final String taskName; + + private volatile boolean cancel; + + AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) { + if (url == null || StringUtils.isBlank(taskName)) { + throw new IllegalArgumentException(); + } + this.url = url; + this.registry = registry; + this.taskName = taskName; + cancel = false; + this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); + } + + public void cancel() { + cancel = true; + } + + public boolean isCancel() { + return cancel; + } + + protected void reput(Timeout timeout, long tick) { + if (timeout == null) { + throw new IllegalArgumentException(); + } + + Timer timer = timeout.timer(); + if (timer.isStop() || timeout.isCancelled() || isCancel()) { + return; + } + + timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { + // other thread cancel this timeout or stop the timer. + return; + } + if (logger.isInfoEnabled()) { + logger.info(taskName + " : " + url); + } + try { + doRetry(url, registry, timeout); + } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry + logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t); + // reput this task when catch exception. + reput(timeout, retryPeriod); + } + } + + protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout); +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java new file mode 100644 index 00000000000..9ccb6730142 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * FailedNotifiedTask + */ +public final class FailedNotifiedTask extends AbstractRetryTask { + + private static final String NAME = "retry subscribe"; + + private final NotifyListener listener; + + private final List urls = new CopyOnWriteArrayList<>(); + + public FailedNotifiedTask(URL url, NotifyListener listener) { + super(url, null, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } + this.listener = listener; + } + + public void addUrlToRetry(List urls) { + if (CollectionUtils.isEmpty(urls)) { + return; + } + this.urls.addAll(urls); + } + + public void removeRetryUrl(List urls) { + this.urls.removeAll(urls); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + if (CollectionUtils.isNotEmpty(urls)) { + listener.notify(urls); + urls.clear(); + } + reput(timeout, retryPeriod); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java new file mode 100644 index 00000000000..c4d9cc6f08c --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedRegisteredTask + */ +public final class FailedRegisteredTask extends AbstractRetryTask { + + private static final String NAME = "retry register"; + + public FailedRegisteredTask(URL url, FailbackRegistry registry) { + super(url, registry, NAME); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + registry.doRegister(url); + registry.removeFailedRegisteredTask(url); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java new file mode 100644 index 00000000000..06d1ec3151b --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedSubscribedTask + */ +public final class FailedSubscribedTask extends AbstractRetryTask { + + private static final String NAME = "retry subscribe"; + + private final NotifyListener listener; + + public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + super(url, registry, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } + this.listener = listener; + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + registry.doSubscribe(url, listener); + registry.removeFailedSubscribedTask(url, listener); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java new file mode 100644 index 00000000000..4cf3aa4ebf2 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedUnregisteredTask + */ +public final class FailedUnregisteredTask extends AbstractRetryTask { + + private static final String NAME = "retry unregister"; + + public FailedUnregisteredTask(URL url, FailbackRegistry registry) { + super(url, registry, NAME); + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + registry.doUnregister(url); + registry.removeFailedUnregisteredTask(url); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java new file mode 100644 index 00000000000..6814583df27 --- /dev/null +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.retry; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +/** + * FailedUnsubscribedTask + */ +public final class FailedUnsubscribedTask extends AbstractRetryTask { + + private static final String NAME = "retry unsubscribe"; + + private final NotifyListener listener; + + public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { + super(url, registry, NAME); + if (listener == null) { + throw new IllegalArgumentException(); + } + this.listener = listener; + } + + @Override + protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { + registry.unsubscribe(url, listener); + registry.removeFailedUnsubscribedTask(url, listener); + } +} diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index 5c456d5287f..5d2d2f041f8 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -18,10 +18,14 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.common.utils.ExecutorUtil; +import org.apache.dubbo.common.timer.HashedWheelTimer; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.retry.FailedNotifiedTask; +import org.apache.dubbo.registry.retry.FailedRegisteredTask; +import org.apache.dubbo.registry.retry.FailedSubscribedTask; +import org.apache.dubbo.registry.retry.FailedUnregisteredTask; +import org.apache.dubbo.registry.retry.FailedUnsubscribedTask; import java.util.HashMap; import java.util.HashSet; @@ -30,108 +34,197 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * FailbackRegistry. (SPI, Prototype, ThreadSafe) - * */ public abstract class FailbackRegistry extends AbstractRegistry { - // Scheduled executor service - private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); - - // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry - private final ScheduledFuture retryFuture; + /* retry task map */ - private final Set failedRegistered = new ConcurrentHashSet(); + private final ConcurrentMap failedRegistered = new ConcurrentHashMap(); - private final Set failedUnregistered = new ConcurrentHashSet(); + private final ConcurrentMap failedUnregistered = new ConcurrentHashMap(); - private final ConcurrentMap> failedSubscribed = new ConcurrentHashMap>(); + private final ConcurrentMap failedSubscribed = new ConcurrentHashMap(); - private final ConcurrentMap> failedUnsubscribed = new ConcurrentHashMap>(); + private final ConcurrentMap failedUnsubscribed = new ConcurrentHashMap(); - private final ConcurrentMap>> failedNotified = new ConcurrentHashMap>>(); + private final ConcurrentMap failedNotified = new ConcurrentHashMap(); /** * The time in milliseconds the retryExecutor will wait */ private final int retryPeriod; + // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry + private final HashedWheelTimer retryTimer; + public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); - this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - // Check and connect to the registry - try { - retry(); - } catch (Throwable t) { // Defensive fault tolerance - logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); - } - } - }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); + + // since the retry task will not be very much. 128 ticks is enough. + retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); } - public Future getRetryFuture() { - return retryFuture; + public void removeFailedRegisteredTask(URL url) { + failedRegistered.remove(url); } - public Set getFailedRegistered() { - return failedRegistered; + public void removeFailedUnregisteredTask(URL url) { + failedUnregistered.remove(url); } - public Set getFailedUnregistered() { - return failedUnregistered; + public void removeFailedSubscribedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedSubscribed.remove(h); } - public Map> getFailedSubscribed() { - return failedSubscribed; + public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedUnsubscribed.remove(h); } - public Map> getFailedUnsubscribed() { - return failedUnsubscribed; + public void removeFailedNotifiedTask(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + failedNotified.remove(h); } - public Map>> getFailedNotified() { - return failedNotified; + private void addFailedRegistered(URL url) { + FailedRegisteredTask oldOne = failedRegistered.get(url); + if (oldOne != null) { + return; + } + FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); + oldOne = failedRegistered.putIfAbsent(url, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } + } + + private void removeFailedRegistered(URL url) { + FailedRegisteredTask f = failedRegistered.remove(url); + if (f != null) { + f.cancel(); + } + } + + private void addFailedUnregistered(URL url) { + FailedUnregisteredTask oldOne = failedUnregistered.get(url); + if (oldOne != null) { + return; + } + FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this); + oldOne = failedUnregistered.putIfAbsent(url, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } + } + + private void removeFailedUnregistered(URL url) { + FailedUnregisteredTask f = failedUnregistered.remove(url); + if (f != null) { + f.cancel(); + } } private void addFailedSubscribed(URL url, NotifyListener listener) { - Set listeners = failedSubscribed.get(url); - if (listeners == null) { - failedSubscribed.putIfAbsent(url, new ConcurrentHashSet()); - listeners = failedSubscribed.get(url); + Holder h = new Holder(url, listener); + FailedSubscribedTask oldOne = failedSubscribed.get(h); + if (oldOne != null) { + return; + } + FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); + oldOne = failedSubscribed.putIfAbsent(h, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } - listeners.add(listener); } private void removeFailedSubscribed(URL url, NotifyListener listener) { - Set listeners = failedSubscribed.get(url); - if (listeners != null) { - listeners.remove(listener); + Holder h = new Holder(url, listener); + FailedSubscribedTask f = failedSubscribed.remove(h); + if (f != null) { + f.cancel(); + } + removeFailedUnsubscribed(url, listener); + removeFailedNotified(url, listener); + } + + private void addFailedUnsubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h); + if (oldOne != null) { + return; + } + FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener); + oldOne = failedUnsubscribed.putIfAbsent(h, newTask); + if (oldOne == null) { + // never has a retry task. then start a new task for retry. + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } + } + + private void removeFailedUnsubscribed(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedUnsubscribedTask f = failedUnsubscribed.remove(h); + if (f != null) { + f.cancel(); } - listeners = failedUnsubscribed.get(url); - if (listeners != null) { - listeners.remove(listener); + } + + private void addFailedNotified(URL url, NotifyListener listener, List urls) { + Holder h = new Holder(url, listener); + FailedNotifiedTask newTask = new FailedNotifiedTask(url, listener); + FailedNotifiedTask f = failedNotified.putIfAbsent(h, newTask); + if (f == null) { + // never has a retry task. then start a new task for retry. + newTask.addUrlToRetry(urls); + retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); + } else { + // just add urls which needs retry. + newTask.addUrlToRetry(urls); } - Map> notified = failedNotified.get(url); - if (notified != null) { - notified.remove(listener); + } + + private void removeFailedNotified(URL url, NotifyListener listener) { + Holder h = new Holder(url, listener); + FailedNotifiedTask f = failedNotified.remove(h); + if (f != null) { + f.cancel(); } } + public ConcurrentMap getFailedRegistered() { + return failedRegistered; + } + + public ConcurrentMap getFailedUnregistered() { + return failedUnregistered; + } + + public ConcurrentMap getFailedSubscribed() { + return failedSubscribed; + } + + public ConcurrentMap getFailedUnsubscribed() { + return failedUnsubscribed; + } + + public ConcurrentMap getFailedNotified() { + return failedNotified; + } + @Override public void register(URL url) { super.register(url); - failedRegistered.remove(url); - failedUnregistered.remove(url); + removeFailedRegistered(url); + removeFailedUnregistered(url); try { // Sending a registration request to the server side doRegister(url); @@ -153,15 +246,15 @@ public void register(URL url) { } // Record a failed registration request to a failed list, retry regularly - failedRegistered.add(url); + addFailedRegistered(url); } } @Override public void unregister(URL url) { super.unregister(url); - failedRegistered.remove(url); - failedUnregistered.remove(url); + removeFailedRegistered(url); + removeFailedUnregistered(url); try { // Sending a cancellation request to the server side doUnregister(url); @@ -183,7 +276,7 @@ public void unregister(URL url) { } // Record a failed registration request to a failed list, retry regularly - failedUnregistered.add(url); + addFailedUnregistered(url); } } @@ -245,12 +338,7 @@ public void unsubscribe(URL url, NotifyListener listener) { } // Record a failed registration request to a failed list, retry regularly - Set listeners = failedUnsubscribed.get(url); - if (listeners == null) { - failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet()); - listeners = failedUnsubscribed.get(url); - } - listeners.add(listener); + addFailedUnsubscribed(url, listener); } } @@ -266,12 +354,7 @@ protected void notify(URL url, NotifyListener listener, List urls) { doNotify(url, listener, urls); } catch (Exception t) { // Record a failed registration request to a failed list, retry regularly - Map> listeners = failedNotified.get(url); - if (listeners == null) { - failedNotified.putIfAbsent(url, new ConcurrentHashMap>()); - listeners = failedNotified.get(url); - } - listeners.put(listener, urls); + addFailedNotified(url, listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } @@ -289,7 +372,7 @@ protected void recover() throws Exception { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { - failedRegistered.add(url); + addFailedRegistered(url); } } // subscribe @@ -307,156 +390,49 @@ protected void recover() throws Exception { } } - // Retry the failed actions - protected void retry() { - if (!failedRegistered.isEmpty()) { - Set failed = new HashSet(failedRegistered); - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry register " + failed); - } - try { - for (URL url : failed) { - try { - doRegister(url); - failedRegistered.remove(url); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedUnregistered.isEmpty()) { - Set failed = new HashSet(failedUnregistered); - if (!failed.isEmpty()) { - if (logger.isInfoEnabled()) { - logger.info("Retry unregister " + failed); - } - try { - for (URL url : failed) { - try { - doUnregister(url); - failedUnregistered.remove(url); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedSubscribed.isEmpty()) { - Map> failed = new HashMap>(failedSubscribed); - for (Map.Entry> entry : new HashMap>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().size() == 0) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry subscribe " + failed); - } - try { - for (Map.Entry> entry : failed.entrySet()) { - URL url = entry.getKey(); - Set listeners = entry.getValue(); - for (NotifyListener listener : listeners) { - try { - doSubscribe(url, listener); - listeners.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedUnsubscribed.isEmpty()) { - Map> failed = new HashMap>(failedUnsubscribed); - for (Map.Entry> entry : new HashMap>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().isEmpty()) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry unsubscribe " + failed); - } - try { - for (Map.Entry> entry : failed.entrySet()) { - URL url = entry.getKey(); - Set listeners = entry.getValue(); - for (NotifyListener listener : listeners) { - try { - doUnsubscribe(url, listener); - listeners.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - if (!failedNotified.isEmpty()) { - Map>> failed = new HashMap>>(failedNotified); - for (Map.Entry>> entry : new HashMap>>(failed).entrySet()) { - if (entry.getValue() == null || entry.getValue().size() == 0) { - failed.remove(entry.getKey()); - } - } - if (failed.size() > 0) { - if (logger.isInfoEnabled()) { - logger.info("Retry notify " + failed); - } - try { - for (Map> values : failed.values()) { - for (Map.Entry> entry : values.entrySet()) { - try { - NotifyListener listener = entry.getKey(); - List urls = entry.getValue(); - listener.notify(urls); - values.remove(listener); - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry - logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); - } - } - } - } - @Override public void destroy() { super.destroy(); - try { - retryFuture.cancel(true); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod); + retryTimer.stop(); } // ==== Template method ==== - protected abstract void doRegister(URL url); + public abstract void doRegister(URL url); + + public abstract void doUnregister(URL url); + + public abstract void doSubscribe(URL url, NotifyListener listener); + + public abstract void doUnsubscribe(URL url, NotifyListener listener); - protected abstract void doUnregister(URL url); + static class Holder { - protected abstract void doSubscribe(URL url, NotifyListener listener); + private final URL url; - protected abstract void doUnsubscribe(URL url, NotifyListener listener); + private final NotifyListener notifyListener; + + Holder(URL url, NotifyListener notifyListener) { + if (url == null || notifyListener == null) { + throw new IllegalArgumentException(); + } + this.url = url; + this.notifyListener = notifyListener; + } + @Override + public int hashCode() { + return url.hashCode() + notifyListener.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Holder) { + Holder h = (Holder) obj; + return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener); + } else { + return false; + } + } + } } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java index 7f3ac3a6be0..6db670c66c8 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.registry.NotifyListener; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -53,8 +52,7 @@ public void setUp() throws Exception { } /** - * Test method for - * {@link org.apache.dubbo.registry.support.FailbackRegistry#retry()}. + * Test method for retry * * @throws Exception */ @@ -62,7 +60,9 @@ public void setUp() throws Exception { public void testDoRetry() throws Exception { final AtomicReference notified = new AtomicReference(false); - final CountDownLatch latch = new CountDownLatch(3);//All of them are called 3 times. Successful attempts to reduce the failure of 1. subscribe register will not be done again + + // the latest latch just for 3. Because retry method has been removed. + final CountDownLatch latch = new CountDownLatch(2); NotifyListener listner = new NotifyListener() { @Override @@ -79,7 +79,7 @@ public void notify(List urls) { //Failure can not be called to listener. assertEquals(false, notified.get()); - assertEquals(3, latch.getCount()); + assertEquals(2, latch.getCount()); registry.setBad(false); @@ -191,7 +191,7 @@ public void notify(List urls) { notified.set(Boolean.TRUE); } }; - + MockRegistry mockRegistry = new MockRegistry(registryUrl, countDownLatch); mockRegistry.register(serviceUrl); mockRegistry.subscribe(serviceUrl, listener); @@ -200,7 +200,8 @@ public void notify(List urls) { mockRegistry.recover(); countDownLatch.await(); Assert.assertEquals(0, mockRegistry.getFailedRegistered().size()); - Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(registryUrl)); + FailbackRegistry.Holder h = new FailbackRegistry.Holder(registryUrl, listener); + Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(h)); Assert.assertEquals(countDownLatch.getCount(), 0); } @@ -224,7 +225,7 @@ public void setBad(boolean bad) { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -234,7 +235,7 @@ protected void doRegister(URL url) { } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -244,7 +245,7 @@ protected void doUnregister(URL url) { } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -254,7 +255,7 @@ protected void doSubscribe(URL url, NotifyListener listener) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { if (bad) { throw new RuntimeException("can not invoke!"); } @@ -262,16 +263,6 @@ protected void doUnsubscribe(URL url, NotifyListener listener) { latch.countDown(); } - @Override - protected void retry() { - super.retry(); - if (bad) { - throw new RuntimeException("can not invoke!"); - } - //System.out.println("do retry"); - latch.countDown(); - } - @Override public boolean isAvailable() { return true; diff --git a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java index 45c94a73e48..2c6b8c1f1ec 100644 --- a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java +++ b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java @@ -138,22 +138,22 @@ public void destroy() { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { registryService.register(url); } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { registryService.unregister(url); } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { registryService.subscribe(url, listener); } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { registryService.unsubscribe(url, listener); } diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java index 8ff7daaea2d..06835dac6e5 100644 --- a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java +++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java @@ -249,17 +249,17 @@ private void unicast(String msg, String host) { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { multicast(Constants.REGISTER + " " + url.toFullString()); } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { multicast(Constants.UNREGISTER + " " + url.toFullString()); } @Override - protected void doSubscribe(URL url, NotifyListener listener) { + public void doSubscribe(URL url, NotifyListener listener) { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { admin = true; } @@ -273,7 +273,7 @@ protected void doSubscribe(URL url, NotifyListener listener) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { unregister(url); diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java index 43426c7b9f6..d7a8ee097ac 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java @@ -109,7 +109,7 @@ public void destroy() { } @Override - protected void doRegister(URL url) { + public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { @@ -118,7 +118,7 @@ protected void doRegister(URL url) { } @Override - protected void doUnregister(URL url) { + public void doUnregister(URL url) { try { zkClient.delete(toUrlPath(url)); } catch (Throwable e) { @@ -127,7 +127,7 @@ protected void doUnregister(URL url) { } @Override - protected void doSubscribe(final URL url, final NotifyListener listener) { + public void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); @@ -195,7 +195,7 @@ public void childChanged(String parentPath, List currentChilds) { } @Override - protected void doUnsubscribe(URL url, NotifyListener listener) { + public void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap listeners = zkListeners.get(url); if (listeners != null) { ChildListener zkListener = listeners.get(listener);