Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize retry for FailbackRegistry. #2763

Merged
merged 11 commits into from
Dec 11, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.support.FailbackRegistry;

import java.util.concurrent.TimeUnit;

/**
* AbstractRetryTask
*/
public abstract class AbstractRetryTask implements TimerTask {

private final Logger logger = LoggerFactory.getLogger(getClass());

/**
* url for retry task
*/
protected final URL url;

/**
* registry for this task
*/
protected final FailbackRegistry registry;

/**
* retry period
*/
private final long retryPeriod;

/**
* task name for this task
*/
private final String taskName;

AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
if (url == null || registry == null || StringUtils.isBlank(taskName)) {
throw new IllegalArgumentException();
}
this.url = url;
this.registry = registry;
this.taskName = taskName;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
}

private void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}

Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
}

timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
if (timeout.isCancelled()) {
// other thread cancel this timeout.
return;
}
if (logger.isInfoEnabled()) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
logger.info(taskName + " : " + url);
}
try {
doRetry(url, registry);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to " + taskName + url + ", waiting for again, cause:" + t.getMessage(), t);
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
// reput this task when catch exception.
reput(timeout, retryPeriod);
}
}

protected abstract void doRetry(URL url, FailbackRegistry registry);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedRegisteredTask
*/
public final class FailedRegisteredTask extends AbstractRetryTask {

private static final String NAME = "retry register";

public FailedRegisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry) {
registry.doRegister(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedSubscribedTask
*/
public final class FailedSubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry subscribe";

private final NotifyListener listener;

FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry) {
registry.doSubscribe(url, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnregisteredTask
*
* @author xiuyuhang [xiuyuhang]
* @since 2018-11-08
*/
public final class FailedUnregisteredTask extends AbstractRetryTask {

private static final String NAME = "retry unregister";

FailedUnregisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry) {
registry.doUnregister(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnsubscribedTask
*
* @author xiuyuhang [xiuyuhang]
* @since 2018-11-08
*/
public final class FailedUnsubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry register";

private final NotifyListener listener;

FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry) {
registry.unsubscribe(url, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,12 @@ public void destroy() {

// ==== Template method ====

protected abstract void doRegister(URL url);
public abstract void doRegister(URL url);

protected abstract void doUnregister(URL url);
public abstract void doUnregister(URL url);

protected abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doSubscribe(URL url, NotifyListener listener);

protected abstract void doUnsubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void setBad(boolean bad) {
}

@Override
protected void doRegister(URL url) {
public void doRegister(URL url) {
if (bad) {
throw new RuntimeException("can not invoke!");
}
Expand All @@ -234,7 +234,7 @@ protected void doRegister(URL url) {
}

@Override
protected void doUnregister(URL url) {
public void doUnregister(URL url) {
if (bad) {
throw new RuntimeException("can not invoke!");
}
Expand All @@ -244,7 +244,7 @@ protected void doUnregister(URL url) {
}

@Override
protected void doSubscribe(URL url, NotifyListener listener) {
public void doSubscribe(URL url, NotifyListener listener) {
if (bad) {
throw new RuntimeException("can not invoke!");
}
Expand All @@ -254,7 +254,7 @@ protected void doSubscribe(URL url, NotifyListener listener) {
}

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
public void doUnsubscribe(URL url, NotifyListener listener) {
if (bad) {
throw new RuntimeException("can not invoke!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,22 @@ public void destroy() {
}

@Override
protected void doRegister(URL url) {
public void doRegister(URL url) {
registryService.register(url);
}

@Override
protected void doUnregister(URL url) {
public void doUnregister(URL url) {
registryService.unregister(url);
}

@Override
protected void doSubscribe(URL url, NotifyListener listener) {
public void doSubscribe(URL url, NotifyListener listener) {
registryService.subscribe(url, listener);
}

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
public void doUnsubscribe(URL url, NotifyListener listener) {
registryService.unsubscribe(url, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,17 @@ private void unicast(String msg, String host) {
}

@Override
protected void doRegister(URL url) {
public void doRegister(URL url) {
multicast(Constants.REGISTER + " " + url.toFullString());
}

@Override
protected void doUnregister(URL url) {
public void doUnregister(URL url) {
multicast(Constants.UNREGISTER + " " + url.toFullString());
}

@Override
protected void doSubscribe(URL url, NotifyListener listener) {
public void doSubscribe(URL url, NotifyListener listener) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
admin = true;
}
Expand All @@ -273,7 +273,7 @@ protected void doSubscribe(URL url, NotifyListener listener) {
}

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
public void doUnsubscribe(URL url, NotifyListener listener) {
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
unregister(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void destroy() {
}

@Override
protected void doRegister(URL url) {
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
Expand All @@ -118,7 +118,7 @@ protected void doRegister(URL url) {
}

@Override
protected void doUnregister(URL url) {
public void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
Expand All @@ -127,7 +127,7 @@ protected void doUnregister(URL url) {
}

@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
Expand Down Expand Up @@ -195,7 +195,7 @@ public void childChanged(String parentPath, List<String> currentChilds) {
}

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
Expand Down