Skip to content

Commit

Permalink
Optimize retry for FailbackRegistry. (#2763)
Browse files Browse the repository at this point in the history
* Abstract retry task

* Task for retry.

* Fix sth.

* Finish Optimize. fix ci failed.

* Optimize retry for FailbackRegistry.
The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control.

* Optimize retry for FailbackRegistry.
The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control.

* Optimize logger warn's msg.

* Optimize FailedNotifiedTask's run method.
Optimize addXXXTask, directly return if we already have a retry task.

* Optimize notify logic, just notify when the urls is not empty.

* Optimize notify logic, just notify when the urls is not empty.

* Optimize timer that use daemon thread.
  • Loading branch information
carryxyh authored and beiwei30 committed Dec 11, 2018
1 parent ce4defa commit 9d4b6cb
Show file tree
Hide file tree
Showing 11 changed files with 567 additions and 247 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.support.FailbackRegistry;

import java.util.concurrent.TimeUnit;

/**
* AbstractRetryTask
*/
public abstract class AbstractRetryTask implements TimerTask {

protected final Logger logger = LoggerFactory.getLogger(getClass());

/**
* url for retry task
*/
protected final URL url;

/**
* registry for this task
*/
protected final FailbackRegistry registry;

/**
* retry period
*/
protected final long retryPeriod;

/**
* task name for this task
*/
protected final String taskName;

private volatile boolean cancel;

AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
if (url == null || StringUtils.isBlank(taskName)) {
throw new IllegalArgumentException();
}
this.url = url;
this.registry = registry;
this.taskName = taskName;
cancel = false;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
}

public void cancel() {
cancel = true;
}

public boolean isCancel() {
return cancel;
}

protected void reput(Timeout timeout, long tick) {
if (timeout == null) {
throw new IllegalArgumentException();
}

Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled() || isCancel()) {
return;
}

timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
// other thread cancel this timeout or stop the timer.
return;
}
if (logger.isInfoEnabled()) {
logger.info(taskName + " : " + url);
}
try {
doRetry(url, registry, timeout);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
// reput this task when catch exception.
reput(timeout, retryPeriod);
}
}

protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* FailedNotifiedTask
*/
public final class FailedNotifiedTask extends AbstractRetryTask {

private static final String NAME = "retry subscribe";

private final NotifyListener listener;

private final List<URL> urls = new CopyOnWriteArrayList<>();

public FailedNotifiedTask(URL url, NotifyListener listener) {
super(url, null, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

public void addUrlToRetry(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
this.urls.addAll(urls);
}

public void removeRetryUrl(List<URL> urls) {
this.urls.removeAll(urls);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
urls.clear();
}
reput(timeout, retryPeriod);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedRegisteredTask
*/
public final class FailedRegisteredTask extends AbstractRetryTask {

private static final String NAME = "retry register";

public FailedRegisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doRegister(url);
registry.removeFailedRegisteredTask(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedSubscribedTask
*/
public final class FailedSubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry subscribe";

private final NotifyListener listener;

public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doSubscribe(url, listener);
registry.removeFailedSubscribedTask(url, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnregisteredTask
*/
public final class FailedUnregisteredTask extends AbstractRetryTask {

private static final String NAME = "retry unregister";

public FailedUnregisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doUnregister(url);
registry.removeFailedUnregisteredTask(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.registry.retry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;

/**
* FailedUnsubscribedTask
*/
public final class FailedUnsubscribedTask extends AbstractRetryTask {

private static final String NAME = "retry unsubscribe";

private final NotifyListener listener;

public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
super(url, registry, NAME);
if (listener == null) {
throw new IllegalArgumentException();
}
this.listener = listener;
}

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.unsubscribe(url, listener);
registry.removeFailedUnsubscribedTask(url, listener);
}
}
Loading

0 comments on commit 9d4b6cb

Please sign in to comment.